From 5eb626bdae47959219344b386981182e86000912 Mon Sep 17 00:00:00 2001 From: anant Date: Wed, 20 Aug 2025 06:22:49 +0530 Subject: [PATCH 1/5] Updates - changes related to time-partition - general updates --- src/alerts/alert_structs.rs | 7 +++++ .../http/modal/query/querier_logstream.rs | 29 ++++++++++++++----- src/query/mod.rs | 16 ++++++---- 3 files changed, 39 insertions(+), 13 deletions(-) diff --git a/src/alerts/alert_structs.rs b/src/alerts/alert_structs.rs index 6e2e0f275..9653bdd05 100644 --- a/src/alerts/alert_structs.rs +++ b/src/alerts/alert_structs.rs @@ -270,6 +270,13 @@ impl AlertRequest { TARGETS.get_target_by_id(id).await?; } let datasets = resolve_stream_names(&self.query)?; + + if datasets.len() != 1 { + return Err(AlertError::ValidationFailure(format!( + "Query should include only one dataset. Found- {datasets:?}" + ))); + } + let config = AlertConfig { version: AlertVersion::from(CURRENT_ALERTS_VERSION), id: Ulid::new(), diff --git a/src/handlers/http/modal/query/querier_logstream.rs b/src/handlers/http/modal/query/querier_logstream.rs index d9f512186..dd70ee008 100644 --- a/src/handlers/http/modal/query/querier_logstream.rs +++ b/src/handlers/http/modal/query/querier_logstream.rs @@ -33,14 +33,17 @@ use tracing::{error, warn}; static CREATE_STREAM_LOCK: Mutex<()> = Mutex::const_new(()); use crate::{ - handlers::http::{ - base_path_without_preceding_slash, - cluster::{ - self, fetch_daily_stats, fetch_stats_from_ingestors, sync_streams_with_ingestors, - utils::{IngestionStats, QueriedStats, StorageStats, merge_quried_stats}, + handlers::{ + UPDATE_STREAM_KEY, + http::{ + base_path_without_preceding_slash, + cluster::{ + self, fetch_daily_stats, fetch_stats_from_ingestors, sync_streams_with_ingestors, + utils::{IngestionStats, QueriedStats, StorageStats, merge_quried_stats}, + }, + logstream::error::StreamError, + modal::{NodeMetadata, NodeType}, }, - logstream::error::StreamError, - modal::{NodeMetadata, NodeType}, }, hottier::HotTierManager, parseable::{PARSEABLE, StreamNotFound}, @@ -120,9 +123,19 @@ pub async fn put_stream( .create_update_stream(req.headers(), &body, &stream_name) .await?; + let is_update = if let Some(val) = headers.get(UPDATE_STREAM_KEY) { + val.to_str().unwrap() == "true" + } else { + false + }; + sync_streams_with_ingestors(headers, body, &stream_name).await?; - Ok(("Log stream created", StatusCode::OK)) + if is_update { + Ok(("Log stream updated", StatusCode::OK)) + } else { + Ok(("Log stream created", StatusCode::OK)) + } } pub async fn get_stats( diff --git a/src/query/mod.rs b/src/query/mod.rs index 7dadcdc96..b3a6ed7a4 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -345,7 +345,7 @@ impl CountsRequest { .get_stream(&self.stream) .map_err(|err| anyhow::Error::msg(err.to_string()))? .get_time_partition() - .unwrap_or_else(|| event::DEFAULT_TIMESTAMP_KEY.to_owned()); + .unwrap_or(event::DEFAULT_TIMESTAMP_KEY.to_owned()); // get time range let time_range = TimeRange::parse_human_time(&self.start_time, &self.end_time)?; @@ -445,6 +445,12 @@ impl CountsRequest { // unwrap because we have asserted that it is some let count_conditions = self.conditions.as_ref().unwrap(); + // get time partition column + let time_partition = PARSEABLE + .get_stream(&self.stream)? + .get_time_partition() + .unwrap_or(DEFAULT_TIMESTAMP_KEY.into()); + let time_range = TimeRange::parse_human_time(&self.start_time, &self.end_time)?; let dur = time_range.end.signed_duration_since(time_range.start); @@ -452,19 +458,19 @@ impl CountsRequest { let date_bin = if dur.num_minutes() <= 60 * 10 { // date_bin 1 minute format!( - "CAST(DATE_BIN('1 minute', \"{}\".p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 minute', p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 minute' as end_time", + "CAST(DATE_BIN('1 minute', \"{}\".\"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 minute', \"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 minute' as end_time", self.stream ) } else if dur.num_minutes() > 60 * 10 && dur.num_minutes() < 60 * 240 { // date_bin 1 hour format!( - "CAST(DATE_BIN('1 hour', \"{}\".p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 hour', p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 hour' as end_time", + "CAST(DATE_BIN('1 hour', \"{}\".\"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 hour', \"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 hour' as end_time", self.stream ) } else { // date_bin 1 day format!( - "CAST(DATE_BIN('1 day', \"{}\".p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 day', p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 day' as end_time", + "CAST(DATE_BIN('1 day', \"{}\".\"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 day', \"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 day' as end_time", self.stream ) }; @@ -653,7 +659,7 @@ fn table_contains_any_time_filters( }) .any(|expr| { matches!(&*expr.left, Expr::Column(Column { name, .. }) - if name == time_column) + if name == &default_timestamp || name == time_column) }) } From 4bdea13597bdb857782b93c4435be65724a5d572 Mon Sep 17 00:00:00 2001 From: anant Date: Wed, 20 Aug 2025 13:40:00 +0530 Subject: [PATCH 2/5] coderabbit changes --- src/alerts/alert_structs.rs | 2 +- src/handlers/http/cluster/utils.rs | 2 +- .../http/modal/query/querier_logstream.rs | 6 +++--- src/prism/logstream/mod.rs | 4 ++-- src/query/mod.rs | 18 +++++++++--------- 5 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/alerts/alert_structs.rs b/src/alerts/alert_structs.rs index 9653bdd05..cccd11603 100644 --- a/src/alerts/alert_structs.rs +++ b/src/alerts/alert_structs.rs @@ -273,7 +273,7 @@ impl AlertRequest { if datasets.len() != 1 { return Err(AlertError::ValidationFailure(format!( - "Query should include only one dataset. Found- {datasets:?}" + "Query should include only one dataset. Found: {datasets:?}" ))); } diff --git a/src/handlers/http/cluster/utils.rs b/src/handlers/http/cluster/utils.rs index 103fed42c..8d1751f2f 100644 --- a/src/handlers/http/cluster/utils.rs +++ b/src/handlers/http/cluster/utils.rs @@ -135,7 +135,7 @@ impl StorageStats { } } -pub fn merge_quried_stats(stats: Vec) -> QueriedStats { +pub fn merge_queried_stats(stats: Vec) -> QueriedStats { // get the stream name let stream_name = stats[1].stream.clone(); diff --git a/src/handlers/http/modal/query/querier_logstream.rs b/src/handlers/http/modal/query/querier_logstream.rs index dd70ee008..b83da591d 100644 --- a/src/handlers/http/modal/query/querier_logstream.rs +++ b/src/handlers/http/modal/query/querier_logstream.rs @@ -39,7 +39,7 @@ use crate::{ base_path_without_preceding_slash, cluster::{ self, fetch_daily_stats, fetch_stats_from_ingestors, sync_streams_with_ingestors, - utils::{IngestionStats, QueriedStats, StorageStats, merge_quried_stats}, + utils::{IngestionStats, QueriedStats, StorageStats, merge_queried_stats}, }, logstream::error::StreamError, modal::{NodeMetadata, NodeType}, @@ -118,7 +118,7 @@ pub async fn put_stream( body: Bytes, ) -> Result { let stream_name = stream_name.into_inner(); - let _ = CREATE_STREAM_LOCK.lock().await; + let _guard = CREATE_STREAM_LOCK.lock().await; let headers = PARSEABLE .create_update_stream(req.headers(), &body, &stream_name) .await?; @@ -231,7 +231,7 @@ pub async fn get_stats( let stats = if let Some(mut ingestor_stats) = ingestor_stats { ingestor_stats.push(stats); - merge_quried_stats(ingestor_stats) + merge_queried_stats(ingestor_stats) } else { stats }; diff --git a/src/prism/logstream/mod.rs b/src/prism/logstream/mod.rs index 742cb0801..e5436905b 100644 --- a/src/prism/logstream/mod.rs +++ b/src/prism/logstream/mod.rs @@ -30,7 +30,7 @@ use crate::{ handlers::http::{ cluster::{ fetch_stats_from_ingestors, - utils::{IngestionStats, QueriedStats, StorageStats, merge_quried_stats}, + utils::{IngestionStats, QueriedStats, StorageStats, merge_queried_stats}, }, logstream::error::StreamError, query::{QueryError, update_schema_when_distributed}, @@ -136,7 +136,7 @@ async fn get_stats(stream_name: &str) -> Result 60 * 10 && dur.num_minutes() < 60 * 240 { // date_bin 1 hour format!( - "CAST(DATE_BIN('1 hour', \"{}\".\"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 hour', \"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 hour' as end_time", - self.stream + "CAST(DATE_BIN('1 hour', \"{}\".\"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 hour', \"{}\".\"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 hour' as end_time", + self.stream, self.stream ) } else { // date_bin 1 day format!( - "CAST(DATE_BIN('1 day', \"{}\".\"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 day', \"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 day' as end_time", - self.stream + "CAST(DATE_BIN('1 day', \"{}\".\"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 day', \"{}\".\"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 day' as end_time", + self.stream, self.stream ) }; From d87bfe4fe2bb045787ee170bfc916836612cf785 Mon Sep 17 00:00:00 2001 From: anant Date: Wed, 20 Aug 2025 19:01:24 +0530 Subject: [PATCH 3/5] updates - handled counts request --- src/handlers/http/cluster/mod.rs | 2 +- src/handlers/http/query.rs | 20 ++++-- src/prism/logstream/mod.rs | 110 ++++++++++++++++++++++++++----- src/query/mod.rs | 22 +++---- 4 files changed, 117 insertions(+), 37 deletions(-) diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs index 1e99cf62d..aac5d91f9 100644 --- a/src/handlers/http/cluster/mod.rs +++ b/src/handlers/http/cluster/mod.rs @@ -1114,7 +1114,7 @@ struct QuerierStatus { last_used: Option, } -async fn get_available_querier() -> Result { +pub async fn get_available_querier() -> Result { // Get all querier metadata let querier_metadata: Vec = get_node_info(NodeType::Querier).await?; diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index e39a10bfe..82dd98753 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -19,6 +19,7 @@ use crate::event::error::EventError; use crate::handlers::http::fetch_schema; use crate::option::Mode; +use crate::rbac::map::SessionKey; use crate::utils::arrow::record_batches_to_json; use actix_web::http::header::ContentType; use actix_web::web::{self, Json}; @@ -43,7 +44,7 @@ use std::time::Instant; use tokio::task::JoinSet; use tracing::{error, warn}; -use crate::event::commit_schema; +use crate::event::{DEFAULT_TIMESTAMP_KEY, commit_schema}; use crate::metrics::QUERY_EXECUTE_TIME; use crate::parseable::{PARSEABLE, StreamNotFound}; use crate::query::error::ExecuteError; @@ -79,7 +80,7 @@ pub struct Query { /// TODO: Improve this function and make this a part of the query API pub async fn get_records_and_fields( query_request: &Query, - req: &HttpRequest, + creds: &SessionKey, ) -> Result<(Option>, Option>), QueryError> { let session_state = QUERY_SESSION.state(); let time_range = @@ -89,8 +90,8 @@ pub async fn get_records_and_fields( create_streams_for_distributed(tables.clone()).await?; let query: LogicalQuery = into_query(query_request, &session_state, time_range).await?; - let creds = extract_session_key_from_req(req)?; - let permissions = Users.get_permissions(&creds); + + let permissions = Users.get_permissions(creds); user_auth_for_datasets(&permissions, &tables).await?; @@ -350,7 +351,12 @@ pub async fn get_counts( // if the user has given a sql query (counts call with filters applied), then use this flow // this could include filters or group by if body.conditions.is_some() { - let sql = body.get_df_sql().await?; + let time_partition = PARSEABLE + .get_stream(&body.stream)? + .get_time_partition() + .unwrap_or_else(|| DEFAULT_TIMESTAMP_KEY.into()); + + let sql = body.get_df_sql(time_partition).await?; let query_request = Query { query: sql, @@ -362,7 +368,9 @@ pub async fn get_counts( filter_tags: None, }; - let (records, _) = get_records_and_fields(&query_request, &req).await?; + let creds = extract_session_key_from_req(&req)?; + + let (records, _) = get_records_and_fields(&query_request, &creds).await?; if let Some(records) = records { let json_records = record_batches_to_json(&records)?; diff --git a/src/prism/logstream/mod.rs b/src/prism/logstream/mod.rs index e5436905b..a13bc272d 100644 --- a/src/prism/logstream/mod.rs +++ b/src/prism/logstream/mod.rs @@ -20,28 +20,35 @@ use std::sync::Arc; use actix_web::http::header::ContentType; use arrow_schema::Schema; -use chrono::Utc; +use chrono::{TimeDelta, Utc}; use http::StatusCode; +use itertools::Itertools; use serde::{Deserialize, Serialize}; +use serde_json::{Value, json}; use tracing::warn; use crate::{ LOCK_EXPECT, + alerts::alert_structs::{ConditionConfig, Conditions}, + event::DEFAULT_TIMESTAMP_KEY, handlers::http::{ cluster::{ fetch_stats_from_ingestors, utils::{IngestionStats, QueriedStats, StorageStats, merge_queried_stats}, }, logstream::error::StreamError, - query::{QueryError, update_schema_when_distributed}, + query::{Query, QueryError, get_records_and_fields, update_schema_when_distributed}, }, hottier::{HotTierError, HotTierManager, StreamHotTier}, parseable::{PARSEABLE, StreamNotFound}, - query::{CountsRequest, CountsResponse, error::ExecuteError}, + query::{CountConditions, CountsRequest, CountsResponse, error::ExecuteError}, rbac::{Users, map::SessionKey, role::Action}, stats, storage::{StreamInfo, StreamType, retention::Retention}, - utils::time::TimeParseError, + utils::{ + arrow::record_batches_to_json, + time::{TimeParseError, truncate_to_minute}, + }, validator::error::HotTierValidationError, }; @@ -218,7 +225,7 @@ pub struct PrismDatasetResponse { /// Request parameters for retrieving Prism dataset information. /// Defines which streams to query -#[derive(Deserialize, Default)] +#[derive(Deserialize, Default, Serialize)] #[serde(rename_all = "camelCase")] pub struct PrismDatasetRequest { /// List of stream names to query @@ -292,7 +299,7 @@ impl PrismDatasetRequest { // Process stream data match get_prism_logstream_info(&stream).await { - Ok(info) => Ok(Some(self.build_dataset_response(stream, info).await?)), + Ok(info) => Ok(Some(self.build_dataset_response(stream, info, &key).await?)), Err(err) => Err(err), } } @@ -312,12 +319,13 @@ impl PrismDatasetRequest { &self, stream: String, info: PrismLogstreamInfo, + key: &SessionKey, ) -> Result { // Get hot tier info let hottier = self.get_hot_tier_info(&stream).await?; // Get counts - let counts = self.get_counts(&stream).await?; + let counts = self.get_counts(&stream, key).await?; Ok(PrismDatasetResponse { stream, @@ -346,20 +354,84 @@ impl PrismDatasetRequest { } } - async fn get_counts(&self, stream: &str) -> Result { + async fn get_counts( + &self, + stream: &str, + key: &SessionKey, + ) -> Result { + let end = truncate_to_minute(Utc::now()); + let start = end - TimeDelta::hours(1); + + let conditions = if PARSEABLE.get_stream(stream)?.get_time_partition().is_some() { + Some(CountConditions { + conditions: Some(Conditions { + operator: Some(crate::alerts::LogicalOperator::And), + condition_config: vec![ + ConditionConfig { + column: DEFAULT_TIMESTAMP_KEY.into(), + operator: crate::alerts::WhereConfigOperator::GreaterThanOrEqual, + value: Some(start.to_rfc3339()), + }, + ConditionConfig { + column: DEFAULT_TIMESTAMP_KEY.into(), + operator: crate::alerts::WhereConfigOperator::LessThan, + value: Some(end.to_rfc3339()), + }, + ], + }), + group_by: None, + }) + } else { + None + }; + let count_request = CountsRequest { stream: stream.to_owned(), - start_time: "1h".to_owned(), - end_time: "now".to_owned(), + start_time: start.to_rfc3339(), + end_time: end.to_rfc3339(), num_bins: 10, - conditions: None, + conditions, }; - let records = count_request.get_bin_density().await?; - Ok(CountsResponse { - fields: vec!["start_time".into(), "end_time".into(), "count".into()], - records, - }) + if count_request.conditions.is_some() { + // forward request to querier + let query = count_request + .get_df_sql(DEFAULT_TIMESTAMP_KEY.into()) + .await?; + + let query_request = Query { + query, + start_time: start.to_rfc3339(), + end_time: end.to_rfc3339(), + send_null: true, + fields: true, + streaming: false, + filter_tags: None, + }; + + let (records, _) = get_records_and_fields(&query_request, key).await?; + if let Some(records) = records { + let json_records = record_batches_to_json(&records)?; + let records = json_records.into_iter().map(Value::Object).collect_vec(); + + let res = json!({ + "fields": vec!["start_time", "end_time", "count"], + "records": records, + }); + + Ok(serde_json::from_value(res)?) + } else { + Err(PrismLogstreamError::Anyhow(anyhow::Error::msg( + "No data returned for counts SQL", + ))) + } + } else { + let records = count_request.get_bin_density().await?; + Ok(CountsResponse { + fields: vec!["start_time".into(), "end_time".into(), "count".into()], + records, + }) + } } } @@ -381,6 +453,10 @@ pub enum PrismLogstreamError { Execute(#[from] ExecuteError), #[error("Auth: {0}")] Auth(#[from] actix_web::Error), + #[error("SerdeError: {0}")] + SerdeError(#[from] serde_json::Error), + #[error("ReqwestError: {0}")] + ReqwestError(#[from] reqwest::Error), } impl actix_web::ResponseError for PrismLogstreamError { @@ -393,6 +469,8 @@ impl actix_web::ResponseError for PrismLogstreamError { PrismLogstreamError::Query(_) => StatusCode::INTERNAL_SERVER_ERROR, PrismLogstreamError::TimeParse(_) => StatusCode::NOT_FOUND, PrismLogstreamError::Execute(_) => StatusCode::INTERNAL_SERVER_ERROR, + PrismLogstreamError::SerdeError(_) => StatusCode::INTERNAL_SERVER_ERROR, + PrismLogstreamError::ReqwestError(_) => StatusCode::INTERNAL_SERVER_ERROR, PrismLogstreamError::Auth(_) => StatusCode::UNAUTHORIZED, } } diff --git a/src/query/mod.rs b/src/query/mod.rs index 521c2e6b9..6c142c6a6 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -441,16 +441,10 @@ impl CountsRequest { } /// This function will get executed only if self.conditions is some - pub async fn get_df_sql(&self) -> Result { + pub async fn get_df_sql(&self, time_column: String) -> Result { // unwrap because we have asserted that it is some let count_conditions = self.conditions.as_ref().unwrap(); - // get time partition column - let time_partition = PARSEABLE - .get_stream(&self.stream)? - .get_time_partition() - .unwrap_or_else(|| DEFAULT_TIMESTAMP_KEY.into()); - let time_range = TimeRange::parse_human_time(&self.start_time, &self.end_time)?; let dur = time_range.end.signed_duration_since(time_range.start); @@ -458,20 +452,20 @@ impl CountsRequest { let date_bin = if dur.num_minutes() <= 60 * 10 { // date_bin 1 minute format!( - "CAST(DATE_BIN('1 minute', \"{}\".\"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 minute', \"{}\".\"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 minute' as end_time", - self.stream, self.stream + "CAST(DATE_BIN('1 minute', \"{}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 minute', \"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 minute' as end_time", + self.stream ) } else if dur.num_minutes() > 60 * 10 && dur.num_minutes() < 60 * 240 { // date_bin 1 hour format!( - "CAST(DATE_BIN('1 hour', \"{}\".\"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 hour', \"{}\".\"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 hour' as end_time", - self.stream, self.stream + "CAST(DATE_BIN('1 hour', \"{}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 hour', \"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 hour' as end_time", + self.stream ) } else { // date_bin 1 day format!( - "CAST(DATE_BIN('1 day', \"{}\".\"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 day', \"{}\".\"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 day' as end_time", - self.stream, self.stream + "CAST(DATE_BIN('1 day', \"{}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 day', \"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 day' as end_time", + self.stream ) }; @@ -492,7 +486,7 @@ impl CountsRequest { } /// Response for the counts API -#[derive(Debug, Serialize, Clone)] +#[derive(Debug, Serialize, Clone, Deserialize)] pub struct CountsResponse { /// Fields in the log stream pub fields: Vec, From daf94eecfb2c9fc4014ef3156330ee59a123f372 Mon Sep 17 00:00:00 2001 From: anant Date: Thu, 21 Aug 2025 17:20:23 +0530 Subject: [PATCH 4/5] revert counts request changes --- src/prism/logstream/mod.rs | 102 ++++++------------------------------- 1 file changed, 15 insertions(+), 87 deletions(-) diff --git a/src/prism/logstream/mod.rs b/src/prism/logstream/mod.rs index a13bc272d..c1003adae 100644 --- a/src/prism/logstream/mod.rs +++ b/src/prism/logstream/mod.rs @@ -20,35 +20,28 @@ use std::sync::Arc; use actix_web::http::header::ContentType; use arrow_schema::Schema; -use chrono::{TimeDelta, Utc}; +use chrono::Utc; use http::StatusCode; -use itertools::Itertools; use serde::{Deserialize, Serialize}; -use serde_json::{Value, json}; use tracing::warn; use crate::{ LOCK_EXPECT, - alerts::alert_structs::{ConditionConfig, Conditions}, - event::DEFAULT_TIMESTAMP_KEY, handlers::http::{ cluster::{ fetch_stats_from_ingestors, utils::{IngestionStats, QueriedStats, StorageStats, merge_queried_stats}, }, logstream::error::StreamError, - query::{Query, QueryError, get_records_and_fields, update_schema_when_distributed}, + query::{QueryError, update_schema_when_distributed}, }, hottier::{HotTierError, HotTierManager, StreamHotTier}, parseable::{PARSEABLE, StreamNotFound}, - query::{CountConditions, CountsRequest, CountsResponse, error::ExecuteError}, + query::{CountsRequest, CountsResponse, error::ExecuteError}, rbac::{Users, map::SessionKey, role::Action}, stats, storage::{StreamInfo, StreamType, retention::Retention}, - utils::{ - arrow::record_batches_to_json, - time::{TimeParseError, truncate_to_minute}, - }, + utils::time::TimeParseError, validator::error::HotTierValidationError, }; @@ -299,7 +292,7 @@ impl PrismDatasetRequest { // Process stream data match get_prism_logstream_info(&stream).await { - Ok(info) => Ok(Some(self.build_dataset_response(stream, info, &key).await?)), + Ok(info) => Ok(Some(self.build_dataset_response(stream, info).await?)), Err(err) => Err(err), } } @@ -319,13 +312,12 @@ impl PrismDatasetRequest { &self, stream: String, info: PrismLogstreamInfo, - key: &SessionKey, ) -> Result { // Get hot tier info let hottier = self.get_hot_tier_info(&stream).await?; // Get counts - let counts = self.get_counts(&stream, key).await?; + let counts = self.get_counts(&stream).await?; Ok(PrismDatasetResponse { stream, @@ -354,84 +346,20 @@ impl PrismDatasetRequest { } } - async fn get_counts( - &self, - stream: &str, - key: &SessionKey, - ) -> Result { - let end = truncate_to_minute(Utc::now()); - let start = end - TimeDelta::hours(1); - - let conditions = if PARSEABLE.get_stream(stream)?.get_time_partition().is_some() { - Some(CountConditions { - conditions: Some(Conditions { - operator: Some(crate::alerts::LogicalOperator::And), - condition_config: vec![ - ConditionConfig { - column: DEFAULT_TIMESTAMP_KEY.into(), - operator: crate::alerts::WhereConfigOperator::GreaterThanOrEqual, - value: Some(start.to_rfc3339()), - }, - ConditionConfig { - column: DEFAULT_TIMESTAMP_KEY.into(), - operator: crate::alerts::WhereConfigOperator::LessThan, - value: Some(end.to_rfc3339()), - }, - ], - }), - group_by: None, - }) - } else { - None - }; - + async fn get_counts(&self, stream: &str) -> Result { let count_request = CountsRequest { stream: stream.to_owned(), - start_time: start.to_rfc3339(), - end_time: end.to_rfc3339(), + start_time: "1h".to_owned(), + end_time: "now".to_owned(), num_bins: 10, - conditions, + conditions: None, }; - if count_request.conditions.is_some() { - // forward request to querier - let query = count_request - .get_df_sql(DEFAULT_TIMESTAMP_KEY.into()) - .await?; - - let query_request = Query { - query, - start_time: start.to_rfc3339(), - end_time: end.to_rfc3339(), - send_null: true, - fields: true, - streaming: false, - filter_tags: None, - }; - - let (records, _) = get_records_and_fields(&query_request, key).await?; - if let Some(records) = records { - let json_records = record_batches_to_json(&records)?; - let records = json_records.into_iter().map(Value::Object).collect_vec(); - - let res = json!({ - "fields": vec!["start_time", "end_time", "count"], - "records": records, - }); - - Ok(serde_json::from_value(res)?) - } else { - Err(PrismLogstreamError::Anyhow(anyhow::Error::msg( - "No data returned for counts SQL", - ))) - } - } else { - let records = count_request.get_bin_density().await?; - Ok(CountsResponse { - fields: vec!["start_time".into(), "end_time".into(), "count".into()], - records, - }) - } + let records = count_request.get_bin_density().await?; + Ok(CountsResponse { + fields: vec!["start_time".into(), "end_time".into(), "count".into()], + records, + }) } } From bb574b79c6da569bccb5210a9ea271478b79084f Mon Sep 17 00:00:00 2001 From: anant Date: Fri, 22 Aug 2025 08:49:25 +0530 Subject: [PATCH 5/5] coderabbit suggestions - added length check to `merge_queried_stats` --- src/handlers/http/cluster/utils.rs | 12 +++++++++--- src/handlers/http/modal/query/querier_logstream.rs | 1 + src/prism/logstream/mod.rs | 2 +- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/src/handlers/http/cluster/utils.rs b/src/handlers/http/cluster/utils.rs index 8d1751f2f..46e44b3d2 100644 --- a/src/handlers/http/cluster/utils.rs +++ b/src/handlers/http/cluster/utils.rs @@ -19,6 +19,7 @@ use crate::{ INTRA_CLUSTER_CLIENT, handlers::http::{base_path_without_preceding_slash, modal::NodeType}, + prism::logstream::PrismLogstreamError, }; use actix_web::http::header; use chrono::{DateTime, Utc}; @@ -135,7 +136,12 @@ impl StorageStats { } } -pub fn merge_queried_stats(stats: Vec) -> QueriedStats { +pub fn merge_queried_stats(stats: Vec) -> Result { + if stats.len() < 2 { + return Err(PrismLogstreamError::Anyhow(anyhow::Error::msg( + "Expected at least two logstreams in merge_queried_stats", + ))); + } // get the stream name let stream_name = stats[1].stream.clone(); @@ -167,12 +173,12 @@ pub fn merge_queried_stats(stats: Vec) -> QueriedStats { deleted_size: acc.deleted_size + x.deleted_size, }); - QueriedStats::new( + Ok(QueriedStats::new( &stream_name, min_time, cumulative_ingestion, cumulative_storage, - ) + )) } pub async fn check_liveness(domain_name: &str) -> bool { diff --git a/src/handlers/http/modal/query/querier_logstream.rs b/src/handlers/http/modal/query/querier_logstream.rs index b83da591d..049d4a933 100644 --- a/src/handlers/http/modal/query/querier_logstream.rs +++ b/src/handlers/http/modal/query/querier_logstream.rs @@ -232,6 +232,7 @@ pub async fn get_stats( let stats = if let Some(mut ingestor_stats) = ingestor_stats { ingestor_stats.push(stats); merge_queried_stats(ingestor_stats) + .map_err(|e| StreamError::Anyhow(anyhow::Error::msg(e.to_string())))? } else { stats }; diff --git a/src/prism/logstream/mod.rs b/src/prism/logstream/mod.rs index c1003adae..8703f71d4 100644 --- a/src/prism/logstream/mod.rs +++ b/src/prism/logstream/mod.rs @@ -136,7 +136,7 @@ async fn get_stats(stream_name: &str) -> Result