Skip to content

Commit 701ad50

Browse files
committed
upload multipart in parallel
1 parent 8699ce8 commit 701ad50

File tree

2 files changed

+12
-10
lines changed

2 files changed

+12
-10
lines changed

src/storage/object_storage.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -844,7 +844,11 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
844844
let stream_relative_path = format!("{stream_name}/{file_suffix}");
845845

846846
// Try uploading the file, handle potential errors without breaking the loop
847-
if let Err(e) = self.upload_file(&stream_relative_path, &path).await {
847+
// if let Err(e) = self.upload_multipart(key, path)
848+
if let Err(e) = self
849+
.upload_multipart(&RelativePathBuf::from(&stream_relative_path), &path)
850+
.await
851+
{
848852
error!("Failed to upload file {filename:?}: {e}");
849853
continue; // Skip to the next file
850854
}

src/storage/s3.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -521,14 +521,6 @@ impl S3 {
521521

522522
let mut async_writer = self.client.put_multipart(location).await?;
523523

524-
// /* `abort_multipart()` has been removed */
525-
// let close_multipart = |err| async move {
526-
// error!("multipart upload failed. {:?}", err);
527-
// self.client
528-
// .abort_multipart(&key.into(), &multipart_id)
529-
// .await
530-
// };
531-
532524
let meta = file.metadata().await?;
533525
let total_size = meta.len() as usize;
534526
if total_size < MIN_MULTIPART_UPLOAD_SIZE {
@@ -567,7 +559,13 @@ impl S3 {
567559

568560
// upload_parts.push(part_number as u64 + 1);
569561
}
570-
async_writer.complete().await?;
562+
match async_writer.complete().await {
563+
Ok(_) => {},
564+
Err(err) => {
565+
error!("Failed to complete multipart upload. {:?}", err);
566+
async_writer.abort().await?;
567+
}
568+
};
571569
}
572570
Ok(())
573571
}

0 commit comments

Comments
 (0)