@@ -521,53 +521,50 @@ impl S3 {
521
521
522
522
let mut async_writer = self . client . put_multipart ( location) . await ?;
523
523
524
- // /* `abort_multipart()` has been removed */
525
- // let close_multipart = |err| async move {
526
- // error!("multipart upload failed. {:?}", err);
527
- // self.client
528
- // .abort_multipart(&key.into(), &multipart_id)
529
- // .await
530
- // };
531
-
532
524
let meta = file. metadata ( ) . await ?;
533
525
let total_size = meta. len ( ) as usize ;
534
526
if total_size < MIN_MULTIPART_UPLOAD_SIZE {
535
527
let mut data = Vec :: new ( ) ;
536
528
file. read_to_end ( & mut data) . await ?;
537
529
self . client . put ( location, data. into ( ) ) . await ?;
538
- // async_writer.put_part(data.into()).await?;
539
- // async_writer.complete().await?;
530
+
540
531
return Ok ( ( ) ) ;
541
532
} else {
542
- let mut data = Vec :: new ( ) ;
543
- file. read_to_end ( & mut data) . await ?;
533
+ let mut buf = [ 0 ; MIN_MULTIPART_UPLOAD_SIZE * 2 ] ;
544
534
545
- // let mut upload_parts = Vec::new();
535
+ let mut position = 0 ;
536
+ let mut tasks = FuturesUnordered :: new ( ) ;
546
537
547
- let has_final_partial_part = total_size % MIN_MULTIPART_UPLOAD_SIZE > 0 ;
548
- let num_full_parts = total_size / MIN_MULTIPART_UPLOAD_SIZE ;
549
- let total_parts = num_full_parts + if has_final_partial_part { 1 } else { 0 } ;
538
+ while position < total_size {
539
+ // Calculate how much to read in this iteration
540
+ let bytes_to_read = std:: cmp:: min ( buf. len ( ) , total_size - position) ;
541
+ let buffer_slice = & mut buf[ ..bytes_to_read] ;
550
542
551
- // Upload each part
552
- for part_number in 0 ..( total_parts) {
553
- let start_pos = part_number * MIN_MULTIPART_UPLOAD_SIZE ;
554
- let end_pos = if part_number == num_full_parts && has_final_partial_part {
555
- // Last part might be smaller than 5MB (which is allowed)
556
- total_size
557
- } else {
558
- // All other parts must be at least 5MB
559
- start_pos + MIN_MULTIPART_UPLOAD_SIZE
560
- } ;
543
+ // Read exactly that many bytes
544
+ match file. read_exact ( buffer_slice) . await {
545
+ Ok ( _) => { }
546
+ Err ( e) => return Err ( e. into ( ) ) ,
547
+ }
561
548
562
- // Extract this part's data
563
- let part_data = data[ start_pos..end_pos] . to_vec ( ) ;
549
+ position += bytes_to_read;
564
550
565
551
// Upload the part
566
- async_writer. put_part ( part_data. into ( ) ) . await ?;
552
+ tasks. push ( tokio:: spawn (
553
+ async_writer. put_part ( buffer_slice. to_vec ( ) . into ( ) ) ,
554
+ ) ) ;
555
+ }
567
556
568
- // upload_parts.push(part_number as u64 + 1);
557
+ while let Some ( res) = tasks. next ( ) . await {
558
+ res??;
569
559
}
570
- async_writer. complete ( ) . await ?;
560
+
561
+ match async_writer. complete ( ) . await {
562
+ Ok ( _) => { }
563
+ Err ( e) => {
564
+ error ! ( "Failed to complete multipart upload: {:?}" , e) ;
565
+ async_writer. abort ( ) . await ?;
566
+ }
567
+ } ;
571
568
}
572
569
Ok ( ( ) )
573
570
}
0 commit comments