Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified servers/su/cli
Binary file not shown.
45 changes: 12 additions & 33 deletions servers/su/src/domain/clients/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ impl Uploader for UploaderClient {
let mut delay = Duration::from_secs(1);
let max_delay = Duration::from_secs(32);

let bundle_item = match bytes::DataItem::from_bytes(tx_for_cache.clone()) {
let target_item = match bytes::DataItem::from_bytes(tx_for_cache.clone()) {
Ok(item) => item,
Err(e) => {
logger_for_cache.error(
Expand All @@ -147,40 +147,19 @@ impl Uploader for UploaderClient {
}
};

let bundle_bytes = match bundle_item.data_bytes() {
Some(bytes) => bytes,
None => {
logger_for_cache.error(
"Cache upload failed bundle item has no data bytes".to_string()
);
return;
}
};

let bundle = match bytes::DataBundle::from_bytes(&bundle_bytes) {
Ok(bundle) => bundle,
Err(e) => {
logger_for_cache.error(format!("Cache upload failed to parse data bundle: {:?}", e));
return;
}
};

let target_item = match bundle.items
.into_iter()
.find(|item| {
item.tags().iter().any(|tag| {
match target_item.tags()
.iter().any(|tag| {
tag.name == "Type" && (tag.value == "Process")
})
}) {
Some(item) => item,
None => {
logger_for_cache.log(
"Cache upload skipping, not a Process"
.to_string()
);
return;
}
};
true => (),
false => {
logger_for_cache.log(
"Cache upload skipping, not a Process"
.to_string()
);
return;
}
};

let tx_for_cache_parsed = match target_item.as_bytes() {
Ok(bytes) => bytes,
Expand Down
12 changes: 10 additions & 2 deletions servers/su/src/domain/core/flows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,9 @@ pub async fn write_item(

let aid = assignment.id();
let did = data_item.id();
let assignment_bytes = assignment
.as_bytes()
.map_err(|e| format!("{:?}", e))?;
let build_result = builder.bundle_items(vec![assignment, data_item]).await?;

let process = Process::from_bundle(&build_result.bundle)?;
Expand All @@ -613,7 +616,8 @@ pub async fn write_item(
.commit(&mut *schedule_info, &next_schedule_info, did, aid);
drop(schedule_info);

upload(&deps, build_result.binary.to_vec()).await?;
upload(&deps, input).await?;
upload(&deps, assignment_bytes).await?;

return id_res(&deps, process.process.process_id.clone(), start_top_level);
} else {
Expand Down Expand Up @@ -690,6 +694,9 @@ pub async fn write_item(
None => None,
};

let assignment_bytes = assignment
.as_bytes()
.map_err(|e| format!("{:?}", e))?;
let build_result = builder.bundle_items(vec![assignment, data_item]).await?;
let message = Message::from_bundle(&build_result.bundle)?;

Expand All @@ -708,7 +715,8 @@ pub async fn write_item(
.commit(&mut *schedule_info, &next_schedule_info, dtarget, aid);
drop(schedule_info);

upload(&deps, build_result.binary.to_vec()).await?;
upload(&deps, input).await?;
upload(&deps, assignment_bytes).await?;
return id_res(&deps, message.message_id()?, start_top_level);
} else {
return Err("Type tag not present".to_string());
Expand Down
2 changes: 1 addition & 1 deletion servers/su/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ struct FromTo {
to: Option<String>,
limit: Option<i32>,
#[serde(rename = "process-id")]
process_id: Option<String>,
process_id: String,
#[serde(rename = "from-nonce")]
from_nonce: Option<String>,
#[serde(rename = "to-nonce")]
Expand Down
Binary file modified servers/su/su
Binary file not shown.