diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs
index c51d98bd4..161bd88f5 100644
--- a/src/handlers/http/modal/query_server.rs
+++ b/src/handlers/http/modal/query_server.rs
@@ -76,7 +76,8 @@ impl ParseableServer for QueryServer {
             .service(
                 web::scope(&prism_base_path())
                     .service(Server::get_prism_home())
-                    .service(Server::get_prism_logstream()),
+                    .service(Server::get_prism_logstream())
+                    .service(Server::get_prism_datasets()),
             )
             .service(Server::get_generated());
     }
diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs
index 38d0e0239..d28b02b59 100644
--- a/src/handlers/http/modal/server.rs
+++ b/src/handlers/http/modal/server.rs
@@ -95,7 +95,8 @@ impl ParseableServer for Server {
             .service(
                 web::scope(&prism_base_path())
                     .service(Server::get_prism_home())
-                    .service(Server::get_prism_logstream()),
+                    .service(Server::get_prism_logstream())
+                    .service(Server::get_prism_datasets()),
             )
             .service(Self::get_ingest_otel_factory())
             .service(Self::get_generated());
@@ -180,6 +181,17 @@ impl Server {
         )
     }
 
+    pub fn get_prism_datasets() -> Scope {
+        web::scope("/datasets").route(
+            "",
+            web::post()
+                .to(http::prism_logstream::post_datasets)
+                .authorize_for_stream(Action::GetStreamInfo)
+                .authorize_for_stream(Action::GetStats)
+                .authorize_for_stream(Action::GetRetention),
+        )
+    }
+
     pub fn get_metrics_webscope() -> Scope {
         web::scope("/metrics").service(
             web::resource("").route(web::get().to(metrics::get).authorize(Action::Metrics)),
diff --git a/src/handlers/http/prism_logstream.rs b/src/handlers/http/prism_logstream.rs
index 2be76a094..69b59f5a5 100644
--- a/src/handlers/http/prism_logstream.rs
+++ b/src/handlers/http/prism_logstream.rs
@@ -17,11 +17,14 @@
  */
 
 use actix_web::{
-    web::{self, Path},
-    Responder,
+    web::{self, Json, Path},
+    HttpRequest, Responder,
 };
 
-use crate::prism::logstream::{get_prism_logstream_info, PrismLogstreamError};
+use crate::{
+    prism::logstream::{get_prism_logstream_info, PrismDatasetRequest, PrismLogstreamError},
+    utils::actix::extract_session_key_from_req,
+};
 
 /// This API is essentially just combining the responses of /info and /schema together
 pub async fn get_info(stream_name: Path<String>) -> Result<impl Responder, PrismLogstreamError> {
@@ -29,3 +32,18 @@ pub async fn get_info(stream_name: Path<String>) -> Result<impl Responder, PrismLogstreamError> {
     let prism_logstream_info = get_prism_logstream_info(&stream_name).await?;
     Ok(web::Json(prism_logstream_info))
 }
+
+/// Handles POST requests for dataset information on one or more streams.
+/// An absent/empty body defaults to "all streams the caller may query".
+pub async fn post_datasets(
+    dataset_req: Option<Json<PrismDatasetRequest>>,
+    req: HttpRequest,
+) -> Result<impl Responder, PrismLogstreamError> {
+    let session_key = extract_session_key_from_req(&req)?;
+    let dataset = dataset_req
+        .map(|Json(r)| r)
+        .unwrap_or_default()
+        .get_datasets(session_key)
+        .await?;
+
+    Ok(web::Json(dataset))
+}
diff --git a/src/prism/logstream/mod.rs b/src/prism/logstream/mod.rs
index e9ffcb981..e37db20e4 100644
--- a/src/prism/logstream/mod.rs
+++ b/src/prism/logstream/mod.rs
@@ -22,7 +22,9 @@
 use actix_web::http::header::ContentType;
 use arrow_schema::Schema;
 use chrono::Utc;
 use http::StatusCode;
-use serde::Serialize;
+use serde::{Deserialize, Serialize};
+use serde_json::{json, Value};
+use tracing::{debug, warn};
 
 use crate::{
@@ -31,11 +33,18 @@ use crate::{
             utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats},
         },
         logstream::error::StreamError,
-        query::update_schema_when_distributed,
+        query::{into_query, update_schema_when_distributed, Query, QueryError},
     },
+    hottier::{HotTierError, HotTierManager, StreamHotTier},
     parseable::{StreamNotFound, PARSEABLE},
+    query::{error::ExecuteError, execute, CountsRequest, CountsResponse, QUERY_SESSION},
+    rbac::{map::SessionKey, role::Action, Users},
     stats,
     storage::{retention::Retention, StreamInfo, StreamType},
+    utils::{
+        arrow::record_batches_to_json,
+        time::{TimeParseError, TimeRange},
+    },
     LOCK_EXPECT,
 };
@@ -185,6 +194,168 @@ async fn get_stream_info_helper(stream_name: &str) -> Result<StreamInfo, StreamError> {
     Ok(stream_info)
 }
 
+/// Response structure for Prism dataset queries.
+/// Contains stream metadata, stats, retention, hot tier state,
+/// recent record counts and distinct source identifiers.
+#[derive(Serialize)]
+pub struct PrismDatasetResponse {
+    /// Name of the stream
+    stream: String,
+    /// Basic information about the stream
+    info: StreamInfo,
+    /// Statistics for the queried timeframe
+    stats: QueriedStats,
+    /// Retention policy details
+    retention: Retention,
+    /// Hot tier information if available
+    hottier: Option<StreamHotTier>,
+    /// Count of records in the specified time range
+    counts: CountsResponse,
+    /// Collection of distinct values for source identifiers
+    distinct_sources: Value,
+}
+
+/// Request parameters for retrieving Prism dataset information.
+/// Defines which streams to query
+#[derive(Deserialize, Default)]
+#[serde(rename_all = "camelCase")]
+pub struct PrismDatasetRequest {
+    /// List of stream names to query
+    #[serde(default)]
+    streams: Vec<String>,
+}
+
+impl PrismDatasetRequest {
+    /// Retrieves dataset information for all specified streams.
+    ///
+    /// Processes each stream in the request and compiles their information.
+    /// Streams that don't exist or can't be accessed are skipped.
+    ///
+    /// # Returns
+    /// - `Ok(Vec<PrismDatasetResponse>)`: List of responses for successfully processed streams
+    /// - `Err(PrismLogstreamError)`: If a critical error occurs during processing
+    ///
+    /// # Note
+    /// 1. This method won't fail if individual streams fail - it will only include
+    ///    successfully processed streams in the result.
+    /// 2. On receiving an empty stream list, we return for all streams the user is able to query for
+    pub async fn get_datasets(
+        mut self,
+        key: SessionKey,
+    ) -> Result<Vec<PrismDatasetResponse>, PrismLogstreamError> {
+        let is_empty = self.streams.is_empty();
+        if is_empty {
+            self.streams = PARSEABLE.streams.list();
+        }
+
+        let mut responses = vec![];
+        for stream in self.streams.iter() {
+            if Users.authorize(key.clone(), Action::ListStream, Some(stream), None)
+                != crate::rbac::Response::Authorized
+            {
+                // Don't warn if listed from Parseable
+                if !is_empty {
+                    warn!("Unauthorized access requested for stream: {stream}");
+                }
+                continue;
+            }
+
+            if PARSEABLE.check_or_load_stream(stream).await {
+                debug!("Stream not found: {stream}");
+                continue;
+            }
+
+            let PrismLogstreamInfo {
+                info,
+                stats,
+                retention,
+                ..
+            } = get_prism_logstream_info(stream).await?;
+
+            let hottier = match HotTierManager::global() {
+                Some(hot_tier_manager) => {
+                    let stats = hot_tier_manager.get_hot_tier(stream).await?;
+                    Some(stats)
+                }
+                _ => None,
+            };
+            let records = CountsRequest {
+                stream: stream.clone(),
+                start_time: "1h".to_owned(),
+                end_time: "now".to_owned(),
+                num_bins: 10,
+            }
+            .get_bin_density()
+            .await?;
+            let counts = CountsResponse {
+                fields: vec!["start_time".into(), "end_time".into(), "count".into()],
+                records,
+            };
+
+            // Retrieve distinct values for source identifiers
+            // Returns None if fields aren't present or if query fails
+            let ips = self.get_distinct_entries(stream, "p_src_ip").await.ok();
+            let user_agents = self.get_distinct_entries(stream, "p_user_agent").await.ok();
+
+            responses.push(PrismDatasetResponse {
+                stream: stream.clone(),
+                info,
+                stats,
+                retention,
+                hottier,
+                counts,
+                distinct_sources: json!({
+                    "ips": ips,
+                    "user_agents": user_agents
+                }),
+            })
+        }
+
+        Ok(responses)
+    }
+
+    /// Retrieves distinct values for a specific field in a stream.
+    ///
+    /// # Parameters
+    /// - `stream_name`: Name of the stream to query
+    /// - `field`: Field name to get distinct values for
+    ///
+    /// # Returns
+    /// - `Ok(Vec<String>)`: List of distinct values found for the field
+    /// - `Err(QueryError)`: If the query fails or field doesn't exist
+    async fn get_distinct_entries(
+        &self,
+        stream_name: &str,
+        field: &str,
+    ) -> Result<Vec<String>, QueryError> {
+        let query = Query {
+            // NOTE(review): "FOR" is not valid SQL and DataFusion would reject it;
+            // the clause must be "FROM" for the DISTINCT scan to ever succeed.
+            query: format!("SELECT DISTINCT({field}) FROM {stream_name}"),
+            start_time: "1h".to_owned(),
+            end_time: "now".to_owned(),
+            send_null: false,
+            filter_tags: None,
+            fields: true,
+        };
+        let time_range = TimeRange::parse_human_time("1h", "now")?;
+
+        let session_state = QUERY_SESSION.state();
+        let query = into_query(&query, &session_state, time_range).await?;
+        let (records, _) = execute(query, stream_name).await?;
+        let response = record_batches_to_json(&records)?;
+        // Extract field values from the JSON response
+        let values = response
+            .iter()
+            .flat_map(|row| {
+                row.get(field)
+                    .and_then(|s| s.as_str())
+                    .map(|s| s.to_string())
+            })
+            .collect();
+
+        Ok(values)
+    }
+}
 
 #[derive(Debug, thiserror::Error)]
 pub enum PrismLogstreamError {
     #[error("Error: {0}")]
@@ -193,6 +364,16 @@ pub enum PrismLogstreamError {
     StreamError(#[from] StreamError),
     #[error("StreamNotFound: {0}")]
     StreamNotFound(#[from] StreamNotFound),
+    #[error("Hottier: {0}")]
+    Hottier(#[from] HotTierError),
+    #[error("Query: {0}")]
+    Query(#[from] QueryError),
+    #[error("TimeParse: {0}")]
+    TimeParse(#[from] TimeParseError),
+    #[error("Execute: {0}")]
+    Execute(#[from] ExecuteError),
+    #[error("Auth: {0}")]
+    Auth(#[from] actix_web::Error),
 }
 
 impl actix_web::ResponseError for PrismLogstreamError {
@@ -201,6 +382,11 @@ impl actix_web::ResponseError for PrismLogstreamError {
             PrismLogstreamError::Anyhow(_) => StatusCode::INTERNAL_SERVER_ERROR,
             PrismLogstreamError::StreamError(e) => e.status_code(),
             PrismLogstreamError::StreamNotFound(_) => StatusCode::NOT_FOUND,
+            PrismLogstreamError::Hottier(_) => StatusCode::INTERNAL_SERVER_ERROR,
+            PrismLogstreamError::Query(_) => StatusCode::INTERNAL_SERVER_ERROR,
+            PrismLogstreamError::TimeParse(_) => StatusCode::BAD_REQUEST,
+            PrismLogstreamError::Execute(_) => StatusCode::INTERNAL_SERVER_ERROR,
+            PrismLogstreamError::Auth(_) => StatusCode::UNAUTHORIZED,
         }
     }
 
diff --git a/src/response.rs b/src/response.rs
index 8211bf4c7..fe3159f71 100644
--- a/src/response.rs
+++ b/src/response.rs
@@ -33,8 +33,7 @@ pub struct QueryResponse {
 impl QueryResponse {
     pub fn to_http(&self) -> Result<HttpResponse, QueryError> {
         info!("{}", "Returning query results");
-        let records: Vec<&RecordBatch> = self.records.iter().collect();
-        let mut json_records = record_batches_to_json(&records)?;
+        let mut json_records = record_batches_to_json(&self.records)?;
 
         if self.fill_null {
             for map in &mut json_records {
diff --git a/src/utils/arrow/mod.rs b/src/utils/arrow/mod.rs
index 53e6437d6..e29762047 100644
--- a/src/utils/arrow/mod.rs
+++ b/src/utils/arrow/mod.rs
@@ -90,10 +90,12 @@ pub fn replace_columns(
 /// * Result<Vec<Map<String, Value>>>
 ///
 /// A vector of JSON objects representing the record batches.
-pub fn record_batches_to_json(records: &[&RecordBatch]) -> Result<Vec<Map<String, Value>>> {
+pub fn record_batches_to_json(records: &[RecordBatch]) -> Result<Vec<Map<String, Value>>> {
     let buf = vec![];
     let mut writer = arrow_json::ArrayWriter::new(buf);
-    writer.write_batches(records)?;
+    for record in records {
+        writer.write(record)?;
+    }
     writer.finish()?;
 
     let buf = writer.into_inner();
@@ -188,7 +190,7 @@ mod tests {
     #[test]
     fn check_empty_json_to_record_batches() {
         let r = RecordBatch::new_empty(Arc::new(Schema::empty()));
-        let rb = vec![&r];
+        let rb = vec![r];
         let batches = record_batches_to_json(&rb).unwrap();
         assert_eq!(batches, vec![]);
     }