Skip to content

Commit ad1e884

Browse files
committed
Add optional manifest_url parameter
1 parent 7e9290f commit ad1e884

File tree

8 files changed

+66
-48
lines changed

8 files changed

+66
-48
lines changed

src/catalog/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,7 @@ async fn handle_existing_partition(
307307
stream_name,
308308
manifests[pos].time_lower_bound,
309309
manifests[pos].time_upper_bound,
310+
Some(manifests[pos].manifest_path.clone()),
310311
)
311312
.await
312313
.map_err(|e| ObjectStorageError::MetastoreError(Box::new(e.to_detail())))?

src/enterprise/utils.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ pub async fn fetch_parquet_file_paths(
100100
stream,
101101
manifest_item.time_lower_bound,
102102
manifest_item.time_upper_bound,
103+
Some(manifest_item.manifest_path),
103104
)
104105
.await
105106
.map_err(|e| ObjectStorageError::MetastoreError(Box::new(e.to_detail())))?

src/handlers/http/ingest.rs

Lines changed: 29 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -482,34 +482,36 @@ pub enum PostError {
482482

483483
impl actix_web::ResponseError for PostError {
484484
fn status_code(&self) -> http::StatusCode {
485+
use PostError::*;
485486
match self {
486-
PostError::SerdeError(_) => StatusCode::BAD_REQUEST,
487-
PostError::Header(_) => StatusCode::BAD_REQUEST,
488-
PostError::Event(_) => StatusCode::INTERNAL_SERVER_ERROR,
489-
PostError::Invalid(_) => StatusCode::BAD_REQUEST,
490-
PostError::CreateStream(CreateStreamError::StreamNameValidation(_)) => {
491-
StatusCode::BAD_REQUEST
492-
}
493-
PostError::CreateStream(_) => StatusCode::INTERNAL_SERVER_ERROR,
494-
PostError::StreamNotFound(_) => StatusCode::NOT_FOUND,
495-
PostError::CustomError(_) => StatusCode::INTERNAL_SERVER_ERROR,
496-
PostError::NetworkError(_) => StatusCode::INTERNAL_SERVER_ERROR,
497-
PostError::ObjectStorageError(_) => StatusCode::INTERNAL_SERVER_ERROR,
498-
PostError::DashboardError(_) => StatusCode::INTERNAL_SERVER_ERROR,
499-
PostError::FiltersError(_) => StatusCode::INTERNAL_SERVER_ERROR,
500-
PostError::StreamError(_) => StatusCode::INTERNAL_SERVER_ERROR,
501-
PostError::JsonFlattenError(_) => StatusCode::INTERNAL_SERVER_ERROR,
502-
PostError::OtelNotSupported => StatusCode::BAD_REQUEST,
503-
PostError::InternalStream(_) => StatusCode::BAD_REQUEST,
504-
PostError::IncorrectLogSource(_) => StatusCode::BAD_REQUEST,
505-
PostError::IngestionNotAllowed => StatusCode::BAD_REQUEST,
506-
PostError::MissingTimePartition(_) => StatusCode::BAD_REQUEST,
507-
PostError::KnownFormat(_) => StatusCode::BAD_REQUEST,
508-
PostError::IncorrectLogFormat(_) => StatusCode::BAD_REQUEST,
509-
PostError::FieldsCountLimitExceeded(_, _, _) => StatusCode::BAD_REQUEST,
510-
PostError::InvalidQueryParameter => StatusCode::BAD_REQUEST,
511-
PostError::MissingQueryParameter => StatusCode::BAD_REQUEST,
512-
PostError::MetastoreError(e) => e.status_code(),
487+
SerdeError(_)
488+
| Header(_)
489+
| Invalid(_)
490+
| InternalStream(_)
491+
| IncorrectLogSource(_)
492+
| IngestionNotAllowed
493+
| MissingTimePartition(_)
494+
| KnownFormat(_)
495+
| IncorrectLogFormat(_)
496+
| FieldsCountLimitExceeded(_, _, _)
497+
| InvalidQueryParameter
498+
| MissingQueryParameter
499+
| CreateStream(CreateStreamError::StreamNameValidation(_))
500+
| OtelNotSupported => StatusCode::BAD_REQUEST,
501+
502+
Event(_)
503+
| CreateStream(_)
504+
| CustomError(_)
505+
| NetworkError(_)
506+
| ObjectStorageError(_)
507+
| DashboardError(_)
508+
| FiltersError(_)
509+
| StreamError(_)
510+
| JsonFlattenError(_) => StatusCode::INTERNAL_SERVER_ERROR,
511+
512+
StreamNotFound(_) => StatusCode::NOT_FOUND,
513+
514+
MetastoreError(e) => e.status_code(),
513515
}
514516
}
515517

src/metastore/metastore_traits.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ pub trait Metastore: std::fmt::Debug + Send + Sync {
101101
stream_name: &str,
102102
lower_bound: DateTime<Utc>,
103103
upper_bound: DateTime<Utc>,
104+
manifest_url: Option<String>,
104105
) -> Result<Option<Manifest>, MetastoreError>;
105106
async fn put_manifest(
106107
&self,

src/metastore/metastores/object_store_metastore.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -414,9 +414,15 @@ impl Metastore for ObjectStoreMetastore {
414414
stream_name: &str,
415415
lower_bound: DateTime<Utc>,
416416
upper_bound: DateTime<Utc>,
417+
manifest_url: Option<String>,
417418
) -> Result<Option<Manifest>, MetastoreError> {
418-
let path = partition_path(stream_name, lower_bound, upper_bound);
419-
let path = manifest_path(path.as_str());
419+
let path = match manifest_url {
420+
Some(url) => RelativePathBuf::from(url),
421+
None => {
422+
let path = partition_path(stream_name, lower_bound, upper_bound);
423+
manifest_path(path.as_str())
424+
}
425+
};
420426
match self.storage.get_object(&path).await {
421427
Ok(bytes) => {
422428
let manifest = serde_json::from_slice(&bytes)?;
@@ -425,6 +431,18 @@ impl Metastore for ObjectStoreMetastore {
425431
Err(ObjectStorageError::NoSuchKey(_)) => Ok(None),
426432
Err(err) => Err(MetastoreError::ObjectStorageError(err)),
427433
}
434+
// let path = partition_path(stream_name, lower_bound, upper_bound);
435+
// // // need a 'ends with `manifest.json` condition here'
436+
// // let obs = self
437+
// // .storage
438+
// // .get_objects(
439+
// // path,
440+
// // Box::new(|file_name| file_name.ends_with("manifest.json")),
441+
// // )
442+
// // .await?;
443+
// warn!(partition_path=?path);
444+
// let path = manifest_path(path.as_str());
445+
// warn!(manifest_path=?path);
428446
}
429447

430448
/// Get the path for a specific `Manifest` file

src/metastore/mod.rs

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -148,25 +148,13 @@ impl MetastoreError {
148148

149149
pub fn status_code(&self) -> StatusCode {
150150
match self {
151-
MetastoreError::ObjectStorageError(_object_storage_error) => {
152-
StatusCode::INTERNAL_SERVER_ERROR
153-
}
154-
MetastoreError::JsonParseError(_error) => StatusCode::INTERNAL_SERVER_ERROR,
155-
MetastoreError::JsonSchemaError { message: _ } => StatusCode::INTERNAL_SERVER_ERROR,
156-
MetastoreError::InvalidJsonStructure {
157-
expected: _,
158-
found: _,
159-
} => StatusCode::INTERNAL_SERVER_ERROR,
160-
MetastoreError::MissingJsonField { field: _ } => StatusCode::INTERNAL_SERVER_ERROR,
161-
MetastoreError::InvalidJsonValue {
162-
field: _,
163-
reason: _,
164-
} => StatusCode::INTERNAL_SERVER_ERROR,
165-
MetastoreError::Error {
166-
status_code,
167-
message: _,
168-
flow: _,
169-
} => *status_code,
151+
MetastoreError::ObjectStorageError(..) => StatusCode::INTERNAL_SERVER_ERROR,
152+
MetastoreError::JsonParseError(..) => StatusCode::INTERNAL_SERVER_ERROR,
153+
MetastoreError::JsonSchemaError { .. } => StatusCode::INTERNAL_SERVER_ERROR,
154+
MetastoreError::InvalidJsonStructure { .. } => StatusCode::INTERNAL_SERVER_ERROR,
155+
MetastoreError::MissingJsonField { .. } => StatusCode::INTERNAL_SERVER_ERROR,
156+
MetastoreError::InvalidJsonValue { .. } => StatusCode::INTERNAL_SERVER_ERROR,
157+
MetastoreError::Error { status_code, .. } => *status_code,
170158
}
171159
}
172160
}

src/query/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ use std::ops::Bound;
4343
use std::sync::Arc;
4444
use sysinfo::System;
4545
use tokio::runtime::Runtime;
46+
use tracing::warn;
4647

4748
use self::error::ExecuteError;
4849
use self::stream_schema_provider::GlobalSchemaProvider;
@@ -546,15 +547,20 @@ pub async fn get_manifest_list(
546547
PartialTimeFilter::High(Bound::Included(time_range.end.naive_utc())),
547548
];
548549

550+
warn!(merged_snapshot=?merged_snapshot);
551+
warn!(time_filter=?time_filter);
552+
549553
let mut all_manifest_files = Vec::new();
550554
for manifest_item in merged_snapshot.manifests(&time_filter) {
555+
warn!(manifest_item=?manifest_item);
551556
all_manifest_files.push(
552557
PARSEABLE
553558
.metastore
554559
.get_manifest(
555560
stream_name,
556561
manifest_item.time_lower_bound,
557562
manifest_item.time_upper_bound,
563+
Some(manifest_item.manifest_path),
558564
)
559565
.await?
560566
.expect("Data is invalid for Manifest"),

src/query/stream_schema_provider.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,7 @@ async fn collect_from_snapshot(
421421
stream_name,
422422
manifest_item.time_lower_bound,
423423
manifest_item.time_upper_bound,
424+
Some(manifest_item.manifest_path),
424425
)
425426
.await
426427
.map_err(|e| DataFusionError::Plan(e.to_string()))?

0 commit comments

Comments
 (0)