From cc74f21dbe6a105fcf584052b2bf02bdc6a8c22f Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Sat, 1 Feb 2025 13:37:08 +0530
Subject: [PATCH 01/11] refactor: DRY `to_object_store_path`

---
 src/storage/azure_blob.rs | 7 +------
 src/storage/mod.rs        | 6 ++++++
 src/storage/s3.rs         | 7 +------
 3 files changed, 8 insertions(+), 12 deletions(-)

diff --git a/src/storage/azure_blob.rs b/src/storage/azure_blob.rs
index b70f66bf3..66d8712b0 100644
--- a/src/storage/azure_blob.rs
+++ b/src/storage/azure_blob.rs
@@ -17,8 +17,7 @@
  */
 use super::object_storage::parseable_json_path;
 use super::{
-    LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY,
-    SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
+    to_object_store_path, LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY
 };
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -185,10 +184,6 @@ impl ObjectStorageProvider for AzureBlobConfig {
     }
 }
 
-pub fn to_object_store_path(path: &RelativePath) -> StorePath {
-    StorePath::from(path.as_str())
-}
-
 // ObjStoreClient is generic client to enable interactions with different cloudprovider's
 // object store such as S3 and Azure Blob
 #[derive(Debug)]
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index b33cc7b7b..b2bc71249 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -25,6 +25,8 @@ use crate::{
 };
 
 use chrono::Local;
+use object_store::path::Path;
+use relative_path::RelativePath;
 use serde::{Deserialize, Serialize};
 
 use std::fmt::Debug;
@@ -260,3 +262,7 @@ pub enum ObjectStorageError {
     #[error("Error: {0}")]
     MetadataError(#[from] MetadataError),
 }
+
+pub fn to_object_store_path(path: &RelativePath) -> Path {
+    Path::from(path.as_str())
+}
diff --git a/src/storage/s3.rs b/src/storage/s3.rs
index 96bfb0d30..7578bc2d7 100644
--- a/src/storage/s3.rs
+++ b/src/storage/s3.rs
@@ -43,8 +43,7 @@ use std::time::{Duration, Instant};
 use super::metrics_layer::MetricLayer;
 use super::object_storage::parseable_json_path;
 use super::{
-    LogStream, ObjectStorageProvider, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME,
-    STREAM_ROOT_DIRECTORY,
+    to_object_store_path, LogStream, ObjectStorageProvider, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY
 };
 use crate::handlers::http::users::USERS_ROOT_DIR;
 use crate::metrics::storage::{s3::REQUEST_RESPONSE_TIME, StorageMetrics};
@@ -321,10 +320,6 @@ impl ObjectStorageProvider for S3Config {
     }
 }
 
-fn to_object_store_path(path: &RelativePath) -> StorePath {
-    StorePath::from(path.as_str())
-}
-
 #[derive(Debug)]
 pub struct S3 {
     client: LimitStore<AmazonS3>,

From 3e5e8e4e99e77a587831e075a7af7a37018e3ba4 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Sat, 1 Feb 2025 15:15:02 +0530
Subject: [PATCH 02/11] list_dirs_relative

---
 src/storage/azure_blob.rs     | 12 ++++++++++++
 src/storage/localfs.rs        | 19 +++++++++++++++++++
 src/storage/object_storage.rs |  1 +
 src/storage/s3.rs             | 12 ++++++++++++
 4 files changed, 44 insertions(+)

diff --git a/src/storage/azure_blob.rs b/src/storage/azure_blob.rs
index 66d8712b0..6ee02b6c3 100644
--- a/src/storage/azure_blob.rs
+++ b/src/storage/azure_blob.rs
@@ -653,6 +653,18 @@ impl ObjectStorage for BlobStore {
             .collect::<Vec<_>>())
     }
 
+    async fn list_dirs_relative(&self, relative_path: &RelativePath) -> Result<Vec<String>, ObjectStorageError> {
+        let prefix = object_store::path::Path::from(relative_path.as_str());
+        let resp = self.client.list_with_delimiter(Some(&prefix)).await?;
+
+        Ok(resp
+            .common_prefixes
+            .iter()
+            .flat_map(|path| path.parts())
+            .map(|name| name.as_ref().to_string())
+            .collect::<Vec<_>>())
+    }
+
     async fn get_all_dashboards(
         &self,
     ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
diff --git a/src/storage/localfs.rs b/src/storage/localfs.rs
index f9ac41a05..5cf6612b0 100644
--- a/src/storage/localfs.rs
+++ b/src/storage/localfs.rs
@@ -353,6 +353,25 @@ impl ObjectStorage for LocalFS {
 
         Ok(dirs)
     }
+
+    async fn list_dirs_relative(&self, relative_path: &RelativePath) -> Result<Vec<String>, ObjectStorageError> {
+        let root = self.root.join(relative_path.as_str());
+        let dirs = ReadDirStream::new(fs::read_dir(root).await?)
+            .try_collect::<Vec<DirEntry>>()
+            .await?
+            .into_iter()
+            .map(dir_name);
+
+        let dirs = FuturesUnordered::from_iter(dirs)
+            .try_collect::<Vec<_>>()
+            .await?
+            .into_iter()
+            .flatten()
+            .collect::<Vec<_>>();
+
+        Ok(dirs)
+    }
+
     async fn get_all_dashboards(
         &self,
     ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs
index 4dc8603df..ac9a19a9f 100644
--- a/src/storage/object_storage.rs
+++ b/src/storage/object_storage.rs
@@ -94,6 +94,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
     async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError>;
     async fn list_old_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError>;
     async fn list_dirs(&self) -> Result<Vec<String>, ObjectStorageError>;
+    async fn list_dirs_relative(&self, relative_path: &RelativePath) -> Result<Vec<String>, ObjectStorageError>;
     async fn get_all_saved_filters(
         &self,
     ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError>;
diff --git a/src/storage/s3.rs b/src/storage/s3.rs
index 7578bc2d7..d9cfc3a9f 100644
--- a/src/storage/s3.rs
+++ b/src/storage/s3.rs
@@ -786,6 +786,18 @@ impl ObjectStorage for S3 {
             .collect::<Vec<_>>())
     }
 
+    async fn list_dirs_relative(&self, relative_path: &RelativePath) -> Result<Vec<String>, ObjectStorageError> {
+        let prefix = object_store::path::Path::from(relative_path.as_str());
+        let resp = self.client.list_with_delimiter(Some(&prefix)).await?;
+
+        Ok(resp
+            .common_prefixes
+            .iter()
+            .flat_map(|path| path.parts())
+            .map(|name| name.as_ref().to_string())
+            .collect::<Vec<_>>())
+    }
+
     async fn get_all_dashboards(
         &self,
     ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {

From d08cd828074b1b41107c1b8c7feab86d4185246a Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Sat, 1 Feb 2025 15:19:02 +0530
Subject: [PATCH 03/11] refactor: DRY `get_all_saved_filters`

---
 src/storage/azure_blob.rs     | 52 +++--------------------------------
 src/storage/localfs.rs        | 49 ++++-----------------------------
 src/storage/object_storage.rs | 30 ++++++++++++++++++--
 src/storage/s3.rs             | 52 +++--------------------------------
 4 files changed, 41 insertions(+), 142 deletions(-)

diff --git a/src/storage/azure_blob.rs b/src/storage/azure_blob.rs
index 6ee02b6c3..ea6bc9995 100644
--- a/src/storage/azure_blob.rs
+++ b/src/storage/azure_blob.rs
@@ -653,7 +653,10 @@ impl ObjectStorage for BlobStore {
             .collect::<Vec<_>>())
     }
 
-    async fn list_dirs_relative(&self, relative_path: &RelativePath) -> Result<Vec<String>, ObjectStorageError> {
+    async fn list_dirs_relative(
+        &self,
+        relative_path: &RelativePath,
+    ) -> Result<Vec<String>, ObjectStorageError> {
         let prefix = object_store::path::Path::from(relative_path.as_str());
         let resp = self.client.list_with_delimiter(Some(&prefix)).await?;
 
@@ -701,53 +704,6 @@ impl ObjectStorage for BlobStore {
         Ok(dashboards)
     }
 
-    async fn get_all_saved_filters(
-        &self,
-    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
-        let mut filters: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
-        let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR);
-        let resp = self
-            .client
-            .list_with_delimiter(Some(&users_root_path))
-            .await?;
-
-        let users = resp
-            .common_prefixes
-            .iter()
-            .flat_map(|path| path.parts())
-            .filter(|name| name.as_ref() != USERS_ROOT_DIR)
-            .map(|name| name.as_ref().to_string())
-            .collect::<Vec<_>>();
-        for user in users {
-            let user_filters_path =
-                object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/filters",));
-            let resp = self
-                .client
-                .list_with_delimiter(Some(&user_filters_path))
-                .await?;
-            let streams = resp
-                .common_prefixes
-                .iter()
-                .filter(|name| name.as_ref() != USERS_ROOT_DIR)
-                .map(|name| name.as_ref().to_string())
-                .collect::<Vec<_>>();
-            for stream in streams {
-                let filters_path = RelativePathBuf::from(&stream);
-                let filter_bytes = self
-                    .get_objects(
-                        Some(&filters_path),
-                        Box::new(|file_name| file_name.ends_with(".json")),
-                    )
-                    .await?;
-                filters
-                    .entry(filters_path)
-                    .or_default()
-                    .extend(filter_bytes);
-            }
-        }
-        Ok(filters)
-    }
-
     ///fetch all correlations uploaded in object store
     /// return the correlation file path and all correlation json bytes for each file path
     async fn get_all_correlations(
diff --git a/src/storage/localfs.rs b/src/storage/localfs.rs
index 5cf6612b0..5ea8e676d 100644
--- a/src/storage/localfs.rs
+++ b/src/storage/localfs.rs
@@ -353,8 +353,10 @@ impl ObjectStorage for LocalFS {
 
         Ok(dirs)
     }
-
-    async fn list_dirs_relative(&self, relative_path: &RelativePath) -> Result<Vec<String>, ObjectStorageError> {
+    async fn list_dirs_relative(
+        &self,
+        relative_path: &RelativePath,
+    ) -> Result<Vec<String>, ObjectStorageError> {
         let root = self.root.join(relative_path.as_str());
         let dirs = ReadDirStream::new(fs::read_dir(root).await?)
             .try_collect::<Vec<DirEntry>>()
@@ -368,7 +370,7 @@ impl ObjectStorage for LocalFS {
             .into_iter()
             .flatten()
             .collect::<Vec<_>>();
-
+
         Ok(dirs)
     }
 
@@ -402,47 +404,6 @@ impl ObjectStorage for LocalFS {
         Ok(dashboards)
     }
 
-    async fn get_all_saved_filters(
-        &self,
-    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
-        let mut filters: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
-        let users_root_path = self.root.join(USERS_ROOT_DIR);
-        let directories = ReadDirStream::new(fs::read_dir(&users_root_path).await?);
-        let users: Vec<DirEntry> = directories.try_collect().await?;
-        for user in users {
-            if !user.path().is_dir() {
-                continue;
-            }
-            let stream_root_path = users_root_path.join(user.path()).join("filters");
-            let directories = ReadDirStream::new(fs::read_dir(&stream_root_path).await?);
-            let streams: Vec<DirEntry> = directories.try_collect().await?;
-            for stream in streams {
-                if !stream.path().is_dir() {
-                    continue;
-                }
-                let filters_path = users_root_path
-                    .join(user.path())
-                    .join("filters")
-                    .join(stream.path());
-                let directories = ReadDirStream::new(fs::read_dir(&filters_path).await?);
-                let filters_files: Vec<DirEntry> = directories.try_collect().await?;
-                for filter in filters_files {
-                    let filter_absolute_path = filter.path();
-                    let file = fs::read(filter_absolute_path.clone()).await?;
-                    let filter_relative_path = filter_absolute_path
-                        .strip_prefix(self.root.as_path())
-                        .unwrap();
-
-                    filters
-                        .entry(RelativePathBuf::from_path(filter_relative_path).unwrap())
-                        .or_default()
-                        .push(file.into());
-                }
-            }
-        }
-        Ok(filters)
-    }
-
     ///fetch all correlations stored in disk
     /// return the correlation file path and all correlation json bytes for each file path
     async fn get_all_correlations(
diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs
index ac9a19a9f..26e5be310 100644
--- a/src/storage/object_storage.rs
+++ b/src/storage/object_storage.rs
@@ -94,10 +94,36 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
     async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError>;
     async fn list_old_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError>;
     async fn list_dirs(&self) -> Result<Vec<String>, ObjectStorageError>;
-    async fn list_dirs_relative(&self, relative_path: &RelativePath) -> Result<Vec<String>, ObjectStorageError>;
+    async fn list_dirs_relative(
+        &self,
+        relative_path: &RelativePath,
+    ) -> Result<Vec<String>, ObjectStorageError>;
+
     async fn get_all_saved_filters(
         &self,
-    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError>;
+    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
+        let mut filters: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
+
+        let users_dir = RelativePathBuf::from_iter([USERS_ROOT_DIR]);
+        for user in self.list_dirs_relative(&users_dir).await? {
+            let stream_dir = RelativePathBuf::from_iter([USERS_ROOT_DIR, &user]);
+            for stream in self.list_dirs_relative(&stream_dir).await? {
+                let filters_path = RelativePathBuf::from(&stream);
+                let filter_bytes = self
+                    .get_objects(
+                        Some(&filters_path),
+                        Box::new(|file_name| file_name.ends_with(".json")),
+                    )
+                    .await?;
+                filters
+                    .entry(filters_path)
+                    .or_default()
+                    .extend(filter_bytes);
+            }
+        }
+        Ok(filters)
+    }
+
     async fn get_all_dashboards(
         &self,
     ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError>;
diff --git a/src/storage/s3.rs b/src/storage/s3.rs
index d9cfc3a9f..8dfc29bc4 100644
--- a/src/storage/s3.rs
+++ b/src/storage/s3.rs
@@ -786,7 +786,10 @@ impl ObjectStorage for S3 {
             .collect::<Vec<_>>())
     }
 
-    async fn list_dirs_relative(&self, relative_path: &RelativePath) -> Result<Vec<String>, ObjectStorageError> {
+    async fn list_dirs_relative(
+        &self,
+        relative_path: &RelativePath,
+    ) -> Result<Vec<String>, ObjectStorageError> {
         let prefix = object_store::path::Path::from(relative_path.as_str());
         let resp = self.client.list_with_delimiter(Some(&prefix)).await?;
 
@@ -834,53 +837,6 @@ impl ObjectStorage for S3 {
         Ok(dashboards)
     }
 
-    async fn get_all_saved_filters(
-        &self,
-    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
-        let mut filters: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
-        let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR);
-        let resp = self
-            .client
-            .list_with_delimiter(Some(&users_root_path))
-            .await?;
-
-        let users = resp
-            .common_prefixes
-            .iter()
-            .flat_map(|path| path.parts())
-            .filter(|name| name.as_ref() != USERS_ROOT_DIR)
-            .map(|name| name.as_ref().to_string())
-            .collect::<Vec<_>>();
-        for user in users {
-            let user_filters_path =
-                object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/filters",));
-            let resp = self
-                .client
-                .list_with_delimiter(Some(&user_filters_path))
-                .await?;
-            let streams = resp
-                .common_prefixes
-                .iter()
-                .filter(|name| name.as_ref() != USERS_ROOT_DIR)
-                .map(|name| name.as_ref().to_string())
-                .collect::<Vec<_>>();
-            for stream in streams {
-                let filters_path = RelativePathBuf::from(&stream);
-                let filter_bytes = self
-                    .get_objects(
-                        Some(&filters_path),
-                        Box::new(|file_name| file_name.ends_with(".json")),
-                    )
-                    .await?;
-                filters
-                    .entry(filters_path)
-                    .or_default()
-                    .extend(filter_bytes);
-            }
-        }
-        Ok(filters)
-    }
-
     ///fetch all correlations stored in object store
     /// return the correlation file path and all correlation json bytes for each file path
     async fn get_all_correlations(

From 8e9e78953e4c39140bf8df5a3a42b859101c734b Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Sat, 1 Feb 2025 15:21:40 +0530
Subject: [PATCH 04/11] refactor: DRY `get_all_dashboards`

---
 src/storage/azure_blob.rs     | 36 -----------------------------
 src/storage/localfs.rs        | 30 -------------------------
 src/storage/object_storage.rs | 24 ++++++++++++++++++++-
 src/storage/s3.rs             | 36 -----------------------------
 4 files changed, 23 insertions(+), 103 deletions(-)

diff --git a/src/storage/azure_blob.rs b/src/storage/azure_blob.rs
index ea6bc9995..378adfd30 100644
--- a/src/storage/azure_blob.rs
+++ b/src/storage/azure_blob.rs
@@ -668,42 +668,6 @@ impl ObjectStorage for BlobStore {
             .collect::<Vec<_>>())
     }
 
-    async fn get_all_dashboards(
-        &self,
-    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
-        let mut dashboards: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
-        let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR);
-        let resp = self
-            .client
-            .list_with_delimiter(Some(&users_root_path))
-            .await?;
-
-        let users = resp
-            .common_prefixes
-            .iter()
-            .flat_map(|path| path.parts())
-            .filter(|name| name.as_ref() != USERS_ROOT_DIR)
-            .map(|name| name.as_ref().to_string())
-            .collect::<Vec<_>>();
-        for user in users {
-            let user_dashboard_path =
-                object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/dashboards"));
-            let dashboards_path = RelativePathBuf::from(&user_dashboard_path);
-            let dashboard_bytes = self
-                .get_objects(
-                    Some(&dashboards_path),
-                    Box::new(|file_name| file_name.ends_with(".json")),
-                )
-                .await?;
-
-            dashboards
-                .entry(dashboards_path)
-                .or_default()
-                .extend(dashboard_bytes);
-        }
-        Ok(dashboards)
-    }
-
     ///fetch all correlations uploaded in object store
     /// return the correlation file path and all correlation json bytes for each file path
     async fn get_all_correlations(
diff --git a/src/storage/localfs.rs b/src/storage/localfs.rs
index 5ea8e676d..4d928e5d1 100644
--- a/src/storage/localfs.rs
+++ b/src/storage/localfs.rs
@@ -374,36 +374,6 @@ impl ObjectStorage for LocalFS {
         Ok(dirs)
     }
 
-    async fn get_all_dashboards(
-        &self,
-    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
-        let mut dashboards: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
-        let users_root_path = self.root.join(USERS_ROOT_DIR);
-        let directories = ReadDirStream::new(fs::read_dir(&users_root_path).await?);
-        let users: Vec<DirEntry> = directories.try_collect().await?;
-        for user in users {
-            if !user.path().is_dir() {
-                continue;
-            }
-            let dashboards_path = users_root_path.join(user.path()).join("dashboards");
-            let directories = ReadDirStream::new(fs::read_dir(&dashboards_path).await?);
-            let dashboards_files: Vec<DirEntry> = directories.try_collect().await?;
-            for dashboard in dashboards_files {
-                let dashboard_absolute_path = dashboard.path();
-                let file = fs::read(dashboard_absolute_path.clone()).await?;
-                let dashboard_relative_path = dashboard_absolute_path
-                    .strip_prefix(self.root.as_path())
-                    .unwrap();
-
-                dashboards
-                    .entry(RelativePathBuf::from_path(dashboard_relative_path).unwrap())
-                    .or_default()
-                    .push(file.into());
-            }
-        }
-        Ok(dashboards)
-    }
-
     ///fetch all correlations stored in disk
     /// return the correlation file path and all correlation json bytes for each file path
     async fn get_all_correlations(
diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs
index 26e5be310..390c6e3e1 100644
--- a/src/storage/object_storage.rs
+++ b/src/storage/object_storage.rs
@@ -126,7 +126,29 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
 
     async fn get_all_dashboards(
         &self,
-    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError>;
+    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
+        let mut dashboards: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
+
+        let users_dir = RelativePathBuf::from_iter([USERS_ROOT_DIR]);
+        for user in self.list_dirs_relative(&users_dir).await? {
+            let user_dashboard_path =
+                object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/dashboards"));
+            let dashboards_path = RelativePathBuf::from(&user_dashboard_path);
+            let dashboard_bytes = self
+                .get_objects(
+                    Some(&dashboards_path),
+                    Box::new(|file_name| file_name.ends_with(".json")),
+                )
+                .await?;
+
+            dashboards
+                .entry(dashboards_path)
+                .or_default()
+                .extend(dashboard_bytes);
+        }
+        Ok(dashboards)
+    }
+
     async fn get_all_correlations(
         &self,
     ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError>;
diff --git a/src/storage/s3.rs b/src/storage/s3.rs
index 8dfc29bc4..e110bede9 100644
--- a/src/storage/s3.rs
+++ b/src/storage/s3.rs
@@ -801,42 +801,6 @@ impl ObjectStorage for S3 {
             .collect::<Vec<_>>())
     }
 
-    async fn get_all_dashboards(
-        &self,
-    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
-        let mut dashboards: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
-        let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR);
-        let resp = self
-            .client
-            .list_with_delimiter(Some(&users_root_path))
-            .await?;
-
-        let users = resp
-            .common_prefixes
-            .iter()
-            .flat_map(|path| path.parts())
-            .filter(|name| name.as_ref() != USERS_ROOT_DIR)
-            .map(|name| name.as_ref().to_string())
-            .collect::<Vec<_>>();
-        for user in users {
-            let user_dashboard_path =
-                object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/dashboards"));
-            let dashboards_path = RelativePathBuf::from(&user_dashboard_path);
-            let dashboard_bytes = self
-                .get_objects(
-                    Some(&dashboards_path),
-                    Box::new(|file_name| file_name.ends_with(".json")),
-                )
-                .await?;
-
-            dashboards
-                .entry(dashboards_path)
-                .or_default()
-                .extend(dashboard_bytes);
-        }
-        Ok(dashboards)
-    }
-
     ///fetch all correlations stored in object store
     /// return the correlation file path and all correlation json bytes for each file path
     async fn get_all_correlations(

From 490c678fd070c72c0fd8990eddf41f78baebc583 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Sat, 1 Feb 2025 15:24:48 +0530
Subject: [PATCH 05/11] refactor: DRY `get_all_correlations`

---
 src/storage/azure_blob.rs     | 42 +++--------------------------
 src/storage/localfs.rs        | 35 ++---------------------------
 src/storage/object_storage.rs | 32 +++++++++++++++++++++++---
 src/storage/s3.rs             | 42 ++---------------------------
 4 files changed, 36 insertions(+), 115 deletions(-)

diff --git a/src/storage/azure_blob.rs b/src/storage/azure_blob.rs
index 378adfd30..28ad5cbfa 100644
--- a/src/storage/azure_blob.rs
+++ b/src/storage/azure_blob.rs
@@ -17,7 +17,8 @@
  */
 use super::object_storage::parseable_json_path;
 use super::{
-    to_object_store_path, LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY
+    to_object_store_path, LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider,
+    PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
 };
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -41,7 +42,7 @@ use crate::metrics::storage::azureblob::REQUEST_RESPONSE_TIME;
 use crate::metrics::storage::StorageMetrics;
 use object_store::limit::LimitStore;
 use object_store::path::Path as StorePath;
-use std::collections::{BTreeMap, HashMap, HashSet};
+use std::collections::{BTreeMap, HashSet};
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 
@@ -668,43 +669,6 @@ impl ObjectStorage for BlobStore {
             .collect::<Vec<_>>())
     }
 
-    ///fetch all correlations uploaded in object store
-    /// return the correlation file path and all correlation json bytes for each file path
-    async fn get_all_correlations(
-        &self,
-    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
-        let mut correlations: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
-        let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR);
-        let resp = self
-            .client
-            .list_with_delimiter(Some(&users_root_path))
-            .await?;
-
-        let users = resp
-            .common_prefixes
-            .iter()
-            .flat_map(|path| path.parts())
-            .filter(|name| name.as_ref() != USERS_ROOT_DIR)
-            .map(|name| name.as_ref().to_string())
-            .collect::<Vec<_>>();
-        for user in users {
-            let user_correlation_path =
-                object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/correlations"));
-            let correlations_path = RelativePathBuf::from(&user_correlation_path);
-            let correlation_bytes = self
-                .get_objects(
-                    Some(&correlations_path),
-                    Box::new(|file_name| file_name.ends_with(".json")),
-                )
-                .await?;
-
-            correlations
-                .entry(correlations_path)
-                .or_default()
-                .extend(correlation_bytes);
-        }
-        Ok(correlations)
-    }
-
     fn get_bucket_name(&self) -> String {
         self.container.clone()
     }
diff --git a/src/storage/localfs.rs b/src/storage/localfs.rs
index 4d928e5d1..3dcb34344 100644
--- a/src/storage/localfs.rs
+++ b/src/storage/localfs.rs
@@ -17,7 +17,7 @@
  */
 
 use std::{
-    collections::{BTreeMap, HashMap, HashSet},
+    collections::{BTreeMap, HashSet},
     path::{Path, PathBuf},
     sync::Arc,
     time::Instant,
@@ -27,7 +27,7 @@ use async_trait::async_trait;
 use bytes::Bytes;
 use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnvBuilder};
 use fs_extra::file::CopyOptions;
-use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt};
+use futures::{stream::FuturesUnordered, TryStreamExt};
 use relative_path::{RelativePath, RelativePathBuf};
 use tokio::fs::{self, DirEntry};
 use tokio_stream::wrappers::ReadDirStream;
@@ -374,37 +374,6 @@ impl ObjectStorage for LocalFS {
         Ok(dirs)
     }
 
-    ///fetch all correlations stored in disk
-    /// return the correlation file path and all correlation json bytes for each file path
-    async fn get_all_correlations(
-        &self,
-    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
-        let mut correlations: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
-        let users_root_path = self.root.join(USERS_ROOT_DIR);
-        let mut directories = ReadDirStream::new(fs::read_dir(&users_root_path).await?);
-        while let Some(user) = directories.next().await {
-            let user = user?;
-            if !user.path().is_dir() {
-                continue;
-            }
-            let correlations_path = users_root_path.join(user.path()).join("correlations");
-            let mut files = ReadDirStream::new(fs::read_dir(&correlations_path).await?);
-            while let Some(correlation) = files.next().await {
-                let correlation_absolute_path = correlation?.path();
-                let file = fs::read(correlation_absolute_path.clone()).await?;
-                let correlation_relative_path = correlation_absolute_path
-                    .strip_prefix(self.root.as_path())
-                    .unwrap();
-
-                correlations
-                    .entry(RelativePathBuf::from_path(correlation_relative_path).unwrap())
-                    .or_default()
-                    .push(file.into());
-            }
-        }
-        Ok(correlations)
-    }
-
     async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError> {
         let path = self.root.join(stream_name);
         let directories = ReadDirStream::new(fs::read_dir(&path).await?);
diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs
index 390c6e3e1..28f2ddf1d 100644
--- a/src/storage/object_storage.rs
+++ b/src/storage/object_storage.rs
@@ -21,7 +21,9 @@ use super::{
     ObjectStoreFormat, Permisssion, StorageDir, StorageMetadata,
 };
 use super::{
-    LogStream, Owner, StreamType, ALERTS_ROOT_DIRECTORY, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY
+    LogStream, Owner, StreamType, ALERTS_ROOT_DIRECTORY, MANIFEST_FILE,
+    PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME,
+    STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
 };
 
 use crate::alerts::AlertConfig;
@@ -128,7 +130,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
         &self,
     ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
         let mut dashboards: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
-
+
         let users_dir = RelativePathBuf::from_iter([USERS_ROOT_DIR]);
         for user in self.list_dirs_relative(&users_dir).await? {
             let user_dashboard_path =
@@ -149,9 +151,33 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
         Ok(dashboards)
     }
 
+    ///fetch all correlations stored in object store
+    /// return the correlation file path and all correlation json bytes for each file path
     async fn get_all_correlations(
         &self,
-    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError>;
+    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
+        let mut correlations: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
+
+        let users_dir = RelativePathBuf::from_iter([USERS_ROOT_DIR]);
+        for user in self.list_dirs_relative(&users_dir).await? {
+            let user_correlation_path =
+                object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/correlations",));
+            let correlations_path = RelativePathBuf::from(&user_correlation_path);
+            let correlation_bytes = self
+                .get_objects(
+                    Some(&correlations_path),
+                    Box::new(|file_name| file_name.ends_with(".json")),
+                )
+                .await?;
+
+            correlations
+                .entry(correlations_path)
+                .or_default()
+                .extend(correlation_bytes);
+        }
+        Ok(correlations)
+    }
+
     async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError>;
     async fn list_manifest_files(
         &self,
diff --git a/src/storage/s3.rs b/src/storage/s3.rs
index e110bede9..eea3a00ae 100644
--- a/src/storage/s3.rs
+++ b/src/storage/s3.rs
@@ -43,12 +43,12 @@ use std::time::{Duration, Instant};
 use super::metrics_layer::MetricLayer;
 use super::object_storage::parseable_json_path;
 use super::{
-    to_object_store_path, LogStream, ObjectStorageProvider, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY
+    to_object_store_path, LogStream, ObjectStorageProvider, SCHEMA_FILE_NAME,
+    STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
 };
 use crate::handlers::http::users::USERS_ROOT_DIR;
 use crate::metrics::storage::{s3::REQUEST_RESPONSE_TIME, StorageMetrics};
 use crate::storage::{ObjectStorage, ObjectStorageError, PARSEABLE_ROOT_DIRECTORY};
-use std::collections::HashMap;
 
 // in bytes
 // const MULTIPART_UPLOAD_SIZE: usize = 1024 * 1024 * 100;
@@ -801,42 +801,6 @@ impl ObjectStorage for S3 {
             .collect::<Vec<_>>())
     }
 
-    ///fetch all correlations stored in object store
-    /// return the correlation file path and all correlation json bytes for each file path
-    async fn get_all_correlations(
-        &self,
-    ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
-        let mut correlations: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
-        let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR);
-        let resp = self
-            .client
-            .list_with_delimiter(Some(&users_root_path))
-            .await?;
-
-        let users = resp
-            .common_prefixes
-            .iter()
-            .flat_map(|path| path.parts())
-            .filter(|name| name.as_ref() != USERS_ROOT_DIR)
-            .map(|name| name.as_ref().to_string())
-            .collect::<Vec<_>>();
-        for user in users {
-            let user_correlation_path =
-                object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/correlations",));
-            let correlations_path = RelativePathBuf::from(&user_correlation_path);
-            let correlation_bytes = self
-                .get_objects(
-                    Some(&correlations_path),
-                    Box::new(|file_name| file_name.ends_with(".json")),
-                )
-                .await?;
-
-            correlations
-                .entry(correlations_path)
-                .or_default()
-                .extend(correlation_bytes);
-        }
-        Ok(correlations)
-    }
-
     fn get_bucket_name(&self) -> String {
         self.bucket.clone()
     }

From 295a823c364ec79abcaa36269a816fef2d79d614 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Fri, 31 Jan 2025 23:06:34 +0530
Subject: [PATCH 06/11] refactor: organize imports

---
 src/storage/azure_blob.rs     | 64 +++++++++++++++++++----------------
 src/storage/localfs.rs        |  5 ++-
 src/storage/mod.rs            | 36 ++++++++++----------
 src/storage/object_storage.rs | 62 +++++++++++++++------------------
 src/storage/s3.rs             | 61 +++++++++++++++++----------------
 5 files changed, 114 insertions(+), 114 deletions(-)

diff --git a/src/storage/azure_blob.rs b/src/storage/azure_blob.rs
index 28ad5cbfa..2b1b32817 100644
--- a/src/storage/azure_blob.rs
+++ b/src/storage/azure_blob.rs
@@ -15,39 +15,45 @@
  * along with this program. If not, see <http://www.gnu.org/licenses/>.
  *
  */
-use super::object_storage::parseable_json_path;
-use super::{
-    to_object_store_path, LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider,
-    PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
+
+use std::{
+    collections::{BTreeMap, HashSet},
+    path::Path,
+    sync::Arc,
+    time::{Duration, Instant},
 };
-use async_trait::async_trait;
+
 use bytes::Bytes;
-use datafusion::datasource::listing::ListingTableUrl;
-use datafusion::datasource::object_store::{
-    DefaultObjectStoreRegistry, ObjectStoreRegistry, ObjectStoreUrl,
+use datafusion::{
+    datasource::listing::ListingTableUrl,
+    execution::{
+        object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry, ObjectStoreUrl},
+        runtime_env::RuntimeEnvBuilder,
+    },
+};
+use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt};
+use object_store::{
+    azure::{MicrosoftAzure, MicrosoftAzureBuilder},
+    limit::LimitStore,
+    path::Path as StorePath,
+    BackoffConfig, ClientOptions, ObjectStore, PutPayload, RetryConfig,
 };
-use datafusion::execution::runtime_env::RuntimeEnvBuilder;
-use futures::stream::FuturesUnordered;
-use futures::{StreamExt, TryStreamExt};
-use object_store::azure::{MicrosoftAzure, MicrosoftAzureBuilder};
-use object_store::{BackoffConfig, ClientOptions, ObjectStore, PutPayload, RetryConfig};
 use relative_path::{RelativePath, RelativePathBuf};
-use std::path::Path as StdPath;
+use tonic::async_trait;
 use tracing::{error, info};
 use url::Url;
 
-use super::metrics_layer::MetricLayer;
-use crate::handlers::http::users::USERS_ROOT_DIR;
-use crate::metrics::storage::azureblob::REQUEST_RESPONSE_TIME;
-use crate::metrics::storage::StorageMetrics;
-use object_store::limit::LimitStore;
-use object_store::path::Path as StorePath;
-use std::collections::{BTreeMap, HashSet};
-use std::sync::Arc;
-use std::time::{Duration, Instant};
+use crate::{
+    handlers::http::users::USERS_ROOT_DIR,
+    metrics::storage::{azureblob::REQUEST_RESPONSE_TIME, StorageMetrics},
+};
 
-const CONNECT_TIMEOUT_SECS: u64 = 5;
-const REQUEST_TIMEOUT_SECS: u64 = 300;
+use super::{
+    metrics_layer::MetricLayer, object_storage::parseable_json_path, to_object_store_path,
+    LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider, CONNECT_TIMEOUT_SECS,
+    PARSEABLE_ROOT_DIRECTORY, REQUEST_TIMEOUT_SECS, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME,
+    STREAM_ROOT_DIRECTORY,
+};
 
 #[derive(Debug, Clone, clap::Args)]
 #[command(
@@ -156,7 +162,7 @@ impl ObjectStorageProvider for AzureBlobConfig {
         let azure = LimitStore::new(azure, super::MAX_OBJECT_STORE_REQUESTS);
         let azure = MetricLayer::new(azure);
 
-        let object_store_registry: DefaultObjectStoreRegistry = DefaultObjectStoreRegistry::new();
+        let object_store_registry = DefaultObjectStoreRegistry::new();
         let url = ObjectStoreUrl::parse(format!("https://{}.blob.core.windows.net", self.account))
             .unwrap();
         object_store_registry.register_store(url.as_ref(), Arc::new(azure));
@@ -338,7 +344,7 @@ impl BlobStore {
         }
         Ok(result_file_list)
     }
-    async fn _upload_file(&self, key: &str, path: &StdPath) -> Result<(), ObjectStorageError> {
+    async fn _upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> {
         let instant = Instant::now();
 
         // // TODO: Uncomment this when multipart is fixed
@@ -367,7 +373,7 @@ impl BlobStore {
     }
 
     // TODO: introduce parallel, multipart-uploads if required
-    // async fn _upload_multipart(&self, key: &str, path: &StdPath) -> Result<(), ObjectStorageError> {
+    // async fn _upload_multipart(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> {
     //     let mut buf = vec![0u8; MULTIPART_UPLOAD_SIZE / 2];
     //     let mut file = OpenOptions::new().read(true).open(path).await?;
@@ -614,7 +620,7 @@ impl ObjectStorage for BlobStore {
         Ok(files)
     }
 
-    async fn upload_file(&self, key: &str, path: &StdPath) -> Result<(), ObjectStorageError> {
+    async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> {
         self._upload_file(key, path).await?;
 
         Ok(())
diff --git a/src/storage/localfs.rs b/src/storage/localfs.rs
index 3dcb34344..0223a60c8 100644
--- a/src/storage/localfs.rs
+++ b/src/storage/localfs.rs
@@ -32,10 +32,9 @@ use relative_path::{RelativePath, RelativePathBuf};
 use tokio::fs::{self, DirEntry};
 use tokio_stream::wrappers::ReadDirStream;
 
-use crate::option::validation;
 use crate::{
-    handlers::http::users::USERS_ROOT_DIR,
-    metrics::storage::{localfs::REQUEST_RESPONSE_TIME, StorageMetrics},
+    handlers::http::users::USERS_ROOT_DIR,
+    metrics::storage::{localfs::REQUEST_RESPONSE_TIME, StorageMetrics}, option::validation,
 };
 
 use super::{
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index b2bc71249..1c96a363b 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -16,6 +16,20 @@
  *
  */
 
+pub use azure_blob::AzureBlobConfig;
+use chrono::Local;
+pub use localfs::FSConfig;
+pub use object_storage::{ObjectStorage, ObjectStorageProvider};
+use object_store::path::Path;
+use relative_path::RelativePath;
+use retention::Retention;
+pub use s3::S3Config;
+use serde::{Deserialize, Serialize};
+pub use staging::StorageDir;
+pub use store_metadata::{
+    put_remote_metadata, put_staging_metadata, resolve_parseable_metadata, StorageMetadata,
+};
+
 use crate::{
     catalog::snapshot::Snapshot,
     event::format::LogSource,
@@ -24,32 +38,15 @@ use crate::{
     utils::json::{deserialize_string_as_true, serialize_bool_as_true},
 };
 
-use chrono::Local;
-use object_store::path::Path;
-use relative_path::RelativePath;
-use serde::{Deserialize, Serialize};
-
-use std::fmt::Debug;
-
 mod azure_blob;
 mod localfs;
 mod metrics_layer;
-pub(crate) mod object_storage;
+pub mod object_storage;
 pub mod retention;
 mod s3;
 pub mod staging;
 mod store_metadata;
 
-use self::retention::Retention;
-pub use self::staging::StorageDir;
-pub use azure_blob::AzureBlobConfig;
-pub use localfs::FSConfig;
-pub use object_storage::{ObjectStorage, ObjectStorageProvider};
-pub use s3::S3Config;
-pub use store_metadata::{
-    put_remote_metadata, put_staging_metadata, resolve_parseable_metadata, StorageMetadata,
-};
-
 /// Name of a Stream
 /// NOTE: this used to be a struct, flattened out for simplicity
 pub type LogStream = String;
@@ -84,6 +81,9 @@ const ACCESS_ALL: &str = "all";
 pub const CURRENT_OBJECT_STORE_VERSION: &str = "v5";
 pub const CURRENT_SCHEMA_VERSION: &str = "v5";
 
+const CONNECT_TIMEOUT_SECS: u64 = 5;
+const REQUEST_TIMEOUT_SECS: u64 = 300;
+
 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub struct ObjectStoreFormat {
     /// Version of schema registry
diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs
index 28f2ddf1d..5a9fccae9 100644
--- a/src/storage/object_storage.rs
+++ b/src/storage/object_storage.rs
@@ -16,30 +16,13 @@
  *
  */
 
-use super::{
-    retention::Retention, staging::convert_disk_files_to_parquet, ObjectStorageError,
-    ObjectStoreFormat, Permisssion, StorageDir, StorageMetadata,
-};
-use super::{
-    LogStream, Owner, StreamType, ALERTS_ROOT_DIRECTORY, MANIFEST_FILE,
-    PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME,
-    STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
-};
-
-use crate::alerts::AlertConfig;
-use crate::event::format::LogSource;
-use crate::handlers::http::modal::ingest_server::INGESTOR_META;
-use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR};
-use crate::metadata::SchemaVersion;
-use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE};
-use crate::option::Mode;
-use crate::{
-    catalog::{self, manifest::Manifest, snapshot::Snapshot},
-    metadata::STREAM_INFO,
-    metrics::{storage::StorageMetrics, STORAGE_SIZE},
-    option::CONFIG,
-    stats::FullStats,
-};
+use std::collections::{BTreeMap, HashMap, HashSet};
+use std::fmt::Debug;
+use std::fs::remove_file;
+use std::num::NonZeroU32;
+use std::path::Path;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
 
 use actix_web_prometheus::PrometheusMetrics;
 use arrow_schema::Schema;
@@ -53,15 +36,26 @@ use relative_path::RelativePathBuf;
 use tracing::{error, warn};
 use ulid::Ulid;
 
-use std::collections::{BTreeMap, HashSet};
-use std::fmt::Debug;
-use std::num::NonZeroU32;
-use std::{
-    collections::HashMap,
-    fs,
-    path::Path,
-    sync::Arc,
-    time::{Duration, Instant},
+use crate::alerts::AlertConfig;
+use crate::catalog;
+use crate::catalog::manifest::Manifest;
+use crate::catalog::snapshot::Snapshot;
+use crate::event::format::LogSource;
+use crate::handlers::http::modal::ingest_server::INGESTOR_META;
+use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR};
+use crate::metadata::{SchemaVersion, STREAM_INFO};
+use crate::metrics::storage::StorageMetrics;
+use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE, STORAGE_SIZE};
+use crate::option::{Mode, CONFIG};
+use crate::stats::FullStats;
+
+use super::retention::Retention;
+use super::staging::{convert_disk_files_to_parquet, StorageDir};
+use super::{
+    LogStream, ObjectStorageError, ObjectStoreFormat, Owner, Permisssion,
+    StorageMetadata, StreamType, ALERTS_ROOT_DIRECTORY, MANIFEST_FILE,
+    PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME,
+    STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
 };
 
 pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug + Send + Sync {
@@ -729,7 +723,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
                 catalog::create_from_parquet_file(absolute_path.clone(), &file).unwrap();
             catalog::update_snapshot(store, stream, manifest).await?;
 
-            let _ = fs::remove_file(file);
+            let _ = remove_file(file);
         }
     }
 
diff --git a/src/storage/s3.rs b/src/storage/s3.rs
index eea3a00ae..a7a0f2c33 100644
--- a/src/storage/s3.rs
+++ b/src/storage/s3.rs
@@ -16,44 +16,45 @@
  *
  */
 
+use std::{
+    collections::{BTreeMap, HashSet},
+    fmt::Display,
+    path::Path,
+    str::FromStr,
+    sync::Arc,
+    time::{Duration, Instant},
+};
+
 use async_trait::async_trait;
 use bytes::Bytes;
-use datafusion::datasource::listing::ListingTableUrl;
-use datafusion::datasource::object_store::{
-    DefaultObjectStoreRegistry, ObjectStoreRegistry, ObjectStoreUrl,
+use datafusion::{
+    datasource::listing::ListingTableUrl,
+    execution::{
+        object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry, ObjectStoreUrl},
+        runtime_env::RuntimeEnvBuilder,
+    },
+};
+use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt};
+use object_store::{
+    aws::{AmazonS3, AmazonS3Builder, AmazonS3ConfigKey, Checksum},
+    limit::LimitStore,
+    path::Path as StorePath,
+    BackoffConfig, ClientOptions, ObjectStore, PutPayload, RetryConfig,
 };
-use datafusion::execution::runtime_env::RuntimeEnvBuilder;
-use futures::stream::FuturesUnordered;
-use futures::{StreamExt, TryStreamExt};
-use object_store::aws::{AmazonS3, AmazonS3Builder, AmazonS3ConfigKey, Checksum};
-use object_store::limit::LimitStore;
-use object_store::path::Path as StorePath;
-use object_store::{BackoffConfig, ClientOptions, ObjectStore, PutPayload, RetryConfig};
 use relative_path::{RelativePath, RelativePathBuf};
 use tracing::{error, info};
 
-use std::collections::{BTreeMap, HashSet};
-use std::fmt::Display;
-use std::iter::Iterator;
-use std::path::Path as StdPath;
-use std::str::FromStr;
-use std::sync::Arc;
-use std::time::{Duration, Instant};
+use crate::{
+    handlers::http::users::USERS_ROOT_DIR,
+    metrics::storage::{s3::REQUEST_RESPONSE_TIME, StorageMetrics},
+};
 
-use super::metrics_layer::MetricLayer;
-use super::object_storage::parseable_json_path;
 use super::{
-    to_object_store_path, LogStream, ObjectStorageProvider, SCHEMA_FILE_NAME,
-    STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
+    metrics_layer::MetricLayer, object_storage::parseable_json_path, to_object_store_path, LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider, CONNECT_TIMEOUT_SECS, PARSEABLE_ROOT_DIRECTORY, REQUEST_TIMEOUT_SECS, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY
 };
-use crate::handlers::http::users::USERS_ROOT_DIR;
-use crate::metrics::storage::{s3::REQUEST_RESPONSE_TIME, StorageMetrics};
-use crate::storage::{ObjectStorage, ObjectStorageError, PARSEABLE_ROOT_DIRECTORY};
 
 // in bytes
 // const MULTIPART_UPLOAD_SIZE: usize = 1024 * 1024 * 100;
-const CONNECT_TIMEOUT_SECS: u64 = 5;
-const REQUEST_TIMEOUT_SECS: u64 = 300;
 const AWS_CONTAINER_CREDENTIALS_RELATIVE_URI: &str = "AWS_CONTAINER_CREDENTIALS_RELATIVE_URI";
 
 #[derive(Debug, Clone, clap::Args)]
@@ -291,7 +292,7 @@ impl ObjectStorageProvider for S3Config {
         let s3 = LimitStore::new(s3, super::MAX_OBJECT_STORE_REQUESTS);
         let s3 = MetricLayer::new(s3);
 
-        let object_store_registry: DefaultObjectStoreRegistry = DefaultObjectStoreRegistry::new();
+        let object_store_registry = DefaultObjectStoreRegistry::new();
         let url = ObjectStoreUrl::parse(format!("s3://{}", &self.bucket_name)).unwrap();
         object_store_registry.register_store(url.as_ref(), Arc::new(s3));
 
@@ -474,7 +475,7 @@ impl S3 {
         }
         Ok(result_file_list)
     }
-    async fn _upload_file(&self, key: &str, path: &StdPath) -> Result<(), ObjectStorageError> {
+    async fn _upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> {
         let instant = Instant::now();
 
         // // TODO: Uncomment this when multipart is fixed
@@ -503,7 +504,7 @@ impl S3 {
     }
 
     // TODO: introduce parallel, multipart-uploads if required
-    // async fn _upload_multipart(&self, key: &str, path: &StdPath) -> Result<(), ObjectStorageError> {
+    // async fn _upload_multipart(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> {
     //     let mut buf = vec![0u8; MULTIPART_UPLOAD_SIZE / 2];
     //     let mut file = OpenOptions::new().read(true).open(path).await?;
@@ -750,7 +751,7 @@ impl ObjectStorage for S3 {
         Ok(files)
     }
 
-    async fn upload_file(&self, key: &str, path: &StdPath) -> Result<(), ObjectStorageError> {
+    async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError> {
         self._upload_file(key, path).await?;
 
         Ok(())

From 59020fb99e00d5e895a62f47cc0ed8e0153f1f20 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Sat, 1 Feb 2025 20:48:36 +0530
Subject: [PATCH 07/11] revert `async-trait`

---
 src/storage/azure_blob.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/storage/azure_blob.rs b/src/storage/azure_blob.rs
index 2b1b32817..593308c63 100644
--- a/src/storage/azure_blob.rs
+++ b/src/storage/azure_blob.rs
@@ -23,6 +23,7 @@ use std::{
     time::{Duration, Instant},
 };
 
+use async_trait::async_trait;
 use bytes::Bytes;
 use datafusion::{
     datasource::listing::ListingTableUrl,
@@ -39,7 +40,6 @@ use object_store::{
     BackoffConfig, ClientOptions, ObjectStore, PutPayload, RetryConfig,
 };
 use relative_path::{RelativePath, RelativePathBuf};
-use tonic::async_trait;
 use tracing::{error, info};
 use url::Url;
 

From 3227d982a4856a2c382314abd2347e57215682f7 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Tue, 18 Feb 2025 15:47:07 +0530
Subject: [PATCH 08/11] fix+refactor: user file location

---
 src/storage/object_storage.rs | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)

diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs
index 5f2e913b6..11144c2df 100644
--- a/src/storage/object_storage.rs
+++ b/src/storage/object_storage.rs
@@ -103,9 +103,10 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
 
         let users_dir = RelativePathBuf::from_iter([USERS_ROOT_DIR]);
         for user in self.list_dirs_relative(&users_dir).await? {
-            let stream_dir = RelativePathBuf::from_iter([USERS_ROOT_DIR, &user]);
+            let stream_dir = RelativePathBuf::from_iter([USERS_ROOT_DIR, &user, "filters"]);
             for stream in self.list_dirs_relative(&stream_dir).await? {
-                let filters_path = RelativePathBuf::from(&stream);
+                let filters_path =
+                    RelativePathBuf::from_iter([USERS_ROOT_DIR, &user, "filters", &stream]);
                 let filter_bytes = self
                     .get_objects(
                         Some(&filters_path),
@@ -118,6 +119,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
                     .extend(filter_bytes);
             }
         }
+
         Ok(filters)
     }
 
@@ -128,9 +130,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
 
         let users_dir = RelativePathBuf::from_iter([USERS_ROOT_DIR]);
         for user in self.list_dirs_relative(&users_dir).await? {
-            let user_dashboard_path =
-                object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/dashboards"));
-            let dashboards_path = RelativePathBuf::from(&user_dashboard_path);
+            let dashboards_path = RelativePathBuf::from_iter([USERS_ROOT_DIR, &user, "dashboards"]);
             let dashboard_bytes = self
                 .get_objects(
                     Some(&dashboards_path),
@@ -143,6 +143,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
                 .or_default()
                 .extend(dashboard_bytes);
         }
+
         Ok(dashboards)
     }
 
@@ -155,9 +156,8 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
 
         let users_dir = RelativePathBuf::from_iter([USERS_ROOT_DIR]);
         for user in self.list_dirs_relative(&users_dir).await? {
-            let user_correlation_path =
-                object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/correlations",));
-            let correlations_path = RelativePathBuf::from(&user_correlation_path);
+            let correlations_path =
+                RelativePathBuf::from_iter([USERS_ROOT_DIR, &user, "correlations"]);
             let correlation_bytes = self
                 .get_objects(
                     Some(&correlations_path),
@@ -170,6 +170,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
                 .or_default()
                 .extend(correlation_bytes);
         }
+
        Ok(correlations)
     }
 

From fc18333ff705615fe35807cf527ca82d791a1f3f Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Wed, 19 Feb 2025 11:14:27 +0530
Subject: [PATCH 09/11] log: errors when removing file

---
 src/storage/object_storage.rs | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs
index 11144c2df..d9f35a14d 100644
--- a/src/storage/object_storage.rs
+++ b/src/storage/object_storage.rs
@@ -724,8 +724,8 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
             let stream = PARSEABLE.get_or_create_stream(&stream_name);
             let custom_partition = stream.get_custom_partition();
 
-            for file in stream.parquet_files() {
-                let filename = file
+            for path in stream.parquet_files() {
+                let filename = path
                     .file_name()
                     .expect("only parquet files are returned by iterator")
                     .to_str()
@@ -733,7 +733,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
 
                 let mut file_date_part = filename.split('.').collect::<Vec<&str>>()[0];
                 file_date_part = file_date_part.split('=').collect::<Vec<&str>>()[1];
-                let compressed_size = file.metadata().map_or(0, |meta| meta.len());
+                let compressed_size = path.metadata().map_or(0, |meta| meta.len());
                 STORAGE_SIZE
                     .with_label_values(&["data", &stream_name, "parquet"])
                     .add(compressed_size as i64);
@@ -757,8 +757,8 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
                 let stream_relative_path = format!("{stream_name}/{file_suffix}");
 
                 // Try uploading the file, handle potential errors without breaking the loop
-                if let Err(e) = self.upload_file(&stream_relative_path, &file).await {
-                    error!("Failed to upload file {}: {:?}", filename, e);
+                if let Err(e) = self.upload_file(&stream_relative_path, &path).await {
+                    error!("Failed to upload file {filename:?}: {e}");
                     continue; // Skip to the next file
                 }
 
@@ -767,17 +767,21 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
                     .to_string();
                 let store = PARSEABLE.storage().get_object_store();
                 let manifest =
-                    catalog::create_from_parquet_file(absolute_path.clone(), &file).unwrap();
+                    catalog::create_from_parquet_file(absolute_path.clone(), &path).unwrap();
                 catalog::update_snapshot(store, &stream_name, manifest).await?;
 
-                let _ = remove_file(file);
+                if let Err(e) = remove_file(path) {
+                    warn!("Failed to remove staged file: {e}");
+                }
             }
 
             for path in stream.schema_files() {
                 let file = File::open(&path)?;
                 let schema: Schema = serde_json::from_reader(file)?;
                 commit_schema_to_storage(&stream_name, schema).await?;
-                let _ = remove_file(path);
+                if let Err(e) = remove_file(path) {
+                    warn!("Failed to remove staged file: {e}");
+                }
             }
         }
 

From c78c7cc226d2adeca58f355c42bc6315c8ae2ac7 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Wed, 19 Feb 2025 11:59:44 +0530
Subject: [PATCH 10/11] refactor: use `RelativePathBuf::join`

---
 src/storage/object_storage.rs | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs
index d9f35a14d..1326be16a 100644
--- a/src/storage/object_storage.rs
+++ b/src/storage/object_storage.rs
@@ -103,10 +103,9 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
 
         let users_dir = RelativePathBuf::from_iter([USERS_ROOT_DIR]);
         for user in self.list_dirs_relative(&users_dir).await? {
-            let stream_dir = RelativePathBuf::from_iter([USERS_ROOT_DIR, &user, "filters"]);
+            let stream_dir = users_dir.join(&user).join("filters");
             for stream in self.list_dirs_relative(&stream_dir).await? {
-                let filters_path =
-                    RelativePathBuf::from_iter([USERS_ROOT_DIR, &user, "filters", &stream]);
+                let filters_path = stream_dir.join(&stream);
                 let filter_bytes = self
                     .get_objects(
                         Some(&filters_path),
@@ -130,7 +129,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
 
         let users_dir = RelativePathBuf::from_iter([USERS_ROOT_DIR]);
         for user in self.list_dirs_relative(&users_dir).await? {
-            let dashboards_path = RelativePathBuf::from_iter([USERS_ROOT_DIR, &user, "dashboards"]);
+            let dashboards_path = users_dir.join(&user).join("dashboards");
             let dashboard_bytes = self
                 .get_objects(
                     Some(&dashboards_path),
@@ -156,8 +155,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
 
         let users_dir = RelativePathBuf::from_iter([USERS_ROOT_DIR]);
         for user in self.list_dirs_relative(&users_dir).await? {
-            let correlations_path =
-                RelativePathBuf::from_iter([USERS_ROOT_DIR, &user, "correlations"]);
+            let correlations_path = users_dir.join(&user).join("correlations");
             let correlation_bytes = self
                 .get_objects(
                     Some(&correlations_path),

From 0bdd17817bc26e457b94a7f1d693bdee34f41467 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Wed, 19 Feb 2025 12:11:25 +0530
Subject: [PATCH 11/11] simplify `RelativePathBuf` construction

---
 src/storage/object_storage.rs | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs
index 1326be16a..a6ba5558e 100644
--- a/src/storage/object_storage.rs
+++ b/src/storage/object_storage.rs
@@ -101,7 +101,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
     ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
         let mut filters: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
 
-        let users_dir = RelativePathBuf::from_iter([USERS_ROOT_DIR]);
+        let users_dir = RelativePathBuf::from(USERS_ROOT_DIR);
         for user in self.list_dirs_relative(&users_dir).await? {
             let stream_dir = users_dir.join(&user).join("filters");
             for stream in self.list_dirs_relative(&stream_dir).await? {
@@ -127,7 +127,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
     ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
         let mut dashboards: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
 
-        let users_dir = RelativePathBuf::from_iter([USERS_ROOT_DIR]);
+        let users_dir = RelativePathBuf::from(USERS_ROOT_DIR);
         for user in self.list_dirs_relative(&users_dir).await? {
             let dashboards_path = users_dir.join(&user).join("dashboards");
             let dashboard_bytes = self
@@ -153,7 +153,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
     ) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
         let mut correlations: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
 
-        let users_dir = RelativePathBuf::from_iter([USERS_ROOT_DIR]);
+        let users_dir = RelativePathBuf::from(USERS_ROOT_DIR);
        for user in self.list_dirs_relative(&users_dir).await? {
             let correlations_path = users_dir.join(&user).join("correlations");
             let correlation_bytes = self
@@ -376,7 +376,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
     }
 
     async fn get_alerts(&self) -> Result<Vec<AlertConfig>, ObjectStorageError> {
-        let alerts_path = RelativePathBuf::from_iter([ALERTS_ROOT_DIRECTORY]);
+        let alerts_path = RelativePathBuf::from(ALERTS_ROOT_DIRECTORY);
         let alerts = self
             .get_objects(
                 Some(&alerts_path),
@@ -700,7 +700,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
     }
 
     async fn get_correlations(&self) -> Result<Vec<Bytes>, CorrelationError> {
-        let correlation_path = RelativePathBuf::from_iter([CORRELATION_DIR]);
+        let correlation_path = RelativePathBuf::from(CORRELATION_DIR);
         let correlation_bytes = self
             .get_objects(
                 Some(&correlation_path),
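Taken together, patches 02-05 collapse three near-identical directory walks (S3, Azure Blob, local FS) into one required primitive, `list_dirs_relative`, with the shared walks written once as provided trait methods; patches 08-11 then fix and simplify the paths those walks build. Below is a minimal, self-contained sketch of the resulting shape. It is synchronous and uses `String` paths plus a toy in-memory backend as stand-ins for the real Parseable types (`RelativePathBuf`, `ObjectStorageError`, the async trait methods), so it illustrates the pattern rather than the actual API.

use std::collections::HashMap;

// One required listing primitive per backend, plus a provided method
// that performs the shared users/<user>/dashboards walk on top of it.
trait ObjectStorage {
    /// Backend-specific: list the immediate subdirectories of `relative_path`.
    fn list_dirs_relative(&self, relative_path: &str) -> Vec<String>;

    /// Shared walk, written once for every backend (cf. patches 03-05).
    fn get_all_dashboards(&self) -> HashMap<String, Vec<String>> {
        let mut dashboards = HashMap::new();
        for user in self.list_dirs_relative("users") {
            let path = format!("users/{user}/dashboards");
            // The real code fetches the *.json objects under this prefix;
            // the sketch just records what the walk visits.
            dashboards.insert(path.clone(), self.list_dirs_relative(&path));
        }
        dashboards
    }
}

// Toy in-memory backend standing in for S3 / Azure Blob / LocalFS.
struct InMemory(HashMap<String, Vec<String>>);

impl ObjectStorage for InMemory {
    fn list_dirs_relative(&self, relative_path: &str) -> Vec<String> {
        self.0.get(relative_path).cloned().unwrap_or_default()
    }
}

fn main() {
    let store = InMemory(HashMap::from([
        ("users".to_owned(), vec!["alice".to_owned()]),
        (
            "users/alice/dashboards".to_owned(),
            vec!["overview".to_owned()],
        ),
    ]));
    // Only the primitive was implemented; the walk comes from the trait.
    println!("{:?}", store.get_all_dashboards());
}

The payoff mirrors the diffstats above: a new backend implements one listing primitive and inherits every user-directory walk, instead of re-deriving `get_all_saved_filters`, `get_all_dashboards`, and `get_all_correlations` the way the deleted S3, Blob, and LocalFS bodies did.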