Skip to content

Commit 3c3aa20

Browse files
committed
Add optional manifest_url parameter
1 parent 8b7746f commit 3c3aa20

File tree

7 files changed

+37
-21
lines changed

7 files changed

+37
-21
lines changed

src/catalog/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,7 @@ async fn handle_existing_partition(
307307
stream_name,
308308
manifests[pos].time_lower_bound,
309309
manifests[pos].time_upper_bound,
310+
Some(manifests[pos].manifest_path.clone()),
310311
)
311312
.await
312313
.map_err(|e| ObjectStorageError::MetastoreError(Box::new(e.to_detail())))?

src/enterprise/utils.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ pub async fn fetch_parquet_file_paths(
100100
stream,
101101
manifest_item.time_lower_bound,
102102
manifest_item.time_upper_bound,
103+
Some(manifest_item.manifest_path),
103104
)
104105
.await
105106
.map_err(|e| ObjectStorageError::MetastoreError(Box::new(e.to_detail())))?

src/metastore/metastore_traits.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ pub trait Metastore: std::fmt::Debug + Send + Sync {
101101
stream_name: &str,
102102
lower_bound: DateTime<Utc>,
103103
upper_bound: DateTime<Utc>,
104+
manifest_url: Option<String>,
104105
) -> Result<Option<Manifest>, MetastoreError>;
105106
async fn put_manifest(
106107
&self,

src/metastore/metastores/object_store_metastore.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -414,9 +414,15 @@ impl Metastore for ObjectStoreMetastore {
414414
stream_name: &str,
415415
lower_bound: DateTime<Utc>,
416416
upper_bound: DateTime<Utc>,
417+
manifest_url: Option<String>,
417418
) -> Result<Option<Manifest>, MetastoreError> {
418-
let path = partition_path(stream_name, lower_bound, upper_bound);
419-
let path = manifest_path(path.as_str());
419+
let path = match manifest_url {
420+
Some(url) => RelativePathBuf::from(url),
421+
None => {
422+
let path = partition_path(stream_name, lower_bound, upper_bound);
423+
manifest_path(path.as_str())
424+
}
425+
};
420426
match self.storage.get_object(&path).await {
421427
Ok(bytes) => {
422428
let manifest = serde_json::from_slice(&bytes)?;
@@ -425,6 +431,18 @@ impl Metastore for ObjectStoreMetastore {
425431
Err(ObjectStorageError::NoSuchKey(_)) => Ok(None),
426432
Err(err) => Err(MetastoreError::ObjectStorageError(err)),
427433
}
434+
// let path = partition_path(stream_name, lower_bound, upper_bound);
435+
// // // need a 'ends with `manifest.json` condition here'
436+
// // let obs = self
437+
// // .storage
438+
// // .get_objects(
439+
// // path,
440+
// // Box::new(|file_name| file_name.ends_with("manifest.json")),
441+
// // )
442+
// // .await?;
443+
// warn!(partition_path=?path);
444+
// let path = manifest_path(path.as_str());
445+
// warn!(manifest_path=?path);
428446
}
429447

430448
/// Get the path for a specific `Manifest` file

src/metastore/mod.rs

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -148,25 +148,13 @@ impl MetastoreError {
148148

149149
pub fn status_code(&self) -> StatusCode {
150150
match self {
151-
MetastoreError::ObjectStorageError(_object_storage_error) => {
152-
StatusCode::INTERNAL_SERVER_ERROR
153-
}
154-
MetastoreError::JsonParseError(_error) => StatusCode::INTERNAL_SERVER_ERROR,
155-
MetastoreError::JsonSchemaError { message: _ } => StatusCode::INTERNAL_SERVER_ERROR,
156-
MetastoreError::InvalidJsonStructure {
157-
expected: _,
158-
found: _,
159-
} => StatusCode::INTERNAL_SERVER_ERROR,
160-
MetastoreError::MissingJsonField { field: _ } => StatusCode::INTERNAL_SERVER_ERROR,
161-
MetastoreError::InvalidJsonValue {
162-
field: _,
163-
reason: _,
164-
} => StatusCode::INTERNAL_SERVER_ERROR,
165-
MetastoreError::Error {
166-
status_code,
167-
message: _,
168-
flow: _,
169-
} => *status_code,
151+
MetastoreError::ObjectStorageError(..) => StatusCode::INTERNAL_SERVER_ERROR,
152+
MetastoreError::JsonParseError(..) => StatusCode::INTERNAL_SERVER_ERROR,
153+
MetastoreError::JsonSchemaError { .. } => StatusCode::INTERNAL_SERVER_ERROR,
154+
MetastoreError::InvalidJsonStructure { .. } => StatusCode::INTERNAL_SERVER_ERROR,
155+
MetastoreError::MissingJsonField { .. } => StatusCode::INTERNAL_SERVER_ERROR,
156+
MetastoreError::InvalidJsonValue { .. } => StatusCode::INTERNAL_SERVER_ERROR,
157+
MetastoreError::Error { status_code, .. } => *status_code,
170158
}
171159
}
172160
}

src/query/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ use std::ops::Bound;
4343
use std::sync::Arc;
4444
use sysinfo::System;
4545
use tokio::runtime::Runtime;
46+
use tracing::warn;
4647

4748
use self::error::ExecuteError;
4849
use self::stream_schema_provider::GlobalSchemaProvider;
@@ -546,15 +547,20 @@ pub async fn get_manifest_list(
546547
PartialTimeFilter::High(Bound::Included(time_range.end.naive_utc())),
547548
];
548549

550+
warn!(merged_snapshot=?merged_snapshot);
551+
warn!(time_filter=?time_filter);
552+
549553
let mut all_manifest_files = Vec::new();
550554
for manifest_item in merged_snapshot.manifests(&time_filter) {
555+
warn!(manifest_item=?manifest_item);
551556
all_manifest_files.push(
552557
PARSEABLE
553558
.metastore
554559
.get_manifest(
555560
stream_name,
556561
manifest_item.time_lower_bound,
557562
manifest_item.time_upper_bound,
563+
Some(manifest_item.manifest_path),
558564
)
559565
.await?
560566
.expect("Data is invalid for Manifest"),

src/query/stream_schema_provider.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,7 @@ async fn collect_from_snapshot(
421421
stream_name,
422422
manifest_item.time_lower_bound,
423423
manifest_item.time_upper_bound,
424+
Some(manifest_item.manifest_path),
424425
)
425426
.await
426427
.map_err(|e| DataFusionError::Plan(e.to_string()))?

0 commit comments

Comments
 (0)