Skip to content

Commit 490c678

Browse files
author
Devdutt Shenoi
committed
refactor: DRY get_all_correlations
1 parent 8e9e789 commit 490c678

File tree

4 files changed

+36
-115
lines changed

4 files changed

+36
-115
lines changed

src/storage/azure_blob.rs

Lines changed: 3 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717
*/
1818
use super::object_storage::parseable_json_path;
1919
use super::{
20-
to_object_store_path, LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY
20+
to_object_store_path, LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider,
21+
PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
2122
};
2223
use async_trait::async_trait;
2324
use bytes::Bytes;
@@ -41,7 +42,7 @@ use crate::metrics::storage::azureblob::REQUEST_RESPONSE_TIME;
4142
use crate::metrics::storage::StorageMetrics;
4243
use object_store::limit::LimitStore;
4344
use object_store::path::Path as StorePath;
44-
use std::collections::{BTreeMap, HashMap, HashSet};
45+
use std::collections::{BTreeMap, HashSet};
4546
use std::sync::Arc;
4647
use std::time::{Duration, Instant};
4748

@@ -668,43 +669,6 @@ impl ObjectStorage for BlobStore {
668669
.collect::<Vec<_>>())
669670
}
670671

671-
///fetch all correlations uploaded in object store
672-
/// return the correlation file path and all correlation json bytes for each file path
673-
async fn get_all_correlations(
674-
&self,
675-
) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
676-
let mut correlations: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
677-
let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR);
678-
let resp = self
679-
.client
680-
.list_with_delimiter(Some(&users_root_path))
681-
.await?;
682-
683-
let users = resp
684-
.common_prefixes
685-
.iter()
686-
.flat_map(|path| path.parts())
687-
.filter(|name| name.as_ref() != USERS_ROOT_DIR)
688-
.map(|name| name.as_ref().to_string())
689-
.collect::<Vec<_>>();
690-
for user in users {
691-
let user_correlation_path =
692-
object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/correlations"));
693-
let correlations_path = RelativePathBuf::from(&user_correlation_path);
694-
let correlation_bytes = self
695-
.get_objects(
696-
Some(&correlations_path),
697-
Box::new(|file_name| file_name.ends_with(".json")),
698-
)
699-
.await?;
700-
701-
correlations
702-
.entry(correlations_path)
703-
.or_default()
704-
.extend(correlation_bytes);
705-
}
706-
Ok(correlations)
707-
}
708672
fn get_bucket_name(&self) -> String {
709673
self.container.clone()
710674
}

src/storage/localfs.rs

Lines changed: 2 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818

1919
use std::{
20-
collections::{BTreeMap, HashMap, HashSet},
20+
collections::{BTreeMap, HashSet},
2121
path::{Path, PathBuf},
2222
sync::Arc,
2323
time::Instant,
@@ -27,7 +27,7 @@ use async_trait::async_trait;
2727
use bytes::Bytes;
2828
use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnvBuilder};
2929
use fs_extra::file::CopyOptions;
30-
use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt};
30+
use futures::{stream::FuturesUnordered, TryStreamExt};
3131
use relative_path::{RelativePath, RelativePathBuf};
3232
use tokio::fs::{self, DirEntry};
3333
use tokio_stream::wrappers::ReadDirStream;
@@ -374,37 +374,6 @@ impl ObjectStorage for LocalFS {
374374
Ok(dirs)
375375
}
376376

377-
///fetch all correlations stored in disk
378-
/// return the correlation file path and all correlation json bytes for each file path
379-
async fn get_all_correlations(
380-
&self,
381-
) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
382-
let mut correlations: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
383-
let users_root_path = self.root.join(USERS_ROOT_DIR);
384-
let mut directories = ReadDirStream::new(fs::read_dir(&users_root_path).await?);
385-
while let Some(user) = directories.next().await {
386-
let user = user?;
387-
if !user.path().is_dir() {
388-
continue;
389-
}
390-
let correlations_path = users_root_path.join(user.path()).join("correlations");
391-
let mut files = ReadDirStream::new(fs::read_dir(&correlations_path).await?);
392-
while let Some(correlation) = files.next().await {
393-
let correlation_absolute_path = correlation?.path();
394-
let file = fs::read(correlation_absolute_path.clone()).await?;
395-
let correlation_relative_path = correlation_absolute_path
396-
.strip_prefix(self.root.as_path())
397-
.unwrap();
398-
399-
correlations
400-
.entry(RelativePathBuf::from_path(correlation_relative_path).unwrap())
401-
.or_default()
402-
.push(file.into());
403-
}
404-
}
405-
Ok(correlations)
406-
}
407-
408377
async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError> {
409378
let path = self.root.join(stream_name);
410379
let directories = ReadDirStream::new(fs::read_dir(&path).await?);

src/storage/object_storage.rs

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ use super::{
2121
ObjectStoreFormat, Permisssion, StorageDir, StorageMetadata,
2222
};
2323
use super::{
24-
LogStream, Owner, StreamType, ALERTS_ROOT_DIRECTORY, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY
24+
LogStream, Owner, StreamType, ALERTS_ROOT_DIRECTORY, MANIFEST_FILE,
25+
PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME,
26+
STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
2527
};
2628

2729
use crate::alerts::AlertConfig;
@@ -128,7 +130,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
128130
&self,
129131
) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
130132
let mut dashboards: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
131-
133+
132134
let users_dir = RelativePathBuf::from_iter([USERS_ROOT_DIR]);
133135
for user in self.list_dirs_relative(&users_dir).await? {
134136
let user_dashboard_path =
@@ -149,9 +151,33 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
149151
Ok(dashboards)
150152
}
151153

