From 536782038946b6ec19283e38c8e1dcd9da8d6b8e Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 13 Mar 2025 16:47:10 +0530 Subject: [PATCH 01/10] feat: prism post datasets API --- src/handlers/http/modal/query_server.rs | 3 +- src/handlers/http/modal/server.rs | 14 +- src/handlers/http/prism_logstream.rs | 11 +- src/prism/logstream/mod.rs | 178 +++++++++++++++++++++++- src/response.rs | 3 +- src/utils/arrow/mod.rs | 8 +- 6 files changed, 206 insertions(+), 11 deletions(-) diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index c51d98bd4..161bd88f5 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -76,7 +76,8 @@ impl ParseableServer for QueryServer { .service( web::scope(&prism_base_path()) .service(Server::get_prism_home()) - .service(Server::get_prism_logstream()), + .service(Server::get_prism_logstream()) + .service(Server::get_prism_datasets()), ) .service(Server::get_generated()); } diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 38d0e0239..d28b02b59 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -95,7 +95,8 @@ impl ParseableServer for Server { .service( web::scope(&prism_base_path()) .service(Server::get_prism_home()) - .service(Server::get_prism_logstream()), + .service(Server::get_prism_logstream()) + .service(Server::get_prism_datasets()), ) .service(Self::get_ingest_otel_factory()) .service(Self::get_generated()); @@ -180,6 +181,17 @@ impl Server { ) } + pub fn get_prism_datasets() -> Scope { + web::scope("/datasets").route( + "", + web::post() + .to(http::prism_logstream::post_datasets) + .authorize_for_stream(Action::GetStreamInfo) + .authorize_for_stream(Action::GetStats) + .authorize_for_stream(Action::GetRetention), + ) + } + pub fn get_metrics_webscope() -> Scope { web::scope("/metrics").service( web::resource("").route(web::get().to(metrics::get).authorize(Action::Metrics)), diff --git a/src/handlers/http/prism_logstream.rs b/src/handlers/http/prism_logstream.rs index 2be76a094..4e434afcf 100644 --- a/src/handlers/http/prism_logstream.rs +++ b/src/handlers/http/prism_logstream.rs @@ -17,11 +17,11 @@ */ use actix_web::{ - web::{self, Path}, + web::{self, Json, Path}, Responder, }; -use crate::prism::logstream::{get_prism_logstream_info, PrismLogstreamError}; +use crate::prism::logstream::{get_prism_logstream_info, PrismDatasetRequest, PrismLogstreamError}; /// This API is essentially just combining the responses of /info and /schema together pub async fn get_info(stream_name: Path) -> Result { @@ -29,3 +29,10 @@ pub async fn get_info(stream_name: Path) -> Result) -> Result { + let dataset = req.get_datasets().await?; + + Ok(web::Json(dataset)) +} \ No newline at end of file diff --git a/src/prism/logstream/mod.rs b/src/prism/logstream/mod.rs index e9ffcb981..9e40c7cb9 100644 --- a/src/prism/logstream/mod.rs +++ b/src/prism/logstream/mod.rs @@ -22,7 +22,9 @@ use actix_web::http::header::ContentType; use arrow_schema::Schema; use chrono::Utc; use http::StatusCode; -use serde::Serialize; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; +use tracing::debug; use crate::{ handlers::http::{ @@ -31,11 +33,17 @@ use crate::{ utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats}, }, logstream::error::StreamError, - query::update_schema_when_distributed, + query::{into_query, update_schema_when_distributed, Query, QueryError}, }, + hottier::{HotTierError, HotTierManager, StreamHotTier}, parseable::{StreamNotFound, PARSEABLE}, + query::{error::ExecuteError, execute, CountsRequest, CountsResponse, QUERY_SESSION}, stats, storage::{retention::Retention, StreamInfo, StreamType}, + utils::{ + arrow::record_batches_to_json, + time::{TimeParseError, TimeRange}, + }, LOCK_EXPECT, }; @@ -185,6 +193,157 @@ async fn get_stream_info_helper(stream_name: &str) -> Result, + /// Count of records in the specified time range + counts: CountsResponse, + /// Collection of distinct values for source identifiers + distinct_sources: Value, +} + +/// Request parameters for retrieving Prism dataset information. +/// Defines which streams to query and the time range for the query. +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct PrismDatasetRequest { + /// List of stream names to query + #[serde(default)] + streams: Vec, + /// ISO 8601 formatted start time or human-readable time expression + start_time: String, + /// ISO 8601 formatted end time or human-readable time expression + end_time: String, +} + +impl PrismDatasetRequest { + /// Retrieves dataset information for all specified streams. + /// + /// Processes each stream in the request and compiles their information. + /// Streams that don't exist or can't be accessed are skipped. + /// + /// # Returns + /// - `Ok(Vec)`: List of responses for successfully processed streams + /// - `Err(PrismLogstreamError)`: If a critical error occurs during processing + /// + /// # Note + /// This method won't fail if individual streams fail - it will only include + /// successfully processed streams in the result. + pub async fn get_datasets(self) -> Result, PrismLogstreamError> { + if self.streams.is_empty() { + return Err(PrismLogstreamError::Empty); + } + + let mut responses = vec![]; + for stream in self.streams.iter() { + if PARSEABLE.check_or_load_stream(&stream).await { + debug!("Stream not found: {stream}"); + continue; + } + + let PrismLogstreamInfo { + info, + stats, + retention, + .. + } = get_prism_logstream_info(&stream).await?; + + let hottier = match HotTierManager::global() { + Some(hot_tier_manager) => { + let stats = hot_tier_manager.get_hot_tier(&stream).await?; + Some(stats) + } + _ => None, + }; + let records = CountsRequest { + stream: stream.clone(), + start_time: self.start_time.clone(), + end_time: self.end_time.clone(), + num_bins: 1, + } + .get_bin_density() + .await?; + let counts = CountsResponse { + fields: vec!["start_time".into(), "end_time".into(), "count".into()], + records, + }; + + // Retrieve distinct values for source identifiers + // Returns None if fields aren't present or if query fails + let ips = self.get_distinct_entries(&stream, "p_src_ip").await.ok(); + let user_agents = self + .get_distinct_entries(&stream, "p_user_agent") + .await + .ok(); + + responses.push(PrismDatasetResponse { + info, + stats, + retention, + hottier, + counts, + distinct_sources: json!({ + "ips": ips, + "user_agents": user_agents + }), + }) + } + + Ok(responses) + } + + /// Retrieves distinct values for a specific field in a stream. + /// + /// # Parameters + /// - `stream_name`: Name of the stream to query + /// - `field`: Field name to get distinct values for + /// + /// # Returns + /// - `Ok(Vec)`: List of distinct values found for the field + /// - `Err(QueryError)`: If the query fails or field doesn't exist + async fn get_distinct_entries( + &self, + stream_name: &str, + field: &str, + ) -> Result, QueryError> { + let query = Query { + query: format!("SELECT DISTINCT({field}) FOR {stream_name}"), + start_time: self.start_time.clone(), + end_time: self.end_time.clone(), + send_null: false, + filter_tags: None, + fields: true, + }; + let time_range = TimeRange::parse_human_time(&self.start_time, &self.end_time)?; + + let session_state = QUERY_SESSION.state(); + let query = into_query(&query, &session_state, time_range).await?; + let (records, _) = execute(query, stream_name).await?; + let response = record_batches_to_json(&records)?; + // Extract field values from the JSON response + let values = response + .iter() + .flat_map(|row| { + row.get(field) + .and_then(|s| s.as_str()) + .map(|s| s.to_string()) + }) + .collect(); + + Ok(values) + } +} + #[derive(Debug, thiserror::Error)] pub enum PrismLogstreamError { #[error("Error: {0}")] @@ -193,6 +352,16 @@ pub enum PrismLogstreamError { StreamError(#[from] StreamError), #[error("StreamNotFound: {0}")] StreamNotFound(#[from] StreamNotFound), + #[error("Hottier: {0}")] + Hottier(#[from] HotTierError), + #[error("Query: {0}")] + Query(#[from] QueryError), + #[error("TimeParse: {0}")] + TimeParse(#[from] TimeParseError), + #[error("Execute: {0}")] + Execute(#[from] ExecuteError), + #[error("Empty Stream List")] + Empty, } impl actix_web::ResponseError for PrismLogstreamError { @@ -201,6 +370,11 @@ impl actix_web::ResponseError for PrismLogstreamError { PrismLogstreamError::Anyhow(_) => StatusCode::INTERNAL_SERVER_ERROR, PrismLogstreamError::StreamError(e) => e.status_code(), PrismLogstreamError::StreamNotFound(_) => StatusCode::NOT_FOUND, + PrismLogstreamError::Hottier(_) => StatusCode::INTERNAL_SERVER_ERROR, + PrismLogstreamError::Query(_) => StatusCode::INTERNAL_SERVER_ERROR, + PrismLogstreamError::TimeParse(_) => StatusCode::NOT_FOUND, + PrismLogstreamError::Execute(_) => StatusCode::INTERNAL_SERVER_ERROR, + PrismLogstreamError::Empty => StatusCode::BAD_REQUEST, } } diff --git a/src/response.rs b/src/response.rs index 8211bf4c7..fe3159f71 100644 --- a/src/response.rs +++ b/src/response.rs @@ -33,8 +33,7 @@ pub struct QueryResponse { impl QueryResponse { pub fn to_http(&self) -> Result { info!("{}", "Returning query results"); - let records: Vec<&RecordBatch> = self.records.iter().collect(); - let mut json_records = record_batches_to_json(&records)?; + let mut json_records = record_batches_to_json(&self.records)?; if self.fill_null { for map in &mut json_records { diff --git a/src/utils/arrow/mod.rs b/src/utils/arrow/mod.rs index 53e6437d6..e29762047 100644 --- a/src/utils/arrow/mod.rs +++ b/src/utils/arrow/mod.rs @@ -90,10 +90,12 @@ pub fn replace_columns( /// * Result>> /// /// A vector of JSON objects representing the record batches. -pub fn record_batches_to_json(records: &[&RecordBatch]) -> Result>> { +pub fn record_batches_to_json(records: &[RecordBatch]) -> Result>> { let buf = vec![]; let mut writer = arrow_json::ArrayWriter::new(buf); - writer.write_batches(records)?; + for record in records { + writer.write(record)?; + } writer.finish()?; let buf = writer.into_inner(); @@ -188,7 +190,7 @@ mod tests { #[test] fn check_empty_json_to_record_batches() { let r = RecordBatch::new_empty(Arc::new(Schema::empty())); - let rb = vec![&r]; + let rb = vec![r]; let batches = record_batches_to_json(&rb).unwrap(); assert_eq!(batches, vec![]); } From 97f98d4ee4995040782c44233466a6775f40e6d7 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 13 Mar 2025 17:04:01 +0530 Subject: [PATCH 02/10] fix: block unauthorized access --- src/prism/logstream/mod.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/prism/logstream/mod.rs b/src/prism/logstream/mod.rs index 9e40c7cb9..d83f81c91 100644 --- a/src/prism/logstream/mod.rs +++ b/src/prism/logstream/mod.rs @@ -246,6 +246,13 @@ impl PrismDatasetRequest { let mut responses = vec![]; for stream in self.streams.iter() { + if Users.authorize(key.clone(), Action::ListStream, Some(stream), None) + != crate::rbac::Response::Authorized + { + warn!("Unauthorized access requested for stream: {stream}"); + continue; + } + if PARSEABLE.check_or_load_stream(&stream).await { debug!("Stream not found: {stream}"); continue; From 427316de5ad1b0f7215732534bfe571d5bdcd009 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 13 Mar 2025 17:04:21 +0530 Subject: [PATCH 03/10] fix: list all streams when none are provided --- src/handlers/http/prism_logstream.rs | 10 +++++----- src/prism/logstream/mod.rs | 21 +++++++++++++-------- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/src/handlers/http/prism_logstream.rs b/src/handlers/http/prism_logstream.rs index 4e434afcf..181884a7e 100644 --- a/src/handlers/http/prism_logstream.rs +++ b/src/handlers/http/prism_logstream.rs @@ -17,11 +17,10 @@ */ use actix_web::{ - web::{self, Json, Path}, - Responder, + web::{self, Json, Path}, HttpRequest, Responder }; -use crate::prism::logstream::{get_prism_logstream_info, PrismDatasetRequest, PrismLogstreamError}; +use crate::{prism::logstream::{get_prism_logstream_info, PrismDatasetRequest, PrismLogstreamError}, utils::actix::extract_session_key_from_req}; /// This API is essentially just combining the responses of /info and /schema together pub async fn get_info(stream_name: Path) -> Result { @@ -31,8 +30,9 @@ pub async fn get_info(stream_name: Path) -> Result) -> Result { - let dataset = req.get_datasets().await?; +pub async fn post_datasets(Json(prism_req): Json, req: HttpRequest) -> Result { + let session_key = extract_session_key_from_req(&req)?; + let dataset = prism_req.get_datasets(session_key).await?; Ok(web::Json(dataset)) } \ No newline at end of file diff --git a/src/prism/logstream/mod.rs b/src/prism/logstream/mod.rs index d83f81c91..325701527 100644 --- a/src/prism/logstream/mod.rs +++ b/src/prism/logstream/mod.rs @@ -24,7 +24,7 @@ use chrono::Utc; use http::StatusCode; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; -use tracing::debug; +use tracing::{debug, warn}; use crate::{ handlers::http::{ @@ -38,6 +38,7 @@ use crate::{ hottier::{HotTierError, HotTierManager, StreamHotTier}, parseable::{StreamNotFound, PARSEABLE}, query::{error::ExecuteError, execute, CountsRequest, CountsResponse, QUERY_SESSION}, + rbac::{map::SessionKey, role::Action, Users}, stats, storage::{retention::Retention, StreamInfo, StreamType}, utils::{ @@ -237,11 +238,15 @@ impl PrismDatasetRequest { /// - `Err(PrismLogstreamError)`: If a critical error occurs during processing /// /// # Note - /// This method won't fail if individual streams fail - it will only include + /// 1. This method won't fail if individual streams fail - it will only include /// successfully processed streams in the result. - pub async fn get_datasets(self) -> Result, PrismLogstreamError> { + /// 2. On receiving an empty stream list, we return for all streams the user is able to query for + pub async fn get_datasets( + mut self, + key: SessionKey, + ) -> Result, PrismLogstreamError> { if self.streams.is_empty() { - return Err(PrismLogstreamError::Empty); + self.streams = PARSEABLE.streams.list(); } let mut responses = vec![]; @@ -284,7 +289,7 @@ impl PrismDatasetRequest { fields: vec!["start_time".into(), "end_time".into(), "count".into()], records, }; - + // Retrieve distinct values for source identifiers // Returns None if fields aren't present or if query fails let ips = self.get_distinct_entries(&stream, "p_src_ip").await.ok(); @@ -367,8 +372,8 @@ pub enum PrismLogstreamError { TimeParse(#[from] TimeParseError), #[error("Execute: {0}")] Execute(#[from] ExecuteError), - #[error("Empty Stream List")] - Empty, + #[error("Auth: {0}")] + Auth(#[from] actix_web::Error), } impl actix_web::ResponseError for PrismLogstreamError { @@ -381,7 +386,7 @@ impl actix_web::ResponseError for PrismLogstreamError { PrismLogstreamError::Query(_) => StatusCode::INTERNAL_SERVER_ERROR, PrismLogstreamError::TimeParse(_) => StatusCode::NOT_FOUND, PrismLogstreamError::Execute(_) => StatusCode::INTERNAL_SERVER_ERROR, - PrismLogstreamError::Empty => StatusCode::BAD_REQUEST, + PrismLogstreamError::Auth(_) => StatusCode::UNAUTHORIZED, } } From 90df5929d3a460434a8dd40f27643e17102a61d1 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 13 Mar 2025 17:11:59 +0530 Subject: [PATCH 04/10] style: cargo fmt --- src/handlers/http/prism_logstream.rs | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/src/handlers/http/prism_logstream.rs b/src/handlers/http/prism_logstream.rs index 181884a7e..1eb6cdcf3 100644 --- a/src/handlers/http/prism_logstream.rs +++ b/src/handlers/http/prism_logstream.rs @@ -17,10 +17,14 @@ */ use actix_web::{ - web::{self, Json, Path}, HttpRequest, Responder + web::{self, Json, Path}, + HttpRequest, Responder, }; -use crate::{prism::logstream::{get_prism_logstream_info, PrismDatasetRequest, PrismLogstreamError}, utils::actix::extract_session_key_from_req}; +use crate::{ + prism::logstream::{get_prism_logstream_info, PrismDatasetRequest, PrismLogstreamError}, + utils::actix::extract_session_key_from_req, +}; /// This API is essentially just combining the responses of /info and /schema together pub async fn get_info(stream_name: Path) -> Result { @@ -30,9 +34,12 @@ pub async fn get_info(stream_name: Path) -> Result, req: HttpRequest) -> Result { +pub async fn post_datasets( + Json(prism_req): Json, + req: HttpRequest, +) -> Result { let session_key = extract_session_key_from_req(&req)?; - let dataset = prism_req.get_datasets(session_key).await?; + let dataset = prism_req.get_datasets(session_key).await?; Ok(web::Json(dataset)) -} \ No newline at end of file +} From 9680b52fc01774aeb0dc222b7fecba43f1b93f32 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 13 Mar 2025 17:15:30 +0530 Subject: [PATCH 05/10] style: clippy suggestions --- src/prism/logstream/mod.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/prism/logstream/mod.rs b/src/prism/logstream/mod.rs index 325701527..9cc96ae6c 100644 --- a/src/prism/logstream/mod.rs +++ b/src/prism/logstream/mod.rs @@ -239,7 +239,7 @@ impl PrismDatasetRequest { /// /// # Note /// 1. This method won't fail if individual streams fail - it will only include - /// successfully processed streams in the result. + /// successfully processed streams in the result. /// 2. On receiving an empty stream list, we return for all streams the user is able to query for pub async fn get_datasets( mut self, @@ -258,7 +258,7 @@ impl PrismDatasetRequest { continue; } - if PARSEABLE.check_or_load_stream(&stream).await { + if PARSEABLE.check_or_load_stream(stream).await { debug!("Stream not found: {stream}"); continue; } @@ -268,11 +268,11 @@ impl PrismDatasetRequest { stats, retention, .. - } = get_prism_logstream_info(&stream).await?; + } = get_prism_logstream_info(stream).await?; let hottier = match HotTierManager::global() { Some(hot_tier_manager) => { - let stats = hot_tier_manager.get_hot_tier(&stream).await?; + let stats = hot_tier_manager.get_hot_tier(stream).await?; Some(stats) } _ => None, @@ -292,9 +292,9 @@ impl PrismDatasetRequest { // Retrieve distinct values for source identifiers // Returns None if fields aren't present or if query fails - let ips = self.get_distinct_entries(&stream, "p_src_ip").await.ok(); + let ips = self.get_distinct_entries(stream, "p_src_ip").await.ok(); let user_agents = self - .get_distinct_entries(&stream, "p_user_agent") + .get_distinct_entries(stream, "p_user_agent") .await .ok(); From 35561e990a3e453ac5337cc50a884011c1a363d1 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 13 Mar 2025 17:20:21 +0530 Subject: [PATCH 06/10] log: don't warn when listed by parseable --- src/prism/logstream/mod.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/prism/logstream/mod.rs b/src/prism/logstream/mod.rs index 9cc96ae6c..91dd033c1 100644 --- a/src/prism/logstream/mod.rs +++ b/src/prism/logstream/mod.rs @@ -245,7 +245,8 @@ impl PrismDatasetRequest { mut self, key: SessionKey, ) -> Result, PrismLogstreamError> { - if self.streams.is_empty() { + let is_empty = self.streams.is_empty(); + if is_empty { self.streams = PARSEABLE.streams.list(); } @@ -254,7 +255,10 @@ impl PrismDatasetRequest { if Users.authorize(key.clone(), Action::ListStream, Some(stream), None) != crate::rbac::Response::Authorized { - warn!("Unauthorized access requested for stream: {stream}"); + // Don't warn if listed from Parseable + if !is_empty { + warn!("Unauthorized access requested for stream: {stream}"); + } continue; } @@ -293,10 +297,7 @@ impl PrismDatasetRequest { // Retrieve distinct values for source identifiers // Returns None if fields aren't present or if query fails let ips = self.get_distinct_entries(stream, "p_src_ip").await.ok(); - let user_agents = self - .get_distinct_entries(stream, "p_user_agent") - .await - .ok(); + let user_agents = self.get_distinct_entries(stream, "p_user_agent").await.ok(); responses.push(PrismDatasetResponse { info, From d9d22353198a2f6567befb064894ab01a01cee6c Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 13 Mar 2025 19:51:51 +0530 Subject: [PATCH 07/10] fix: last hour window --- src/prism/logstream/mod.rs | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/src/prism/logstream/mod.rs b/src/prism/logstream/mod.rs index 91dd033c1..fecb25c6c 100644 --- a/src/prism/logstream/mod.rs +++ b/src/prism/logstream/mod.rs @@ -214,17 +214,13 @@ pub struct PrismDatasetResponse { } /// Request parameters for retrieving Prism dataset information. -/// Defines which streams to query and the time range for the query. +/// Defines which streams to query #[derive(Deserialize)] #[serde(rename_all = "camelCase")] pub struct PrismDatasetRequest { /// List of stream names to query #[serde(default)] streams: Vec, - /// ISO 8601 formatted start time or human-readable time expression - start_time: String, - /// ISO 8601 formatted end time or human-readable time expression - end_time: String, } impl PrismDatasetRequest { @@ -283,8 +279,8 @@ impl PrismDatasetRequest { }; let records = CountsRequest { stream: stream.clone(), - start_time: self.start_time.clone(), - end_time: self.end_time.clone(), + start_time: "1h".to_owned(), + end_time: "now".to_owned(), num_bins: 1, } .get_bin_density() @@ -331,13 +327,13 @@ impl PrismDatasetRequest { ) -> Result, QueryError> { let query = Query { query: format!("SELECT DISTINCT({field}) FOR {stream_name}"), - start_time: self.start_time.clone(), - end_time: self.end_time.clone(), + start_time: "1h".to_owned(), + end_time: "now".to_owned(), send_null: false, filter_tags: None, fields: true, }; - let time_range = TimeRange::parse_human_time(&self.start_time, &self.end_time)?; + let time_range = TimeRange::parse_human_time("1h", "now")?; let session_state = QUERY_SESSION.state(); let query = into_query(&query, &session_state, time_range).await?; From d64b8a84f03f6fd810ce311284b72d24a03758d9 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 13 Mar 2025 20:03:55 +0530 Subject: [PATCH 08/10] refactor: work without request body --- src/handlers/http/prism_logstream.rs | 8 ++++++-- src/prism/logstream/mod.rs | 2 +- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/handlers/http/prism_logstream.rs b/src/handlers/http/prism_logstream.rs index 1eb6cdcf3..69b59f5a5 100644 --- a/src/handlers/http/prism_logstream.rs +++ b/src/handlers/http/prism_logstream.rs @@ -35,11 +35,15 @@ pub async fn get_info(stream_name: Path) -> Result, + dataset_req: Option>, req: HttpRequest, ) -> Result { let session_key = extract_session_key_from_req(&req)?; - let dataset = prism_req.get_datasets(session_key).await?; + let dataset = dataset_req + .map(|Json(r)| r) + .unwrap_or_default() + .get_datasets(session_key) + .await?; Ok(web::Json(dataset)) } diff --git a/src/prism/logstream/mod.rs b/src/prism/logstream/mod.rs index fecb25c6c..f18ecd476 100644 --- a/src/prism/logstream/mod.rs +++ b/src/prism/logstream/mod.rs @@ -215,7 +215,7 @@ pub struct PrismDatasetResponse { /// Request parameters for retrieving Prism dataset information. /// Defines which streams to query -#[derive(Deserialize)] +#[derive(Deserialize, Default)] #[serde(rename_all = "camelCase")] pub struct PrismDatasetRequest { /// List of stream names to query From 6bf49c04a0a05b61ad65d40d61655a94d70ef045 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 14 Mar 2025 13:22:59 +0530 Subject: [PATCH 09/10] fix: return stream name in json --- src/prism/logstream/mod.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/prism/logstream/mod.rs b/src/prism/logstream/mod.rs index f18ecd476..215939ccc 100644 --- a/src/prism/logstream/mod.rs +++ b/src/prism/logstream/mod.rs @@ -199,6 +199,8 @@ async fn get_stream_info_helper(stream_name: &str) -> Result Date: Mon, 17 Mar 2025 14:43:49 +0530 Subject: [PATCH 10/10] suggestion by @praveen5959 Signed-off-by: Devdutt Shenoi --- src/prism/logstream/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/prism/logstream/mod.rs b/src/prism/logstream/mod.rs index 215939ccc..e37db20e4 100644 --- a/src/prism/logstream/mod.rs +++ b/src/prism/logstream/mod.rs @@ -283,7 +283,7 @@ impl PrismDatasetRequest { stream: stream.clone(), start_time: "1h".to_owned(), end_time: "now".to_owned(), - num_bins: 1, + num_bins: 10, } .get_bin_density() .await?;