Skip to content

Commit 295a823

Browse files
author
Devdutt Shenoi
committed
refactor: organize imports
1 parent 490c678 commit 295a823

File tree

5 files changed

+114
-114
lines changed

5 files changed

+114
-114
lines changed

src/storage/azure_blob.rs

Lines changed: 35 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -15,39 +15,45 @@
1515
* along with this program. If not, see <http://www.gnu.org/licenses/>.
1616
*
1717
*/
18-
use super::object_storage::parseable_json_path;
19-
use super::{
20-
to_object_store_path, LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider,
21-
PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
18+
19+
use std::{
20+
collections::{BTreeMap, HashSet},
21+
path::Path,
22+
sync::Arc,
23+
time::{Duration, Instant},
2224
};
23-
use async_trait::async_trait;
25+
2426
use bytes::Bytes;
25-
use datafusion::datasource::listing::ListingTableUrl;
26-
use datafusion::datasource::object_store::{
27-
DefaultObjectStoreRegistry, ObjectStoreRegistry, ObjectStoreUrl,
27+
use datafusion::{
28+
datasource::listing::ListingTableUrl,
29+
execution::{
30+
object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry, ObjectStoreUrl},
31+
runtime_env::RuntimeEnvBuilder,
32+
},
33+
};
34+
use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt};
35+
use object_store::{
36+
azure::{MicrosoftAzure, MicrosoftAzureBuilder},
37+
limit::LimitStore,
38+
path::Path as StorePath,
39+
BackoffConfig, ClientOptions, ObjectStore, PutPayload, RetryConfig,
2840
};
29-
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
30-
use futures::stream::FuturesUnordered;
31-
use futures::{StreamExt, TryStreamExt};
32-
use object_store::azure::{MicrosoftAzure, MicrosoftAzureBuilder};
33-
use object_store::{BackoffConfig, ClientOptions, ObjectStore, PutPayload, RetryConfig};
3441
use relative_path::{RelativePath, RelativePathBuf};
35-
use std::path::Path as StdPath;
42+
use tonic::async_trait;
3643
use tracing::{error, info};
3744
use url::Url;
3845

39-
use super::metrics_layer::MetricLayer;
40-
use crate::handlers::http::users::USERS_ROOT_DIR;
41-
use crate::metrics::storage::azureblob::REQUEST_RESPONSE_TIME;
42-
use crate::metrics::storage::StorageMetrics;
43-
use object_store::limit::LimitStore;
44-
use object_store::path::Path as StorePath;
45-
use std::collections::{BTreeMap, HashSet};
46-
use std::sync::Arc;
47-
use std::time::{Duration, Instant};
46+
use crate::{
47+
handlers::http::users::USERS_ROOT_DIR,
48+
metrics::storage::{azureblob::REQUEST_RESPONSE_TIME, StorageMetrics},
49+
};
4850

49-
const CONNECT_TIMEOUT_SECS: u64 = 5;
50-
const REQUEST_TIMEOUT_SECS: u64 = 300;
51+
use super::{
52+
metrics_layer::MetricLayer, object_storage::parseable_json_path, to_object_store_path,
53+
LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider, CONNECT_TIMEOUT_SECS,
54+
PARSEABLE_ROOT_DIRECTORY, REQUEST_TIMEOUT_SECS, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME,
55+
STREAM_ROOT_DIRECTORY,
56+
};
5157

