Skip to content

Commit 41029fd

Browse files
authored
Pre resolve prefixes for remote query (#552)
1 parent d116a0f commit 41029fd

File tree

4 files changed

+122
-11
lines changed

4 files changed

+122
-11
lines changed

server/src/query.rs

Lines changed: 97 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,15 @@ use datafusion::execution::context::SessionState;
2929
use datafusion::execution::disk_manager::DiskManagerConfig;
3030
use datafusion::execution::runtime_env::RuntimeEnv;
3131
use datafusion::prelude::*;
32+
use futures_util::stream::FuturesUnordered;
33+
use futures_util::{future, Future, TryStreamExt};
3234
use itertools::Itertools;
35+
use object_store::path::Path as StorePath;
36+
use object_store::{ObjectMeta, ObjectStore};
3337
use serde_json::Value;
3438
use std::collections::HashMap;
3539
use std::path::{Path, PathBuf};
40+
use std::pin::Pin;
3641
use std::sync::Arc;
3742
use sysinfo::{System, SystemExt};
3843

@@ -76,16 +81,14 @@ impl Query {
7681
}
7782

7883
/// Returns the time-based prefixes for the query range, one per day, hour, or minute as necessary.
79-
fn _get_prefixes(&self) -> Vec<String> {
84+
fn generate_prefixes(&self) -> Vec<String> {
8085
TimePeriod::new(self.start, self.end, OBJECT_STORE_DATA_GRANULARITY).generate_prefixes()
8186
}
8287

83-
pub fn get_prefixes(&self) -> Vec<String> {
84-
self._get_prefixes()
88+
fn get_prefixes(&self) -> Vec<String> {
89+
self.generate_prefixes()
8590
.into_iter()
8691
.map(|key| format!("{}/{}", self.stream_name, key))
87-
// latest first
88-
.rev()
8992
.collect()
9093
}
9194

@@ -129,7 +132,15 @@ impl Query {
129132
storage: Arc<dyn ObjectStorage + Send>,
130133
) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
131134
let ctx = self.create_session_context();
132-
let remote_listing_table = self._remote_query(storage)?;
135+
let unresolved_prefixes = self.get_prefixes();
136+
let client = ctx
137+
.runtime_env()
138+
.object_store(Box::new(storage.store_url()))
139+
.unwrap();
140+
let prefixes =
141+
resolve_paths(client, storage.normalize_prefixes(unresolved_prefixes)).await?;
142+
143+
let remote_listing_table = self.remote_query(prefixes, storage)?;
133144

134145
let current_minute = Utc::now()
135146
.with_second(0)
@@ -164,11 +175,12 @@ impl Query {
164175
Ok((results, fields))
165176
}
166177

167-
fn _remote_query(
178+
fn remote_query(
168179
&self,
180+
prefixes: Vec<String>,
169181
storage: Arc<dyn ObjectStorage + Send>,
170182
) -> Result<Option<Arc<ListingTable>>, ExecuteError> {
171-
let prefixes = storage.query_prefixes(self.get_prefixes());
183+
let prefixes = storage.query_prefixes(prefixes);
172184
if prefixes.is_empty() {
173185
return Ok(None);
174186
}
@@ -231,6 +243,83 @@ fn time_from_path(path: &Path) -> DateTime<Utc> {
231243
.unwrap()
232244
}
233245

246+
// accepts relative paths to resolve the narrative
247+
// returns list of prefixes sorted in descending order
248+
async fn resolve_paths(
249+
client: Arc<dyn ObjectStore>,
250+
prefixes: Vec<String>,
251+
) -> Result<Vec<String>, ObjectStorageError> {
252+
let mut minute_resolve: HashMap<String, Vec<String>> = HashMap::new();
253+
let mut all_resolve = Vec::new();
254+
255+
for prefix in prefixes {
256+
let components = prefix.split_terminator('/');
257+
if components.last().is_some_and(|x| x.starts_with("minute")) {
258+
let hour_prefix = &prefix[0..prefix.rfind("minute").expect("minute exists")];
259+
minute_resolve
260+
.entry(hour_prefix.to_owned())
261+
.and_modify(|list| list.push(prefix))
262+
.or_default();
263+
} else {
264+
all_resolve.push(prefix)
265+
}
266+
}
267+
268+
type ResolveFuture = Pin<Box<dyn Future<Output = Result<Vec<ObjectMeta>, ObjectStorageError>>>>;
269+
270+
let tasks: FuturesUnordered<ResolveFuture> = FuturesUnordered::new();
271+
272+
for (listing_prefix, prefix) in minute_resolve {
273+
let client = Arc::clone(&client);
274+
tasks.push(Box::pin(async move {
275+
let mut list = client
276+
.list(Some(&StorePath::from(listing_prefix)))
277+
.await?
278+
.try_collect::<Vec<_>>()
279+
.await?;
280+
281+
list.retain(|object| {
282+
prefix.iter().any(|prefix| {
283+
object
284+
.location
285+
.prefix_matches(&StorePath::from(prefix.as_ref()))
286+
})
287+
});
288+
289+
Ok(list)
290+
}));
291+
}
292+
293+
for prefix in all_resolve {
294+
let client = Arc::clone(&client);
295+
tasks.push(Box::pin(async move {
296+
client
297+
.list(Some(&StorePath::from(prefix)))
298+
.await?
299+
.try_collect::<Vec<_>>()
300+
.await
301+
.map_err(Into::into)
302+
}));
303+
}
304+
305+
let res: Vec<Vec<String>> = tasks
306+
.and_then(|res| {
307+
future::ok(
308+
res.into_iter()
309+
.map(|res| res.location.to_string())
310+
.collect_vec(),
311+
)
312+
})
313+
.try_collect()
314+
.await?;
315+
316+
let mut res = res.into_iter().flatten().collect_vec();
317+
res.sort();
318+
res.reverse();
319+
320+
Ok(res)
321+
}
322+
234323
pub mod error {
235324
use datafusion::error::DataFusionError;
236325

server/src/storage/localfs.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -194,15 +194,26 @@ impl ObjectStorage for LocalFS {
194194
Ok(())
195195
}
196196

197-
fn query_prefixes(&self, prefixes: Vec<String>) -> Vec<ListingTableUrl> {
197+
fn normalize_prefixes(&self, prefixes: Vec<String>) -> Vec<String> {
198198
prefixes
199199
.into_iter()
200-
.filter_map(|prefix| {
200+
.map(|prefix| {
201201
let path = self.root.join(prefix);
202-
ListingTableUrl::parse(path.to_str().unwrap()).ok()
202+
format!("{}", path.display())
203203
})
204204
.collect()
205205
}
206+
207+
fn query_prefixes(&self, prefixes: Vec<String>) -> Vec<ListingTableUrl> {
208+
prefixes
209+
.into_iter()
210+
.filter_map(|prefix| ListingTableUrl::parse(format!("/{}", prefix)).ok())
211+
.collect()
212+
}
213+
214+
// URL under which the local filesystem object store is looked up.
fn store_url(&self) -> url::Url {
    let parsed = url::Url::parse("file:///");
    parsed.unwrap()
}
206217
}
207218

208219
async fn dir_with_stream(

server/src/storage/object_storage.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,9 @@ pub trait ObjectStorage: Sync + 'static {
6767
async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError>;
6868
async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError>;
6969
async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError>;
70+
fn normalize_prefixes(&self, prefixes: Vec<String>) -> Vec<String>;
7071
fn query_prefixes(&self, prefixes: Vec<String>) -> Vec<ListingTableUrl>;
72+
fn store_url(&self) -> url::Url;
7173

7274
async fn put_schema(
7375
&self,

server/src/storage/s3.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -444,6 +444,11 @@ impl ObjectStorage for S3 {
444444
Ok(())
445445
}
446446

447+
// No-op on S3: the prefixes are handed back exactly as they were given.
448+
fn normalize_prefixes(&self, prefixes: Vec<String>) -> Vec<String> {
449+
prefixes
450+
}
451+
447452
fn query_prefixes(&self, prefixes: Vec<String>) -> Vec<ListingTableUrl> {
448453
prefixes
449454
.into_iter()
@@ -453,6 +458,10 @@ impl ObjectStorage for S3 {
453458
})
454459
.collect()
455460
}
461+
462+
// URL under which this bucket's object store is looked up (`s3://<bucket>`).
fn store_url(&self) -> url::Url {
    let raw = format!("s3://{}", self.bucket);
    url::Url::parse(&raw).unwrap()
}
456465
}
457466

458467
impl From<object_store::Error> for ObjectStorageError {

0 commit comments

Comments
 (0)