154+
/// Fetch all correlations stored in the object store.
155+
/// Returns a map keyed by each user's `correlations` directory path; each value holds the bytes that `get_objects` returns for the `.json` files in that directory.
152156
async fn get_all_correlations(
153157
&self,
154-
) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError>;
158+
) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
159+
let mut correlations: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
160+
161+
let users_dir = RelativePathBuf::from_iter([USERS_ROOT_DIR]);
162+
for user in self.list_dirs_relative(&users_dir).await? {
163+
let user_correlation_path =
164+
object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/correlations",));
165+
let correlations_path = RelativePathBuf::from(&user_correlation_path);
166+
let correlation_bytes = self
167+
.get_objects(
168+
Some(&correlations_path),
169+
Box::new(|file_name| file_name.ends_with(".json")),
170+
)
171+
.await?;
172+
173+
correlations
174+
.entry(correlations_path)
175+
.or_default()
176+
.extend(correlation_bytes);
177+
}
178+
Ok(correlations)
179+
}
180+
155181
async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError>;
156182
async fn list_manifest_files(
157183
&self,

src/storage/s3.rs

Lines changed: 2 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,12 @@ use std::time::{Duration, Instant};
4343
use super::metrics_layer::MetricLayer;
4444
use super::object_storage::parseable_json_path;
4545
use super::{
46-
to_object_store_path, LogStream, ObjectStorageProvider, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY
46+
to_object_store_path, LogStream, ObjectStorageProvider, SCHEMA_FILE_NAME,
47+
STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
4748
};
4849
use crate::handlers::http::users::USERS_ROOT_DIR;
4950
use crate::metrics::storage::{s3::REQUEST_RESPONSE_TIME, StorageMetrics};
5051
use crate::storage::{ObjectStorage, ObjectStorageError, PARSEABLE_ROOT_DIRECTORY};
51-
use std::collections::HashMap;
5252

5353
// in bytes
5454
// const MULTIPART_UPLOAD_SIZE: usize = 1024 * 1024 * 100;
@@ -801,44 +801,6 @@ impl ObjectStorage for S3 {
801801
.collect::<Vec<_>>())
802802
}
803803

804-
///fetch all correlations stored in object store
805-
/// return the correlation file path and all correlation json bytes for each file path
806-
async fn get_all_correlations(
807-
&self,
808-
) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
809-
let mut correlations: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
810-
let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR);
811-
let resp = self
812-
.client
813-
.list_with_delimiter(Some(&users_root_path))
814-
.await?;
815-
816-
let users = resp
817-
.common_prefixes
818-
.iter()
819-
.flat_map(|path| path.parts())
820-
.filter(|name| name.as_ref() != USERS_ROOT_DIR)
821-
.map(|name| name.as_ref().to_string())
822-
.collect::<Vec<_>>();
823-
for user in users {
824-
let user_correlation_path =
825-
object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/correlations",));
826-
let correlations_path = RelativePathBuf::from(&user_correlation_path);
827-
let correlation_bytes = self
828-
.get_objects(
829-
Some(&correlations_path),
830-
Box::new(|file_name| file_name.ends_with(".json")),
831-
)
832-
.await?;
833-
834-
correlations
835-
.entry(correlations_path)
836-
.or_default()
837-
.extend(correlation_bytes);
838-
}
839-
Ok(correlations)
840-
}
841-
842804
fn get_bucket_name(&self) -> String {
843805
self.bucket.clone()
844806
}

0 commit comments

Comments
 (0)