diff --git a/src/prism/logstream/mod.rs b/src/prism/logstream/mod.rs index 7f71dd6ab..b03de0454 100644 --- a/src/prism/logstream/mod.rs +++ b/src/prism/logstream/mod.rs @@ -24,7 +24,7 @@ use chrono::Utc; use http::StatusCode; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; -use tracing::{debug, error, warn}; +use tracing::warn; use crate::{ handlers::http::{ @@ -247,79 +247,143 @@ impl PrismDatasetRequest { mut self, key: SessionKey, ) -> Result, PrismLogstreamError> { - let is_empty = self.streams.is_empty(); - if is_empty { + if self.streams.is_empty() { self.streams = PARSEABLE.streams.list(); } - let mut responses = vec![]; - for stream in self.streams.iter() { - if Users.authorize(key.clone(), Action::ListStream, Some(stream), None) - != crate::rbac::Response::Authorized - { - // Don't warn if listed from Parseable - if !is_empty { - warn!("Unauthorized access requested for stream: {stream}"); + // Process streams concurrently + let results = futures::future::join_all( + self.streams + .iter() + .map(|stream| self.process_stream(stream.clone(), key.clone())), + ) + .await; + + // Collect successful responses and handle errors + let mut responses = Vec::new(); + for result in results { + match result { + Ok(Some(response)) => responses.push(response), + Ok(None) => { + warn!("Stream not found or unauthorized access"); + continue; + } + Err(err) => { + warn!("error: {err}"); + continue; } - continue; } + } - if PARSEABLE.check_or_load_stream(stream).await { - debug!("Stream not found: {stream}"); - continue; - } + Ok(responses) + } - let PrismLogstreamInfo { - info, - schema, - stats, - retention, - } = get_prism_logstream_info(stream).await?; - - let hottier = match HotTierManager::global() { - Some(manager) => match manager.get_hot_tier(stream).await { - Ok(stats) => Some(stats), - Err(HotTierError::HotTierValidationError( - HotTierValidationError::NotFound(_), - )) => None, - Err(err) => return Err(err.into()), - }, - _ => None, - }; - let records = CountsRequest { - stream: stream.clone(), - start_time: "1h".to_owned(), - end_time: "now".to_owned(), - num_bins: 10, - } - .get_bin_density() - .await?; - let counts = CountsResponse { - fields: vec!["start_time".into(), "end_time".into(), "count".into()], - records, - }; - - // Retrieve distinct values for source identifiers - // Returns None if fields aren't present or if query fails - let ips = self.get_distinct_entries(stream, "p_src_ip").await.ok(); - let user_agents = self.get_distinct_entries(stream, "p_user_agent").await.ok(); - - responses.push(PrismDatasetResponse { - stream: stream.clone(), - info, - schema, - stats, - retention, - hottier, - counts, - distinct_sources: json!({ - "ips": ips, - "user_agents": user_agents - }), - }) + async fn process_stream( + &self, + stream: String, + key: SessionKey, + ) -> Result, PrismLogstreamError> { + // Skip unauthorized streams + if !self.is_authorized(&stream, &key) { + return Ok(None); } - Ok(responses) + // Skip streams that don't exist + if !self.stream_exists(&stream).await { + return Ok(None); + } + + // Process stream data + match get_prism_logstream_info(&stream).await { + Ok(info) => Ok(Some(self.build_dataset_response(stream, info).await?)), + Err(err) => Err(err), + } + } + + fn is_authorized(&self, stream: &str, key: &SessionKey) -> bool { + if Users.authorize(key.clone(), Action::ListStream, Some(stream), None) + != crate::rbac::Response::Authorized + { + warn!("Unauthorized access requested for stream: {stream}"); + false + } else { + true + } + } + + async fn stream_exists(&self, stream: &str) -> bool { + if PARSEABLE.check_or_load_stream(stream).await { + warn!("Stream not found: {stream}"); + false + } else { + true + } + } + + async fn build_dataset_response( + &self, + stream: String, + info: PrismLogstreamInfo, + ) -> Result { + // Get hot tier info + let hottier = self.get_hot_tier_info(&stream).await?; + + // Get counts + let counts = self.get_counts(&stream).await?; + + // Get distinct entries concurrently + let (ips_result, user_agents_result) = futures::join!( + self.get_distinct_entries(&stream, "p_src_ip"), + self.get_distinct_entries(&stream, "p_user_agent") + ); + + let ips = ips_result.ok(); + let user_agents = user_agents_result.ok(); + + Ok(PrismDatasetResponse { + stream, + info: info.info, + schema: info.schema, + stats: info.stats, + retention: info.retention, + hottier, + counts, + distinct_sources: json!({ + "ips": ips, + "user_agents": user_agents + }), + }) + } + + async fn get_hot_tier_info( + &self, + stream: &str, + ) -> Result, PrismLogstreamError> { + match HotTierManager::global() { + Some(manager) => match manager.get_hot_tier(stream).await { + Ok(stats) => Ok(Some(stats)), + Err(HotTierError::HotTierValidationError(HotTierValidationError::NotFound(_))) => { + Ok(None) + } + Err(err) => Err(err.into()), + }, + None => Ok(None), + } + } + + async fn get_counts(&self, stream: &str) -> Result { + let count_request = CountsRequest { + stream: stream.to_owned(), + start_time: "1h".to_owned(), + end_time: "now".to_owned(), + num_bins: 10, + }; + + let records = count_request.get_bin_density().await?; + Ok(CountsResponse { + fields: vec!["start_time".into(), "end_time".into(), "count".into()], + records, + }) } /// Retrieves distinct values for a specific field in a stream.