Skip to content

Pre resolve prefixes for remote query #552

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 97 additions & 8 deletions server/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,15 @@ use datafusion::execution::context::SessionState;
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::prelude::*;
use futures_util::stream::FuturesUnordered;
use futures_util::{future, Future, TryStreamExt};
use itertools::Itertools;
use object_store::path::Path as StorePath;
use object_store::{ObjectMeta, ObjectStore};
use serde_json::Value;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;
use sysinfo::{System, SystemExt};

Expand Down Expand Up @@ -76,16 +81,14 @@ impl Query {
}

/// Returns prefixes, one per day, hour, or minute as necessary
fn _get_prefixes(&self) -> Vec<String> {
fn generate_prefixes(&self) -> Vec<String> {
TimePeriod::new(self.start, self.end, OBJECT_STORE_DATA_GRANULARITY).generate_prefixes()
}

pub fn get_prefixes(&self) -> Vec<String> {
self._get_prefixes()
fn get_prefixes(&self) -> Vec<String> {
self.generate_prefixes()
.into_iter()
.map(|key| format!("{}/{}", self.stream_name, key))
// latest first
.rev()
.collect()
}

Expand Down Expand Up @@ -129,7 +132,15 @@ impl Query {
storage: Arc<dyn ObjectStorage + Send>,
) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
let ctx = self.create_session_context();
let remote_listing_table = self._remote_query(storage)?;
let unresolved_prefixes = self.get_prefixes();
let client = ctx
.runtime_env()
.object_store(Box::new(storage.store_url()))
.unwrap();
let prefixes =
resolve_paths(client, storage.normalize_prefixes(unresolved_prefixes)).await?;

let remote_listing_table = self.remote_query(prefixes, storage)?;

let current_minute = Utc::now()
.with_second(0)
Expand Down Expand Up @@ -164,11 +175,12 @@ impl Query {
Ok((results, fields))
}

fn _remote_query(
fn remote_query(
&self,
prefixes: Vec<String>,
storage: Arc<dyn ObjectStorage + Send>,
) -> Result<Option<Arc<ListingTable>>, ExecuteError> {
let prefixes = storage.query_prefixes(self.get_prefixes());
let prefixes = storage.query_prefixes(prefixes);
if prefixes.is_empty() {
return Ok(None);
}
Expand Down Expand Up @@ -231,6 +243,83 @@ fn time_from_path(path: &Path) -> DateTime<Utc> {
.unwrap()
}

// accepts relative paths to resolve the narrative
// returns list of prefixes sorted in descending order
async fn resolve_paths(
client: Arc<dyn ObjectStore>,
prefixes: Vec<String>,
) -> Result<Vec<String>, ObjectStorageError> {
let mut minute_resolve: HashMap<String, Vec<String>> = HashMap::new();
let mut all_resolve = Vec::new();

for prefix in prefixes {
let components = prefix.split_terminator('/');
if components.last().is_some_and(|x| x.starts_with("minute")) {
let hour_prefix = &prefix[0..prefix.rfind("minute").expect("minute exists")];
minute_resolve
.entry(hour_prefix.to_owned())
.and_modify(|list| list.push(prefix))
.or_default();
} else {
all_resolve.push(prefix)
}
}

type ResolveFuture = Pin<Box<dyn Future<Output = Result<Vec<ObjectMeta>, ObjectStorageError>>>>;

let tasks: FuturesUnordered<ResolveFuture> = FuturesUnordered::new();

for (listing_prefix, prefix) in minute_resolve {
let client = Arc::clone(&client);
tasks.push(Box::pin(async move {
let mut list = client
.list(Some(&StorePath::from(listing_prefix)))
.await?
.try_collect::<Vec<_>>()
.await?;

list.retain(|object| {
prefix.iter().any(|prefix| {
object
.location
.prefix_matches(&StorePath::from(prefix.as_ref()))
})
});

Ok(list)
}));
}

for prefix in all_resolve {
let client = Arc::clone(&client);
tasks.push(Box::pin(async move {
client
.list(Some(&StorePath::from(prefix)))
.await?
.try_collect::<Vec<_>>()
.await
.map_err(Into::into)
}));
}

let res: Vec<Vec<String>> = tasks
.and_then(|res| {
future::ok(
res.into_iter()
.map(|res| res.location.to_string())
.collect_vec(),
)
})
.try_collect()
.await?;

let mut res = res.into_iter().flatten().collect_vec();
res.sort();
res.reverse();

Ok(res)
}

pub mod error {
use datafusion::error::DataFusionError;

Expand Down
17 changes: 14 additions & 3 deletions server/src/storage/localfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,15 +194,26 @@ impl ObjectStorage for LocalFS {
Ok(())
}

fn query_prefixes(&self, prefixes: Vec<String>) -> Vec<ListingTableUrl> {
fn normalize_prefixes(&self, prefixes: Vec<String>) -> Vec<String> {
prefixes
.into_iter()
.filter_map(|prefix| {
.map(|prefix| {
let path = self.root.join(prefix);
ListingTableUrl::parse(path.to_str().unwrap()).ok()
format!("{}", path.display())
})
.collect()
}

/// Turns each prefix into a `ListingTableUrl` by prepending `/` to it.
///
/// Prefixes that fail to parse as a listing URL are skipped.
fn query_prefixes(&self, prefixes: Vec<String>) -> Vec<ListingTableUrl> {
    let mut urls = Vec::new();
    for relative in prefixes {
        let rooted = format!("/{}", relative);
        if let Ok(url) = ListingTableUrl::parse(rooted) {
            urls.push(url);
        }
    }
    urls
}

/// Returns the `file:///` URL identifying the local filesystem store.
fn store_url(&self) -> url::Url {
    const LOCAL_STORE_URL: &str = "file:///";
    url::Url::parse(LOCAL_STORE_URL).unwrap()
}
}

async fn dir_with_stream(
Expand Down
2 changes: 2 additions & 0 deletions server/src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ pub trait ObjectStorage: Sync + 'static {
async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError>;
async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError>;
async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError>;
/// Rewrites query prefixes into the form this backend expects before they are
/// resolved against the object store (LocalFS joins them onto its root; S3
/// returns them unchanged).
fn normalize_prefixes(&self, prefixes: Vec<String>) -> Vec<String>;
/// Converts prefixes into `ListingTableUrl`s for this backend.
fn query_prefixes(&self, prefixes: Vec<String>) -> Vec<ListingTableUrl>;
/// Returns the URL identifying this backend's object store; the query path
/// uses it to look up the store in the session's runtime environment.
fn store_url(&self) -> url::Url;

async fn put_schema(
&self,
Expand Down
9 changes: 9 additions & 0 deletions server/src/storage/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,11 @@ impl ObjectStorage for S3 {
Ok(())
}

// No-op on S3: the prefixes are returned unchanged. Unlike the LocalFS
// implementation, there is no root directory to join them onto.
fn normalize_prefixes(&self, prefixes: Vec<String>) -> Vec<String> {
prefixes
}

fn query_prefixes(&self, prefixes: Vec<String>) -> Vec<ListingTableUrl> {
prefixes
.into_iter()
Expand All @@ -453,6 +458,10 @@ impl ObjectStorage for S3 {
})
.collect()
}

/// Returns the `s3://<bucket>` URL identifying this store's bucket.
fn store_url(&self) -> url::Url {
    let bucket_url = format!("s3://{}", self.bucket);
    url::Url::parse(&bucket_url).unwrap()
}
}

impl From<object_store::Error> for ObjectStorageError {
Expand Down