5258
#[derive(Debug, Clone, clap::Args)]
5359
#[command(
@@ -156,7 +162,7 @@ impl ObjectStorageProvider for AzureBlobConfig {
156162
let azure = LimitStore::new(azure, super::MAX_OBJECT_STORE_REQUESTS);
157163
let azure = MetricLayer::new(azure);
158164

159-
let object_store_registry: DefaultObjectStoreRegistry = DefaultObjectStoreRegistry::new();
165+
let object_store_registry = DefaultObjectStoreRegistry::new();
160166
let url = ObjectStoreUrl::parse(format!("https://{}.blob.core.windows.net", self.account))
161167
.unwrap();
162168
object_store_registry.register_store(url.as_ref(), Arc::new(azure));
@@ -338,7 +344,7 @@ impl BlobStore {
338344
}
339345
Ok(result_file_list)
340346
}
341-
async fn _upload_file(&self, key: &str, path: &StdPath) -> Result<(), ObjectStorageError> {
347+
async fn _upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> {
342348
let instant = Instant::now();
343349

344350
// // TODO: Uncomment this when multipart is fixed
@@ -367,7 +373,7 @@ impl BlobStore {
367373
}
368374

369375
// TODO: introduce parallel, multipart-uploads if required
370-
// async fn _upload_multipart(&self, key: &str, path: &StdPath) -> Result<(), ObjectStorageError> {
376+
// async fn _upload_multipart(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> {
371377
// let mut buf = vec![0u8; MULTIPART_UPLOAD_SIZE / 2];
372378
// let mut file = OpenOptions::new().read(true).open(path).await?;
373379

@@ -614,7 +620,7 @@ impl ObjectStorage for BlobStore {
614620
Ok(files)
615621
}
616622

617-
async fn upload_file(&self, key: &str, path: &StdPath) -> Result<(), ObjectStorageError> {
623+
async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> {
618624
self._upload_file(key, path).await?;
619625

620626
Ok(())

src/storage/localfs.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,9 @@ use relative_path::{RelativePath, RelativePathBuf};
3232
use tokio::fs::{self, DirEntry};
3333
use tokio_stream::wrappers::ReadDirStream;
3434

35-
use crate::option::validation;
3635
use crate::{
37-
handlers::http::users::USERS_ROOT_DIR,
38-
metrics::storage::{localfs::REQUEST_RESPONSE_TIME, StorageMetrics},
36+
handlers::http::users::USERS_ROOT_DIR, metrics::storage::{azureblob::REQUEST_RESPONSE_TIME, StorageMetrics},
37+
option::validation,
3938
};
4039

4140
use super::{

src/storage/mod.rs

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,20 @@
1616
*
1717
*/
1818

19+
pub use azure_blob::AzureBlobConfig;
20+
use chrono::Local;
21+
pub use localfs::FSConfig;
22+
pub use object_storage::{ObjectStorage, ObjectStorageProvider};
23+
use object_store::path::Path;
24+
use relative_path::RelativePath;
25+
use retention::Retention;
26+
pub use s3::S3Config;
27+
use serde::{Deserialize, Serialize};
28+
pub use staging::StorageDir;
29+
pub use store_metadata::{
30+
put_remote_metadata, put_staging_metadata, resolve_parseable_metadata, StorageMetadata,
31+
};
32+
1933
use crate::{
2034
catalog::snapshot::Snapshot,
2135
event::format::LogSource,
@@ -24,32 +38,15 @@ use crate::{
2438
utils::json::{deserialize_string_as_true, serialize_bool_as_true},
2539
};
2640

27-
use chrono::Local;
28-
use object_store::path::Path;
29-
use relative_path::RelativePath;
30-
use serde::{Deserialize, Serialize};
31-
32-
use std::fmt::Debug;
33-
3441
mod azure_blob;
3542
mod localfs;
3643
mod metrics_layer;
37-
pub(crate) mod object_storage;
44+
pub mod object_storage;
3845
pub mod retention;
3946
mod s3;
4047
pub mod staging;
4148
mod store_metadata;
4249

43-
use self::retention::Retention;
44-
pub use self::staging::StorageDir;
45-
pub use azure_blob::AzureBlobConfig;
46-
pub use localfs::FSConfig;
47-
pub use object_storage::{ObjectStorage, ObjectStorageProvider};
48-
pub use s3::S3Config;
49-
pub use store_metadata::{
50-
put_remote_metadata, put_staging_metadata, resolve_parseable_metadata, StorageMetadata,
51-
};
52-
5350
/// Name of a Stream
5451
/// NOTE: this used to be a struct, flattened out for simplicity
5552
pub type LogStream = String;
@@ -84,6 +81,9 @@ const ACCESS_ALL: &str = "all";
8481
pub const CURRENT_OBJECT_STORE_VERSION: &str = "v5";
8582
pub const CURRENT_SCHEMA_VERSION: &str = "v5";
8683

84+
const CONNECT_TIMEOUT_SECS: u64 = 5;
85+
const REQUEST_TIMEOUT_SECS: u64 = 300;
86+
8787
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
8888
pub struct ObjectStoreFormat {
8989
/// Version of schema registry

src/storage/object_storage.rs

Lines changed: 28 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -16,30 +16,13 @@
1616
*
1717
*/
1818

19-
use super::{
20-
retention::Retention, staging::convert_disk_files_to_parquet, ObjectStorageError,
21-
ObjectStoreFormat, Permisssion, StorageDir, StorageMetadata,
22-
};
23-
use super::{
24-
LogStream, Owner, StreamType, ALERTS_ROOT_DIRECTORY, MANIFEST_FILE,
25-
PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME,
26-
STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
27-
};
28-
29-
use crate::alerts::AlertConfig;
30-
use crate::event::format::LogSource;
31-
use crate::handlers::http::modal::ingest_server::INGESTOR_META;
32-
use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR};
33-
use crate::metadata::SchemaVersion;
34-
use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE};
35-
use crate::option::Mode;
36-
use crate::{
37-
catalog::{self, manifest::Manifest, snapshot::Snapshot},
38-
metadata::STREAM_INFO,
39-
metrics::{storage::StorageMetrics, STORAGE_SIZE},
40-
option::CONFIG,
41-
stats::FullStats,
42-
};
19+
use std::collections::{BTreeMap, HashMap, HashSet};
20+
use std::fmt::Debug;
21+
use std::fs::remove_file;
22+
use std::num::NonZeroU32;
23+
use std::path::Path;
24+
use std::sync::Arc;
25+
use std::time::{Duration, Instant};
4326

4427
use actix_web_prometheus::PrometheusMetrics;
4528
use arrow_schema::Schema;
@@ -53,15 +36,26 @@ use relative_path::RelativePathBuf;
5336
use tracing::{error, warn};
5437
use ulid::Ulid;
5538

56-
use std::collections::{BTreeMap, HashSet};
57-
use std::fmt::Debug;
58-
use std::num::NonZeroU32;
59-
use std::{
60-
collections::HashMap,
61-
fs,
62-
path::Path,
63-
sync::Arc,
64-
time::{Duration, Instant},
39+
use crate::alerts::AlertConfig;
40+
use crate::catalog;
41+
use crate::catalog::manifest::Manifest;
42+
use crate::catalog::snapshot::Snapshot;
43+
use crate::event::format::LogSource;
44+
use crate::handlers::http::modal::ingest_server::INGESTOR_META;
45+
use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR};
46+
use crate::metadata::{SchemaVersion, STREAM_INFO};
47+
use crate::metrics::storage::StorageMetrics;
48+
use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE, STORAGE_SIZE};
49+
use crate::option::{Mode, CONFIG};
50+
use crate::stats::FullStats;
51+
52+
use super::retention::Retention;
53+
use super::staging::{convert_disk_files_to_parquet, StorageDir};
54+
use super::{
55+
LogStream, ObjectStorageError, ObjectStoreFormat, Owner, Permisssion,
56+
StorageMetadata, StreamType, ALERTS_ROOT_DIRECTORY, MANIFEST_FILE,
57+
PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME,
58+
STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
6559
};
6660

6761
pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug + Send + Sync {
@@ -729,7 +723,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
729723
catalog::create_from_parquet_file(absolute_path.clone(), &file).unwrap();
730724
catalog::update_snapshot(store, stream, manifest).await?;
731725

732-
let _ = fs::remove_file(file);
726+
let _ = remove_file(file);
733727
}
734728
}
735729

src/storage/s3.rs

Lines changed: 31 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -16,44 +16,45 @@
1616
*
1717
*/
1818

19+
use std::{
20+
collections::{BTreeMap, HashSet},
21+
fmt::Display,
22+
path::Path,
23+
str::FromStr,
24+
sync::Arc,
25+
time::{Duration, Instant},
26+
};
27+
1928
use async_trait::async_trait;
2029
use bytes::Bytes;
21-
use datafusion::datasource::listing::ListingTableUrl;
22-
use datafusion::datasource::object_store::{
23-
DefaultObjectStoreRegistry, ObjectStoreRegistry, ObjectStoreUrl,
30+
use datafusion::{
31+
datasource::listing::ListingTableUrl,
32+
execution::{
33+
object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry, ObjectStoreUrl},
34+
runtime_env::RuntimeEnvBuilder,
35+
},
36+
};
37+
use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt};
38+
use object_store::{
39+
aws::{AmazonS3, AmazonS3Builder, AmazonS3ConfigKey, Checksum},
40+
limit::LimitStore,
41+
path::Path as StorePath,
42+
BackoffConfig, ClientOptions, ObjectStore, PutPayload, RetryConfig,
2443
};
25-
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
26-
use futures::stream::FuturesUnordered;
27-
use futures::{StreamExt, TryStreamExt};
28-
use object_store::aws::{AmazonS3, AmazonS3Builder, AmazonS3ConfigKey, Checksum};
29-
use object_store::limit::LimitStore;
30-
use object_store::path::Path as StorePath;
31-
use object_store::{BackoffConfig, ClientOptions, ObjectStore, PutPayload, RetryConfig};
3244
use relative_path::{RelativePath, RelativePathBuf};
3345
use tracing::{error, info};
3446

35-
use std::collections::{BTreeMap, HashSet};
36-
use std::fmt::Display;
37-
use std::iter::Iterator;
38-
use std::path::Path as StdPath;
39-
use std::str::FromStr;
40-
use std::sync::Arc;
41-
use std::time::{Duration, Instant};
47+
use crate::{
48+
handlers::http::users::USERS_ROOT_DIR,
49+
metrics::storage::{azureblob::REQUEST_RESPONSE_TIME, StorageMetrics},
50+
};
4251

43-
use super::metrics_layer::MetricLayer;
44-
use super::object_storage::parseable_json_path;
4552
use super::{
46-
to_object_store_path, LogStream, ObjectStorageProvider, SCHEMA_FILE_NAME,
47-
STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
53+
metrics_layer::MetricLayer, object_storage::parseable_json_path, to_object_store_path, LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider, CONNECT_TIMEOUT_SECS, PARSEABLE_ROOT_DIRECTORY, REQUEST_TIMEOUT_SECS, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY
4854
};
49-
use crate::handlers::http::users::USERS_ROOT_DIR;
50-
use crate::metrics::storage::{s3::REQUEST_RESPONSE_TIME, StorageMetrics};
51-
use crate::storage::{ObjectStorage, ObjectStorageError, PARSEABLE_ROOT_DIRECTORY};
5255

5356
// in bytes
5457
// const MULTIPART_UPLOAD_SIZE: usize = 1024 * 1024 * 100;
55-
const CONNECT_TIMEOUT_SECS: u64 = 5;
56-
const REQUEST_TIMEOUT_SECS: u64 = 300;
5758
const AWS_CONTAINER_CREDENTIALS_RELATIVE_URI: &str = "AWS_CONTAINER_CREDENTIALS_RELATIVE_URI";
5859

5960
#[derive(Debug, Clone, clap::Args)]
@@ -291,7 +292,7 @@ impl ObjectStorageProvider for S3Config {
291292
let s3 = LimitStore::new(s3, super::MAX_OBJECT_STORE_REQUESTS);
292293
let s3 = MetricLayer::new(s3);
293294

294-
let object_store_registry: DefaultObjectStoreRegistry = DefaultObjectStoreRegistry::new();
295+
let object_store_registry = DefaultObjectStoreRegistry::new();
295296
let url = ObjectStoreUrl::parse(format!("s3://{}", &self.bucket_name)).unwrap();
296297
object_store_registry.register_store(url.as_ref(), Arc::new(s3));
297298

@@ -474,7 +475,7 @@ impl S3 {
474475
}
475476
Ok(result_file_list)
476477
}
477-
async fn _upload_file(&self, key: &str, path: &StdPath) -> Result<(), ObjectStorageError> {
478+
async fn _upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> {
478479
let instant = Instant::now();
479480

480481
// // TODO: Uncomment this when multipart is fixed
@@ -503,7 +504,7 @@ impl S3 {
503504
}
504505

505506
// TODO: introduce parallel, multipart-uploads if required
506-
// async fn _upload_multipart(&self, key: &str, path: &StdPath) -> Result<(), ObjectStorageError> {
507+
// async fn _upload_multipart(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> {
507508
// let mut buf = vec![0u8; MULTIPART_UPLOAD_SIZE / 2];
508509
// let mut file = OpenOptions::new().read(true).open(path).await?;
509510

@@ -750,7 +751,7 @@ impl ObjectStorage for S3 {
750751
Ok(files)
751752
}
752753

753-
async fn upload_file(&self, key: &str, path: &StdPath) -> Result<(), ObjectStorageError> {
754+
async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> {
754755
self._upload_file(key, path).await?;
755756

756757
Ok(())

0 commit comments

Comments
 (0)