Skip to content

Commit 1531215

Browse files
committed
added multipart uploads for other object storage sources
1 parent 7c97750 commit 1531215

File tree

4 files changed

+79
-13
lines changed

4 files changed

+79
-13
lines changed

src/storage/azure_blob.rs

Lines changed: 65 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use object_store::{
4141
BackoffConfig, ClientOptions, ObjectMeta, ObjectStore, PutPayload, RetryConfig,
4242
};
4343
use relative_path::{RelativePath, RelativePathBuf};
44+
use tokio::{fs::OpenOptions, io::AsyncReadExt};
4445
use tracing::{error, info};
4546
use url::Url;
4647

@@ -53,8 +54,8 @@ use crate::{
5354
use super::{
5455
metrics_layer::MetricLayer, object_storage::parseable_json_path, to_object_store_path,
5556
ObjectStorage, ObjectStorageError, ObjectStorageProvider, CONNECT_TIMEOUT_SECS,
56-
PARSEABLE_ROOT_DIRECTORY, REQUEST_TIMEOUT_SECS, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME,
57-
STREAM_ROOT_DIRECTORY,
57+
MIN_MULTIPART_UPLOAD_SIZE, PARSEABLE_ROOT_DIRECTORY, REQUEST_TIMEOUT_SECS, SCHEMA_FILE_NAME,
58+
STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
5859
};
5960

6061
#[derive(Debug, Clone, clap::Args)]
@@ -378,6 +379,65 @@ impl BlobStore {
378379
res
379380
}
380381

382+
async fn _upload_multipart(
383+
&self,
384+
key: &RelativePath,
385+
path: &Path,
386+
) -> Result<(), ObjectStorageError> {
387+
let mut file = OpenOptions::new().read(true).open(path).await?;
388+
let location = &to_object_store_path(key);
389+
390+
let mut async_writer = self.client.put_multipart(location).await?;
391+
392+
let meta = file.metadata().await?;
393+
let total_size = meta.len() as usize;
394+
if total_size < MIN_MULTIPART_UPLOAD_SIZE {
395+
let mut data = Vec::new();
396+
file.read_to_end(&mut data).await?;
397+
self.client.put(location, data.into()).await?;
398+
// async_writer.put_part(data.into()).await?;
399+
// async_writer.complete().await?;
400+
return Ok(());
401+
} else {
402+
let mut data = Vec::new();
403+
file.read_to_end(&mut data).await?;
404+
405+
// let mut upload_parts = Vec::new();
406+
407+
let has_final_partial_part = total_size % MIN_MULTIPART_UPLOAD_SIZE > 0;
408+
let num_full_parts = total_size / MIN_MULTIPART_UPLOAD_SIZE;
409+
let total_parts = num_full_parts + if has_final_partial_part { 1 } else { 0 };
410+
411+
// Upload each part
412+
for part_number in 0..(total_parts) {
413+
let start_pos = part_number * MIN_MULTIPART_UPLOAD_SIZE;
414+
let end_pos = if part_number == num_full_parts && has_final_partial_part {
415+
// Last part might be smaller than 5MB (which is allowed)
416+
total_size
417+
} else {
418+
// All other parts must be at least 5MB
419+
start_pos + MIN_MULTIPART_UPLOAD_SIZE
420+
};
421+
422+
// Extract this part's data
423+
let part_data = data[start_pos..end_pos].to_vec();
424+
425+
// Upload the part
426+
async_writer.put_part(part_data.into()).await?;
427+
428+
// upload_parts.push(part_number as u64 + 1);
429+
}
430+
match async_writer.complete().await {
431+
Ok(_) => {}
432+
Err(err) => {
433+
error!("Failed to complete multipart upload. {:?}", err);
434+
async_writer.abort().await?;
435+
}
436+
};
437+
}
438+
Ok(())
439+
}
440+
381441
// TODO: introduce parallel, multipart-uploads if required
382442
// async fn _upload_multipart(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> {
383443
// let mut buf = vec![0u8; MULTIPART_UPLOAD_SIZE / 2];
@@ -426,10 +486,10 @@ impl BlobStore {
426486
impl ObjectStorage for BlobStore {
427487
async fn upload_multipart(
428488
&self,
429-
_key: &RelativePath,
430-
_path: &Path,
489+
key: &RelativePath,
490+
path: &Path,
431491
) -> Result<(), ObjectStorageError> {
432-
unimplemented!()
492+
self._upload_multipart(key, path).await
433493
}
434494
async fn get_buffered_reader(
435495
&self,

src/storage/localfs.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,10 @@ use fs_extra::file::CopyOptions;
3030
use futures::{stream::FuturesUnordered, TryStreamExt};
3131
use object_store::{buffered::BufReader, ObjectMeta};
3232
use relative_path::{RelativePath, RelativePathBuf};
33-
use tokio::fs::{self, DirEntry};
33+
use tokio::{
34+
fs::{self, DirEntry, OpenOptions},
35+
io::AsyncReadExt,
36+
};
3437
use tokio_stream::wrappers::ReadDirStream;
3538

3639
use crate::{
@@ -106,10 +109,13 @@ impl LocalFS {
106109
impl ObjectStorage for LocalFS {
107110
async fn upload_multipart(
108111
&self,
109-
_key: &RelativePath,
110-
_path: &Path,
112+
key: &RelativePath,
113+
path: &Path,
111114
) -> Result<(), ObjectStorageError> {
112-
unimplemented!()
115+
let mut file = OpenOptions::new().read(true).open(path).await?;
116+
let mut data = Vec::new();
117+
file.read_to_end(&mut data).await?;
118+
self.put_object(key, data.into()).await
113119
}
114120
async fn get_buffered_reader(
115121
&self,

src/storage/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ pub const CURRENT_SCHEMA_VERSION: &str = "v6";
7777
const CONNECT_TIMEOUT_SECS: u64 = 5;
7878
const REQUEST_TIMEOUT_SECS: u64 = 300;
7979

80+
/// Size in bytes (25 MiB) used by the multipart upload helpers: the Azure
/// helper sends files smaller than this with a single `put` and splits larger
/// files into parts of this size.
pub const MIN_MULTIPART_UPLOAD_SIZE: usize = 25 * 1024 * 1024;
8081
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
8182
pub struct ObjectStoreFormat {
8283
/// Version of schema registry

src/storage/s3.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,14 +55,13 @@ use crate::{
5555
use super::{
5656
metrics_layer::MetricLayer, object_storage::parseable_json_path, to_object_store_path,
5757
ObjectStorage, ObjectStorageError, ObjectStorageProvider, CONNECT_TIMEOUT_SECS,
58-
PARSEABLE_ROOT_DIRECTORY, REQUEST_TIMEOUT_SECS, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME,
59-
STREAM_ROOT_DIRECTORY,
58+
MIN_MULTIPART_UPLOAD_SIZE, PARSEABLE_ROOT_DIRECTORY, REQUEST_TIMEOUT_SECS, SCHEMA_FILE_NAME,
59+
STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
6060
};
6161

6262
// in bytes
6363
// const MULTIPART_UPLOAD_SIZE: usize = 1024 * 1024 * 100;
6464
const AWS_CONTAINER_CREDENTIALS_RELATIVE_URI: &str = "AWS_CONTAINER_CREDENTIALS_RELATIVE_URI";
65-
const MIN_MULTIPART_UPLOAD_SIZE: usize = 25 * 1024 * 1024;
6665

6766
#[derive(Debug, Clone, clap::Args)]
6867
#[command(
@@ -560,7 +559,7 @@ impl S3 {
560559
// upload_parts.push(part_number as u64 + 1);
561560
}
562561
match async_writer.complete().await {
563-
Ok(_) => {},
562+
Ok(_) => {}
564563
Err(err) => {
565564
error!("Failed to complete multipart upload. {:?}", err);
566565
async_writer.abort().await?;

0 commit comments

Comments
 (0)