Skip to content

Commit 15755f9

Browse files
author
Devdutt Shenoi
committed
refactor: DRY get_all_correlations
1 parent 41c8971 commit 15755f9

File tree

4 files changed

+30
-112
lines changed

4 files changed

+30
-112
lines changed

src/storage/azure_blob.rs

Lines changed: 1 addition & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818

1919
use std::{
20-
collections::{BTreeMap, HashMap, HashSet},
20+
collections::{BTreeMap, HashSet},
2121
path::Path,
2222
sync::Arc,
2323
time::Instant,
@@ -651,43 +651,6 @@ impl ObjectStorage for BlobStore {
651651
.collect::<Vec<_>>())
652652
}
653653

654-
///fetch all correlations uploaded in object store
655-
/// return the correlation file path and all correlation json bytes for each file path
656-
async fn get_all_correlations(
657-
&self,
658-
) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
659-
let mut correlations: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
660-
let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR);
661-
let resp = self
662-
.client
663-
.list_with_delimiter(Some(&users_root_path))
664-
.await?;
665-
666-
let users = resp
667-
.common_prefixes
668-
.iter()
669-
.flat_map(|path| path.parts())
670-
.filter(|name| name.as_ref() != USERS_ROOT_DIR)
671-
.map(|name| name.as_ref().to_string())
672-
.collect::<Vec<_>>();
673-
for user in users {
674-
let user_correlation_path =
675-
object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/correlations"));
676-
let correlations_path = RelativePathBuf::from(&user_correlation_path);
677-
let correlation_bytes = self
678-
.get_objects(
679-
Some(&correlations_path),
680-
Box::new(|file_name| file_name.ends_with(".json")),
681-
)
682-
.await?;
683-
684-
correlations
685-
.entry(correlations_path)
686-
.or_default()
687-
.extend(correlation_bytes);
688-
}
689-
Ok(correlations)
690-
}
691654
fn get_bucket_name(&self) -> String {
692655
self.container.clone()
693656
}

src/storage/localfs.rs

Lines changed: 2 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818

1919
use std::{
20-
collections::{BTreeMap, HashMap, HashSet},
20+
collections::{BTreeMap, HashSet},
2121
path::{Path, PathBuf},
2222
sync::Arc,
2323
time::Instant,
@@ -27,7 +27,7 @@ use async_trait::async_trait;
2727
use bytes::Bytes;
2828
use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnv};
2929
use fs_extra::file::CopyOptions;
30-
use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt};
30+
use futures::{stream::FuturesUnordered, TryStreamExt};
3131
use relative_path::{RelativePath, RelativePathBuf};
3232
use tokio::fs::{self, DirEntry};
3333
use tokio_stream::wrappers::ReadDirStream;
@@ -375,37 +375,6 @@ impl ObjectStorage for LocalFS {
375375
Ok(dirs)
376376
}
377377

378-
///fetch all correlations stored in disk
379-
/// return the correlation file path and all correlation json bytes for each file path
380-
async fn get_all_correlations(
381-
&self,
382-
) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
383-
let mut correlations: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
384-
let users_root_path = self.root.join(USERS_ROOT_DIR);
385-
let mut directories = ReadDirStream::new(fs::read_dir(&users_root_path).await?);
386-
while let Some(user) = directories.next().await {
387-
let user = user?;
388-
if !user.path().is_dir() {
389-
continue;
390-
}
391-
let correlations_path = users_root_path.join(user.path()).join("correlations");
392-
let mut files = ReadDirStream::new(fs::read_dir(&correlations_path).await?);
393-
while let Some(correlation) = files.next().await {
394-
let correlation_absolute_path = correlation?.path();
395-
let file = fs::read(correlation_absolute_path.clone()).await?;
396-
let correlation_relative_path = correlation_absolute_path
397-
.strip_prefix(self.root.as_path())
398-
.unwrap();
399-
400-
correlations
401-
.entry(RelativePathBuf::from_path(correlation_relative_path).unwrap())
402-
.or_default()
403-
.push(file.into());
404-
}
405-
}
406-
Ok(correlations)
407-
}
408-
409378
async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError> {
410379
let path = self.root.join(stream_name);
411380
let directories = ReadDirStream::new(fs::read_dir(&path).await?);

src/storage/object_storage.rs

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
175175
&self,
176176
) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
177177
let mut dashboards: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
178-
178+
179179
let users_dir = RelativePathBuf::from_iter([USERS_ROOT_DIR]);
180180
for user in self.list_dirs_relative(&users_dir).await? {
181181
let user_dashboard_path =
@@ -196,9 +196,33 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
196196
Ok(dashboards)
197197
}
198198

199+
///fetch all correlations stored in object store
200+
/// return the correlation file path and all correlation json bytes for each file path
199201
async fn get_all_correlations(
200202
&self,
201-
) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError>;
203+
) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
204+
let mut correlations: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
205+
206+
let users_dir = RelativePathBuf::from_iter([USERS_ROOT_DIR]);
207+
for user in self.list_dirs_relative(&users_dir).await? {
208+
let user_correlation_path =
209+
object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/correlations",));
210+
let correlations_path = RelativePathBuf::from(&user_correlation_path);
211+
let correlation_bytes = self
212+
.get_objects(
213+
Some(&correlations_path),
214+
Box::new(|file_name| file_name.ends_with(".json")),
215+
)
216+
.await?;
217+
218+
correlations
219+
.entry(correlations_path)
220+
.or_default()
221+
.extend(correlation_bytes);
222+
}
223+
Ok(correlations)
224+
}
225+
202226
async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError>;
203227
async fn list_manifest_files(
204228
&self,

src/storage/s3.rs

Lines changed: 1 addition & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818

1919
use std::{
20-
collections::{BTreeMap, HashMap, HashSet},
20+
collections::{BTreeMap, HashSet},
2121
fmt::Display,
2222
path::Path,
2323
str::FromStr,
@@ -783,44 +783,6 @@ impl ObjectStorage for S3 {
783783
.collect::<Vec<_>>())
784784
}
785785

786-
///fetch all correlations stored in object store
787-
/// return the correlation file path and all correlation json bytes for each file path
788-
async fn get_all_correlations(
789-
&self,
790-
) -> Result<HashMap<RelativePathBuf, Vec<Bytes>>, ObjectStorageError> {
791-
let mut correlations: HashMap<RelativePathBuf, Vec<Bytes>> = HashMap::new();
792-
let users_root_path = object_store::path::Path::from(USERS_ROOT_DIR);
793-
let resp = self
794-
.client
795-
.list_with_delimiter(Some(&users_root_path))
796-
.await?;
797-
798-
let users = resp
799-
.common_prefixes
800-
.iter()
801-
.flat_map(|path| path.parts())
802-
.filter(|name| name.as_ref() != USERS_ROOT_DIR)
803-
.map(|name| name.as_ref().to_string())
804-
.collect::<Vec<_>>();
805-
for user in users {
806-
let user_correlation_path =
807-
object_store::path::Path::from(format!("{USERS_ROOT_DIR}/{user}/correlations",));
808-
let correlations_path = RelativePathBuf::from(&user_correlation_path);
809-
let correlation_bytes = self
810-
.get_objects(
811-
Some(&correlations_path),
812-
Box::new(|file_name| file_name.ends_with(".json")),
813-
)
814-
.await?;
815-
816-
correlations
817-
.entry(correlations_path)
818-
.or_default()
819-
.extend(correlation_bytes);
820-
}
821-
Ok(correlations)
822-
}
823-
824786
fn get_bucket_name(&self) -> String {
825787
self.bucket.clone()
826788
}

0 commit comments

Comments
 (0)