From ad026eba6dd74d468cb78f7d39ee58c06eb73e40 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Thu, 31 Jul 2025 11:17:56 -0700 Subject: [PATCH 1/4] chore: server side improvements 1. increase dataset fields limit to 1000 2. add hostname to manifest file path 3. update hostname handling 4. list alerts to be sorted on state and then on severity --- src/cli.rs | 2 +- src/handlers/http/alerts.rs | 34 +++++++++++++++++++++++++++++++++- src/parseable/streams.rs | 11 ++++++++++- src/storage/object_storage.rs | 18 ++++++++++++++++-- 4 files changed, 60 insertions(+), 5 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index 422eb7c36..e8b9a387e 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -35,7 +35,7 @@ use crate::{ pub const DEFAULT_USERNAME: &str = "admin"; pub const DEFAULT_PASSWORD: &str = "admin"; -pub const DATASET_FIELD_COUNT_LIMIT: usize = 250; +pub const DATASET_FIELD_COUNT_LIMIT: usize = 1000; #[derive(Parser)] #[command( name = "parseable", diff --git a/src/handlers/http/alerts.rs b/src/handlers/http/alerts.rs index 4847cff3a..31e79ef76 100644 --- a/src/handlers/http/alerts.rs +++ b/src/handlers/http/alerts.rs @@ -61,10 +61,42 @@ pub async fn list(req: HttpRequest) -> Result { }; let alerts = alerts.list_alerts_for_user(session_key, tags_list).await?; - let alerts_summary = alerts + let mut alerts_summary = alerts .iter() .map(|alert| alert.to_summary()) .collect::>(); + + // Sort by state priority (Triggered > Silenced > Resolved) then by severity (Critical > High > Medium > Low) + alerts_summary.sort_by(|a, b| { + // Helper function to convert state to priority number (lower number = higher priority) + let state_priority = |state: &str| match state { + "Triggered" => 0, + "Silenced" => 1, + "Resolved" => 2, + _ => 3, // Unknown state gets lowest priority + }; + + // Helper function to convert severity to priority number (lower number = higher priority) + let severity_priority = |severity: &str| match severity { + "Critical" => 0, + "High" => 1, + "Medium" => 2, + "Low" => 3, + _ => 4, // Unknown severity gets lowest priority + }; + + let state_a = a.get("state").and_then(|v| v.as_str()).unwrap_or(""); + let state_b = b.get("state").and_then(|v| v.as_str()).unwrap_or(""); + + let severity_a = a.get("severity").and_then(|v| v.as_str()).unwrap_or(""); + let severity_b = b.get("severity").and_then(|v| v.as_str()).unwrap_or(""); + + // First sort by state, then by severity + state_priority(state_a) + .cmp(&state_priority(state_b)) + .then_with(|| severity_priority(severity_a).cmp(&severity_priority(severity_b))) + }); + Ok(web::Json(alerts_summary)) } diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 38e30e692..b88ce20ac 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -41,6 +41,7 @@ use parquet::{ use relative_path::RelativePathBuf; use tokio::task::JoinSet; use tracing::{error, info, trace, warn}; +use ulid::Ulid; use crate::{ LOCK_EXPECT, OBJECT_STORE_DATA_GRANULARITY, @@ -185,7 +186,15 @@ impl Stream { parsed_timestamp: NaiveDateTime, custom_partition_values: &HashMap, ) -> String { - let mut hostname = hostname::get().unwrap().into_string().unwrap(); + let mut hostname = hostname::get() + .unwrap_or_else(|_| std::ffi::OsString::from(&Ulid::new().to_string())) + .into_string() + .unwrap_or_else(|_| Ulid::new().to_string()) + .chars() + .filter(|c| c.is_alphanumeric() || *c == '-' || *c == '_') + .collect::() + .chars() + .collect::(); if let Some(id) = &self.ingestor_id { hostname.push_str(id); } diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index b21251cc2..e21008b59 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -1011,15 +1011,29 @@ pub fn target_json_path(target_id: &Ulid) -> RelativePathBuf { #[inline(always)] pub fn manifest_path(prefix: &str) -> RelativePathBuf { + let hostname = hostname::get() + .unwrap_or_else(|_| std::ffi::OsString::from(&Ulid::new().to_string())) + .into_string() + .unwrap_or_else(|_| Ulid::new().to_string()) + .chars() + .filter(|c| c.is_alphanumeric() || *c == '-' || *c == '_') + .collect::() + .chars() + .collect::(); + match &PARSEABLE.options.mode { Mode::Ingest => { let id = INGESTOR_META .get() .unwrap_or_else(|| panic!("{}", INGESTOR_EXPECT)) .get_node_id(); - let manifest_file_name = format!("ingestor.{id}.{MANIFEST_FILE}"); + + let manifest_file_name = format!("ingestor.{hostname}.{id}.{MANIFEST_FILE}"); + RelativePathBuf::from_iter([prefix, &manifest_file_name]) + } + _ => { + let manifest_file_name = format!("{hostname}.{MANIFEST_FILE}"); RelativePathBuf::from_iter([prefix, &manifest_file_name]) } - _ => RelativePathBuf::from_iter([prefix, MANIFEST_FILE]), } } From 7755a2391c7d754862ca133209a760f87fafc592 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Thu, 31 Jul 2025 11:27:34 -0700 Subject: [PATCH 2/4] improve hostname logic --- src/parseable/streams.rs | 6 ++---- src/storage/object_storage.rs | 28 +++++++++++----------------- 2 files changed, 13 insertions(+), 21 deletions(-) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index b88ce20ac..f20341618 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -190,11 +190,9 @@ impl Stream { .unwrap_or_else(|_| std::ffi::OsString::from(&Ulid::new().to_string())) .into_string() .unwrap_or_else(|_| Ulid::new().to_string()) - .chars() - .filter(|c| c.is_alphanumeric() || *c == '-' || *c == '_') - .collect::() - .chars() + .matches(|c: char| c.is_alphanumeric() || c == '-' || c == '_') .collect::(); + if let Some(id) = &self.ingestor_id { hostname.push_str(id); } diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index e21008b59..a77e567c6 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -1015,25 +1015,19 @@ pub fn manifest_path(prefix: &str) -> RelativePathBuf { .unwrap_or_else(|_| std::ffi::OsString::from(&Ulid::new().to_string())) .into_string() .unwrap_or_else(|_| Ulid::new().to_string()) - .chars() - .filter(|c| c.is_alphanumeric() || *c == '-' || *c == '_') - .collect::() - .chars() + .matches(|c: char| c.is_alphanumeric() || c == '-' || c == '_') .collect::(); - match &PARSEABLE.options.mode { - Mode::Ingest => { - let id = INGESTOR_META - .get() - .unwrap_or_else(|| panic!("{}", INGESTOR_EXPECT)) - .get_node_id(); + if PARSEABLE.options.mode == Mode::Ingest { + let id = INGESTOR_META + .get() + .unwrap_or_else(|| panic!("{}", INGESTOR_EXPECT)) + .get_node_id(); - let manifest_file_name = format!("ingestor.{hostname}.{id}.{MANIFEST_FILE}"); - RelativePathBuf::from_iter([prefix, &manifest_file_name]) - } - _ => { - let manifest_file_name = format!("{hostname}.{MANIFEST_FILE}"); - RelativePathBuf::from_iter([prefix, &manifest_file_name]) - } + let manifest_file_name = format!("ingestor.{hostname}.{id}.{MANIFEST_FILE}"); + RelativePathBuf::from_iter([prefix, &manifest_file_name]) + } else { + let manifest_file_name = format!("{hostname}.{MANIFEST_FILE}"); + RelativePathBuf::from_iter([prefix, &manifest_file_name]) } } From 4d872cde539287c208c4eca6dfd355ba0816188a Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Thu, 31 Jul 2025 19:50:48 -0700 Subject: [PATCH 3/4] add pagination to alerts list api --- src/alerts/mod.rs | 40 +++++++++++--- src/handlers/http/alerts.rs | 101 +++++++++++++++++++++++++----------- 2 files changed, 104 insertions(+), 37 deletions(-) diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs index 90aeb16f4..c8b0abfce 100644 --- a/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -469,7 +469,19 @@ pub enum EvalConfig { #[serde(rename_all = "camelCase")] pub struct AlertEval {} -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, Copy, PartialEq, Default, FromStr)] +#[derive( + Debug, + serde::Serialize, + serde::Deserialize, + Clone, + Copy, + PartialEq, + Eq, + PartialOrd, + Ord, + Default, + FromStr, +)] #[serde(rename_all = "camelCase")] pub enum AlertState { Triggered, @@ -488,7 +500,19 @@ impl Display for AlertState { } } -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, Default)] +#[derive( + Debug, + serde::Serialize, + serde::Deserialize, + Clone, + Copy, + PartialEq, + Eq, + PartialOrd, + Ord, + Default, + FromStr, +)] #[serde(rename_all = "camelCase")] pub enum Severity { Critical, @@ -1308,8 +1332,8 @@ pub enum AlertError { ParserError(#[from] ParserError), #[error("Invalid alert query")] InvalidAlertQuery, - #[error("Invalid query parameter")] - InvalidQueryParameter, + #[error("Invalid query parameter: {0}")] + InvalidQueryParameter(String), #[error("{0}")] ArrowError(#[from] ArrowError), #[error("Upgrade to Parseable Enterprise for {0} type alerts")] @@ -1336,7 +1360,7 @@ impl actix_web::ResponseError for AlertError { Self::TargetInUse => StatusCode::CONFLICT, Self::ParserError(_) => StatusCode::BAD_REQUEST, Self::InvalidAlertQuery => StatusCode::BAD_REQUEST, - Self::InvalidQueryParameter => StatusCode::BAD_REQUEST, + Self::InvalidQueryParameter(_) => StatusCode::BAD_REQUEST, Self::ArrowError(_) => StatusCode::INTERNAL_SERVER_ERROR, Self::NotPresentInOSS(_) => StatusCode::BAD_REQUEST, } @@ -1650,7 +1674,7 @@ pub async fn get_alerts_summary() -> Result { triggered_alerts.push(AlertsInfo { title: alert.get_title().to_string(), id: *alert.get_id(), - severity: alert.get_severity().clone(), + severity: *alert.get_severity(), }); } AlertState::Silenced => { @@ -1658,7 +1682,7 @@ pub async fn get_alerts_summary() -> Result { silenced_alerts.push(AlertsInfo { title: alert.get_title().to_string(), id: *alert.get_id(), - severity: alert.get_severity().clone(), + severity: *alert.get_severity(), }); } AlertState::Resolved => { @@ -1666,7 +1690,7 @@ pub async fn get_alerts_summary() -> Result { resolved_alerts.push(AlertsInfo { title: alert.get_title().to_string(), id: *alert.get_id(), - severity: alert.get_severity().clone(), + severity: *alert.get_severity(), }); } } diff --git a/src/handlers/http/alerts.rs b/src/handlers/http/alerts.rs index 31e79ef76..c86b2041c 100644 --- a/src/handlers/http/alerts.rs +++ b/src/handlers/http/alerts.rs @@ -31,17 +31,28 @@ use actix_web::{ use bytes::Bytes; use ulid::Ulid; -use crate::alerts::{ALERTS, AlertConfig, AlertError, AlertRequest, AlertState}; +use crate::alerts::{ALERTS, AlertConfig, AlertError, AlertRequest, AlertState, Severity}; // GET /alerts /// User needs at least a read access to the stream(s) that is being referenced in an alert /// Read all alerts then return alerts which satisfy the condition +/// Supports pagination with optional query parameters: +/// - tags: comma-separated list of tags to filter alerts +/// - offset: number of alerts to skip (default: 0) +/// - limit: maximum number of alerts to return (default: 100, max: 1000) pub async fn list(req: HttpRequest) -> Result { let session_key = extract_session_key_from_req(&req)?; let query_map = web::Query::>::from_query(req.query_string()) - .map_err(|_| AlertError::InvalidQueryParameter)?; + .map_err(|_| AlertError::InvalidQueryParameter("malformed query parameters".to_string()))?; + let mut tags_list = Vec::new(); + let mut offset = 0usize; + let mut limit = 100usize; // Default limit + const MAX_LIMIT: usize = 1000; // Maximum allowed limit + + // Parse query parameters if !query_map.is_empty() { + // Parse tags parameter if let Some(tags) = query_map.get("tags") { tags_list = tags .split(',') @@ -49,10 +60,34 @@ pub async fn list(req: HttpRequest) -> Result { .filter(|s| !s.is_empty()) .collect(); if tags_list.is_empty() { - return Err(AlertError::InvalidQueryParameter); + return Err(AlertError::InvalidQueryParameter( + "empty tags not allowed with query param tags".to_string(), + )); + } + } + + // Parse offset parameter + if let Some(offset_str) = query_map.get("offset") { + offset = offset_str.parse().map_err(|_| { + AlertError::InvalidQueryParameter("offset is not a valid number".to_string()) + })?; + } + + // Parse limit parameter + if let Some(limit_str) = query_map.get("limit") { + limit = limit_str.parse().map_err(|_| { + AlertError::InvalidQueryParameter("limit is not a valid number".to_string()) + })?; + + // Validate limit bounds + if limit == 0 || limit > MAX_LIMIT { + return Err(AlertError::InvalidQueryParameter( + "limit should be between 1 and 1000".to_string(), + )); } } } + let guard = ALERTS.read().await; let alerts = if let Some(alerts) = guard.as_ref() { alerts @@ -68,36 +103,44 @@ pub async fn list(req: HttpRequest) -> Result { // Sort by state priority (Triggered > Silenced > Resolved) then by severity (Critical > High > Medium > Low) alerts_summary.sort_by(|a, b| { - // Helper function to convert state to priority number (lower number = higher priority) - let state_priority = |state: &str| match state { - "Triggered" => 0, - "Silenced" => 1, - "Resolved" => 2, - _ => 3, // Unknown state gets lowest priority - }; - - // Helper function to convert severity to priority number (lower number = higher priority) - let severity_priority = |severity: &str| match severity { - "Critical" => 0, - "High" => 1, - "Medium" => 2, - "Low" => 3, - _ => 4, // Unknown severity gets lowest priority - }; - - let state_a = a.get("state").and_then(|v| v.as_str()).unwrap_or(""); - let state_b = b.get("state").and_then(|v| v.as_str()).unwrap_or(""); - - let severity_a = a.get("severity").and_then(|v| v.as_str()).unwrap_or(""); - let severity_b = b.get("severity").and_then(|v| v.as_str()).unwrap_or(""); + // Parse state and severity from JSON values back to enums + let state_a = a + .get("state") + .and_then(|v| v.as_str()) + .and_then(|s| s.parse::().ok()) + .unwrap_or(AlertState::Resolved); // Default to lowest priority + + let state_b = b + .get("state") + .and_then(|v| v.as_str()) + .and_then(|s| s.parse::().ok()) + .unwrap_or(AlertState::Resolved); + + let severity_a = a + .get("severity") + .and_then(|v| v.as_str()) + .and_then(|s| s.parse::().ok()) + .unwrap_or(Severity::Low); // Default to lowest priority + + let severity_b = b + .get("severity") + .and_then(|v| v.as_str()) + .and_then(|s| s.parse::().ok()) + .unwrap_or(Severity::Low); // First sort by state, then by severity - state_priority(state_a) - .cmp(&state_priority(state_b)) - .then_with(|| severity_priority(severity_a).cmp(&severity_priority(severity_b))) + state_a + .cmp(&state_b) + .then_with(|| severity_a.cmp(&severity_b)) }); - Ok(web::Json(alerts_summary)) + let paginated_alerts = alerts_summary + .into_iter() + .skip(offset) + .take(limit) + .collect::>(); + + Ok(web::Json(paginated_alerts)) } // POST /alerts From 50007674e32eb1476d163a9269c81d010886a2ea Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Thu, 31 Jul 2025 19:58:30 -0700 Subject: [PATCH 4/4] deepsource fix --- src/alerts/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs index c8b0abfce..5597f71c3 100644 --- a/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -1180,7 +1180,7 @@ impl AlertConfig { self.id, self.title.clone(), self.state, - self.severity.clone().to_string(), + self.severity.to_string(), ), DeploymentInfo::new(deployment_instance, deployment_id, deployment_mode), String::default(),