fix: perform streaming upload one chunk at a time #407

Open · wants to merge 5 commits into master
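In short: the previous implementation read every chunk of the stream up front, queued one `make_multipart_request` future per chunk, and only then awaited them all with `futures::future::join_all`, so the whole payload sat buffered in memory. The new loop holds at most two chunks at a time: the part currently uploading and the next chunk being read, driven together by `futures::future::try_join`. A minimal sketch of the `try_join` behavior the loop relies on (plain `futures` crate usage, not code from this PR):

```rust
use futures::executor::block_on;
use futures::future::try_join;

fn main() {
    // try_join polls both futures concurrently, resolves to a tuple of
    // both outputs, and short-circuits on the first error. This is what
    // lets the upload loop fail fast on either a read or an upload error.
    let read_next = async { Ok::<_, &str>(vec![1u8, 2, 3]) };
    let upload_part = async { Ok::<_, &str>("\"etag-1\"".to_string()) };
    let result = block_on(try_join(read_next, upload_part));
    assert_eq!(result.unwrap().1, "\"etag-1\"");
}
```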
76 changes: 25 additions & 51 deletions s3/src/bucket.rs
@@ -1540,73 +1540,47 @@ impl Bucket {
         let msg = self
             .initiate_multipart_upload(s3_path, content_type)
             .await?;
-        let path = msg.key;
+        let path = &msg.key;
         let upload_id = &msg.upload_id;
 
-        let mut part_number: u32 = 0;
-        let mut etags = Vec::new();
-
-        // Collect request handles
-        let mut handles = vec![];
         let mut total_size = 0;
-        loop {
-            let chunk = if part_number == 0 {
-                first_chunk.clone()
-            } else {
-                crate::utils::read_chunk_async(reader).await?
-            };
-            total_size += chunk.len();
-
-            let done = chunk.len() < CHUNK_SIZE;
-
-            // Start chunk upload
-            part_number += 1;
-            handles.push(self.make_multipart_request(
-                &path,
-                chunk,
-                part_number,
-                upload_id,
-                content_type,
-            ));
-
-            if done {
-                break;
-            }
-        }
-
-        // Wait for all chunks to finish (or fail)
-        let responses = futures::future::join_all(handles).await;
-
-        for response in responses {
-            let response_data = response?;
-            if !(200..300).contains(&response_data.status_code()) {
-                // if chunk upload failed - abort the upload
-                match self.abort_upload(&path, upload_id).await {
-                    Ok(_) => {
-                        return Err(error_from_response_data(response_data)?);
-                    }
-                    Err(error) => {
-                        return Err(error);
-                    }
-                }
-            }
-
-            let etag = response_data.as_str()?;
-            etags.push(etag.to_string());
-        }
+        // Read chunks one by one, making a multipart request for each and gathering the responses
+        let mut chunk = first_chunk;
+        let mut part_number = 0;
+        let mut parts = vec![];
+        while !chunk.is_empty() {
+            total_size += chunk.len();
+            part_number += 1;
+
+            // Perform the chunk upload and the read of the next chunk in parallel
+            let part_fut = self.make_multipart_request(
+                path,
+                std::mem::take(&mut chunk),
+                part_number,
+                upload_id,
+                content_type,
+            );
+            let chunk_fut = crate::utils::read_chunk_async(reader);
+
+            let (next_chunk, response) = futures::future::try_join(chunk_fut, part_fut).await?;
+            chunk = next_chunk;
+
+            // Analyze the response to fail fast on a bad upload
+            if !(200..300).contains(&response.status_code()) {
+                // if the chunk upload failed, abort the whole upload
+                self.abort_upload(&path, upload_id).await?;
+                return Err(error_from_response_data(response)?);
+            }
+
+            parts.push(Part {
+                etag: response.to_string()?,
+                part_number,
+            });
+        }
 
         // Finish the upload
-        let inner_data = etags
-            .clone()
-            .into_iter()
-            .enumerate()
-            .map(|(i, x)| Part {
-                etag: x,
-                part_number: i as u32 + 1,
-            })
-            .collect::<Vec<Part>>();
         let response_data = self
-            .complete_multipart_upload(&path, &msg.upload_id, inner_data)
+            .complete_multipart_upload(path, upload_id, parts)
             .await?;
 
         Ok(PutStreamResponse::new(
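For readers skimming the diff, here is a self-contained sketch of the pipelining pattern the new loop uses: upload part N while reading part N + 1. The names `read_chunk` and `upload_part` and the `CHUNK_SIZE` value are hypothetical stand-ins for the crate's internals, and the sketch assumes, like the loop above, that the chunk reader returns an empty buffer once the input is exhausted:

```rust
use futures::executor::block_on;
use futures::future::try_join;
use std::io::Error;

// Hypothetical part size; rust-s3's real CHUNK_SIZE is much larger.
const CHUNK_SIZE: usize = 4;

// Stand-in for crate::utils::read_chunk_async: returns up to CHUNK_SIZE
// bytes, and an empty Vec once the input is exhausted.
async fn read_chunk(input: &mut &[u8]) -> Result<Vec<u8>, Error> {
    let n = input.len().min(CHUNK_SIZE);
    let (head, tail) = input.split_at(n);
    *input = tail;
    Ok(head.to_vec())
}

// Stand-in for make_multipart_request: pretends to upload one part and
// returns its ETag.
async fn upload_part(part_number: u32, chunk: Vec<u8>) -> Result<String, Error> {
    Ok(format!("etag-{part_number}-{}", chunk.len()))
}

async fn upload_all(mut input: &[u8]) -> Result<Vec<String>, Error> {
    let mut chunk = read_chunk(&mut input).await?; // the "first_chunk"
    let mut part_number = 0;
    let mut etags = Vec::new();
    while !chunk.is_empty() {
        part_number += 1;
        // Upload part N while the read of part N + 1 is in flight;
        // std::mem::take hands the chunk to the upload future and leaves
        // an empty Vec behind, so only ~two chunks are alive at once.
        let part_fut = upload_part(part_number, std::mem::take(&mut chunk));
        let chunk_fut = read_chunk(&mut input);
        let (next_chunk, etag) = try_join(chunk_fut, part_fut).await?;
        chunk = next_chunk;
        etags.push(etag);
    }
    Ok(etags)
}

fn main() {
    let etags = block_on(upload_all(b"ten bytes!")).unwrap();
    assert_eq!(etags.len(), 3); // 4 + 4 + 2 bytes
}
```

Compared with the old `join_all` approach, peak memory drops from the entire payload to roughly two chunks, at the cost of uploading parts one at a time; overlapping each upload with the next read should recover much of the lost throughput whenever reading and uploading take comparable time.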