Skip to content

Commit 36ef1aa

Browse files
committed
Changes for stream.json
1 parent f342e0b commit 36ef1aa

File tree

25 files changed

+388
-365
lines changed

25 files changed

+388
-365
lines changed

src/alerts/alert_structs.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -531,11 +531,11 @@ pub struct NotificationStateRequest {
531531
}
532532

533533
impl MetastoreObject for AlertConfig {
534-
fn get_id(&self) -> String {
534+
fn get_object_id(&self) -> String {
535535
self.id.to_string()
536536
}
537537

538-
fn get_path(&self) -> String {
538+
fn get_object_path(&self) -> String {
539539
alert_json_path(self.id).to_string()
540540
}
541541
}

src/alerts/alert_traits.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use crate::{
2222
alert_enums::NotificationState,
2323
alert_structs::{Context, ThresholdConfig},
2424
},
25+
metastore::metastore_traits::MetastoreObject,
2526
rbac::map::SessionKey,
2627
};
2728
use chrono::{DateTime, Utc};
@@ -47,7 +48,7 @@ pub trait MessageCreation {
4748
}
4849

4950
#[async_trait]
50-
pub trait AlertTrait: Debug + Send + Sync {
51+
pub trait AlertTrait: Debug + Send + Sync + MetastoreObject {
5152
async fn eval_alert(&self) -> Result<Option<String>, AlertError>;
5253
async fn validate(&self, session_key: &SessionKey) -> Result<(), AlertError>;
5354
async fn update_notification_state(

src/alerts/alert_types.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,11 @@ use crate::{
3535
target::{self, NotificationConfig},
3636
},
3737
handlers::http::query::create_streams_for_distributed,
38+
metastore::metastore_traits::MetastoreObject,
3839
parseable::PARSEABLE,
3940
query::resolve_stream_names,
4041
rbac::map::SessionKey,
42+
storage::object_storage::alert_json_path,
4143
utils::user_auth_for_query,
4244
};
4345

@@ -65,6 +67,16 @@ pub struct ThresholdAlert {
6567
pub last_triggered_at: Option<DateTime<Utc>>,
6668
}
6769

70+
impl MetastoreObject for ThresholdAlert {
71+
fn get_object_path(&self) -> String {
72+
alert_json_path(self.id).to_string()
73+
}
74+
75+
fn get_object_id(&self) -> String {
76+
self.id.to_string()
77+
}
78+
}
79+
6880
#[async_trait]
6981
impl AlertTrait for ThresholdAlert {
7082
async fn eval_alert(&self) -> Result<Option<String>, AlertError> {

src/catalog/mod.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,13 @@ pub async fn update_snapshot(
114114
return Ok(());
115115
}
116116

117-
let mut meta = storage.get_object_store_format(stream_name).await?;
118-
117+
let mut meta: ObjectStoreFormat = serde_json::from_slice(
118+
&PARSEABLE
119+
.metastore
120+
.get_stream_json(stream_name, false)
121+
.await
122+
.map_err(|e| ObjectStorageError::MetastoreError(Box::new(e.to_detail())))?,
123+
)?;
119124
let partition_groups = group_changes_by_partition(changes, &meta.time_partition);
120125

121126
let new_manifest_entries =
@@ -458,7 +463,14 @@ pub async fn remove_manifest_from_snapshot(
458463
) -> Result<(), ObjectStorageError> {
459464
if !dates.is_empty() {
460465
// get current snapshot
461-
let mut meta = storage.get_object_store_format(stream_name).await?;
466+
let mut meta: ObjectStoreFormat = serde_json::from_slice(
467+
&PARSEABLE
468+
.metastore
469+
.get_stream_json(stream_name, false)
470+
.await
471+
.map_err(|e| ObjectStorageError::MetastoreError(Box::new(e.to_detail())))?,
472+
)?;
473+
462474
let meta_for_stats = meta.clone();
463475
update_deleted_stats(storage.clone(), stream_name, meta_for_stats, dates.clone()).await?;
464476
let manifests = &mut meta.snapshot.manifest_list;

src/enterprise/utils.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use crate::{
1515
event,
1616
parseable::PARSEABLE,
1717
query::{PartialTimeFilter, stream_schema_provider::ManifestExt},
18-
storage::{ObjectStorage, ObjectStorageError, ObjectStoreFormat, STREAM_ROOT_DIRECTORY},
18+
storage::{ObjectStorage, ObjectStorageError, ObjectStoreFormat},
1919
utils::time::TimeRange,
2020
};
2121

@@ -68,7 +68,13 @@ pub async fn fetch_parquet_file_paths(
6868
) -> Result<HashMap<RelativePathBuf, Vec<File>>, ObjectStorageError> {
6969
let glob_storage = PARSEABLE.storage.get_object_store();
7070

71-
let object_store_format = glob_storage.get_object_store_format(stream).await?;
71+
let object_store_format: ObjectStoreFormat = serde_json::from_slice(
72+
&PARSEABLE
73+
.metastore
74+
.get_stream_json(stream, false)
75+
.await
76+
.map_err(|e| ObjectStorageError::MetastoreError(Box::new(e.to_detail())))?,
77+
)?;
7278

7379
let time_partition = object_store_format.time_partition;
7480

@@ -78,13 +84,7 @@ pub async fn fetch_parquet_file_paths(
7884

7985
let mut merged_snapshot: snapshot::Snapshot = snapshot::Snapshot::default();
8086

81-
let path = RelativePathBuf::from_iter([stream, STREAM_ROOT_DIRECTORY]);
82-
let obs = glob_storage
83-
.get_objects(
84-
Some(&path),
85-
Box::new(|file_name| file_name.ends_with("stream.json")),
86-
)
87-
.await;
87+
let obs = PARSEABLE.metastore.get_all_stream_jsons(stream, None).await;
8888
if let Ok(obs) = obs {
8989
for ob in obs {
9090
if let Ok(object_store_format) = serde_json::from_slice::<ObjectStoreFormat>(&ob) {

src/handlers/http/alerts.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -262,10 +262,7 @@ pub async fn delete(req: HttpRequest, alert_id: Path<Ulid>) -> Result<impl Respo
262262
// validate that the user has access to the tables mentioned in the query
263263
user_auth_for_query(&session_key, alert.get_query()).await?;
264264

265-
PARSEABLE
266-
.metastore
267-
.delete_object(alert_json_path(alert_id).as_ref())
268-
.await?;
265+
PARSEABLE.metastore.delete_alert(&*alert).await?;
269266

270267
// delete from memory
271268
alerts.delete(alert_id).await?;

src/handlers/http/cluster/mod.rs

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,13 @@ use crate::INTRA_CLUSTER_CLIENT;
4545
use crate::handlers::http::ingest::ingest_internal_stream;
4646
use crate::handlers::http::query::{Query, QueryError, TIME_ELAPSED_HEADER};
4747
use crate::metrics::prom_utils::Metrics;
48+
use crate::option::Mode;
4849
use crate::parseable::PARSEABLE;
4950
use crate::rbac::role::model::DefaultPrivilege;
5051
use crate::rbac::user::User;
5152
use crate::stats::Stats;
5253
use crate::storage::{
5354
ObjectStorage, ObjectStorageError, ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY,
54-
STREAM_ROOT_DIRECTORY,
5555
};
5656

5757
use super::base_path_without_preceding_slash;
@@ -497,16 +497,9 @@ pub fn fetch_daily_stats(
497497
pub async fn fetch_stats_from_ingestors(
498498
stream_name: &str,
499499
) -> Result<Vec<utils::QueriedStats>, StreamError> {
500-
let path = RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY]);
501500
let obs = PARSEABLE
502-
.storage
503-
.get_object_store()
504-
.get_objects(
505-
Some(&path),
506-
Box::new(|file_name| {
507-
file_name.starts_with(".ingestor") && file_name.ends_with("stream.json")
508-
}),
509-
)
501+
.metastore
502+
.get_all_stream_jsons(stream_name, Some(Mode::Ingest))
510503
.await?;
511504

512505
let mut ingestion_size = 0u64;

src/handlers/http/logstream.rs

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use crate::rbac::Users;
2828
use crate::rbac::role::Action;
2929
use crate::stats::{Stats, event_labels_date, storage_size_labels_date};
3030
use crate::storage::retention::Retention;
31-
use crate::storage::{StreamInfo, StreamType};
31+
use crate::storage::{ObjectStoreFormat, StreamInfo, StreamType};
3232
use crate::utils::actix::extract_session_key_from_req;
3333
use crate::utils::json::flatten::{
3434
self, convert_to_array, generic_flattening, has_more_than_max_allowed_levels,
@@ -413,7 +413,12 @@ pub async fn put_stream_hot_tier(
413413
.put_hot_tier(&stream_name, &mut hottier)
414414
.await?;
415415
let storage = PARSEABLE.storage().get_object_store();
416-
let mut stream_metadata = storage.get_object_store_format(&stream_name).await?;
416+
let mut stream_metadata: ObjectStoreFormat = serde_json::from_slice(
417+
&PARSEABLE
418+
.metastore
419+
.get_stream_json(&stream_name, false)
420+
.await?,
421+
)?;
417422
stream_metadata.hot_tier_enabled = true;
418423
storage
419424
.put_stream_manifest(&stream_name, &stream_metadata)
@@ -491,6 +496,7 @@ pub mod error {
491496

492497
use crate::{
493498
hottier::HotTierError,
499+
metastore::MetastoreError,
494500
parseable::StreamNotFound,
495501
storage::ObjectStorageError,
496502
validator::error::{
@@ -563,6 +569,8 @@ pub mod error {
563569
HotTierError(#[from] HotTierError),
564570
#[error("Invalid query parameter: {0}")]
565571
InvalidQueryParameter(String),
572+
#[error("{0:?}")]
573+
MetastoreError(#[from] MetastoreError),
566574
}
567575

568576
impl actix_web::ResponseError for StreamError {
@@ -599,13 +607,21 @@ pub mod error {
599607
StreamError::HotTierValidation(_) => StatusCode::BAD_REQUEST,
600608
StreamError::HotTierError(_) => StatusCode::INTERNAL_SERVER_ERROR,
601609
StreamError::InvalidQueryParameter(_) => StatusCode::BAD_REQUEST,
610+
StreamError::MetastoreError(e) => e.status_code(),
602611
}
603612
}
604613

605614
fn error_response(&self) -> actix_web::HttpResponse<actix_web::body::BoxBody> {
606-
actix_web::HttpResponse::build(self.status_code())
607-
.insert_header(ContentType::plaintext())
608-
.body(self.to_string())
615+
match self {
616+
StreamError::MetastoreError(metastore_error) => {
617+
actix_web::HttpResponse::build(metastore_error.status_code())
618+
.insert_header(ContentType::json())
619+
.json(metastore_error.to_detail())
620+
}
621+
_ => actix_web::HttpResponse::build(self.status_code())
622+
.insert_header(ContentType::plaintext())
623+
.body(self.to_string()),
624+
}
609625
}
610626
}
611627
}

src/handlers/http/modal/query/querier_logstream.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ use actix_web::{
2626
use bytes::Bytes;
2727
use chrono::Utc;
2828
use http::StatusCode;
29-
use relative_path::RelativePathBuf;
3029
use tokio::sync::Mutex;
3130
use tracing::{error, warn};
3231

@@ -48,7 +47,7 @@ use crate::{
4847
hottier::HotTierManager,
4948
parseable::{PARSEABLE, StreamNotFound},
5049
stats,
51-
storage::{ObjectStoreFormat, STREAM_ROOT_DIRECTORY, StreamType},
50+
storage::{ObjectStoreFormat, StreamType},
5251
};
5352
const STATS_DATE_QUERY_PARAM: &str = "date";
5453

@@ -165,14 +164,9 @@ pub async fn get_stats(
165164

166165
if !date_value.is_empty() {
167166
// this function requires all the ingestor stream jsons
168-
let path = RelativePathBuf::from_iter([&stream_name, STREAM_ROOT_DIRECTORY]);
169167
let obs = PARSEABLE
170-
.storage
171-
.get_object_store()
172-
.get_objects(
173-
Some(&path),
174-
Box::new(|file_name| file_name.ends_with("stream.json")),
175-
)
168+
.metastore
169+
.get_all_stream_jsons(&stream_name, None)
176170
.await?;
177171

178172
let mut stream_jsons = Vec::new();

src/handlers/http/query.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
use crate::event::error::EventError;
2020
use crate::handlers::http::fetch_schema;
21+
use crate::metastore::MetastoreError;
2122
use crate::option::Mode;
2223
use crate::rbac::map::SessionKey;
2324
use crate::utils::arrow::record_batches_to_json;
@@ -578,12 +579,15 @@ Description: {0}"#
578579
NoAvailableQuerier,
579580
#[error("{0}")]
580581
ParserError(#[from] ParserError),
582+
#[error("{0:?}")]
583+
MetastoreError(#[from] MetastoreError),
581584
}
582585

583586
impl actix_web::ResponseError for QueryError {
584587
fn status_code(&self) -> http::StatusCode {
585588
match self {
586589
QueryError::Execute(_) | QueryError::JsonParse(_) => StatusCode::INTERNAL_SERVER_ERROR,
590+
QueryError::MetastoreError(e) => e.status_code(),
587591
_ => StatusCode::BAD_REQUEST,
588592
}
589593
}

0 commit comments

Comments
 (0)