diff --git a/src/alerts/alert_enums.rs b/src/alerts/alert_enums.rs new file mode 100644 index 000000000..518fbd283 --- /dev/null +++ b/src/alerts/alert_enums.rs @@ -0,0 +1,302 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use std::fmt::{self, Display}; + +use chrono::{DateTime, Utc}; +use derive_more::derive::FromStr; +use ulid::Ulid; + +use crate::alerts::{ + alert_structs::{AnomalyConfig, ForecastConfig, RollingWindow}, + alert_traits::AlertTrait, +}; + +pub enum AlertTask { + Create(Box), + Delete(Ulid), +} + +#[derive(Default, Debug, serde::Serialize, serde::Deserialize, Clone)] +#[serde(rename_all = "lowercase")] +pub enum AlertVersion { + V1, + #[default] + V2, +} + +impl From<&str> for AlertVersion { + fn from(value: &str) -> Self { + match value { + "v1" => Self::V1, + "v2" => Self::V2, + _ => Self::V2, // default to v2 + } + } +} + +#[derive( + Debug, + serde::Serialize, + serde::Deserialize, + Clone, + Default, + FromStr, + PartialEq, + PartialOrd, + Eq, + Ord, +)] +#[serde(rename_all = "camelCase")] +pub enum Severity { + Critical, + High, + #[default] + Medium, + Low, +} + +impl Display for Severity { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Severity::Critical => write!(f, "Critical"), + Severity::High => write!(f, "High"), + Severity::Medium => write!(f, "Medium"), + Severity::Low => write!(f, "Low"), + } + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub enum LogicalOperator { + And, + Or, +} + +impl Display for LogicalOperator { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + LogicalOperator::And => write!(f, "AND"), + LogicalOperator::Or => write!(f, "OR"), + } + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq)] +#[serde(rename_all = "camelCase")] +pub enum AlertType { + Threshold, + Anomaly(AnomalyConfig), + Forecast(ForecastConfig), +} + +impl Display for AlertType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + AlertType::Threshold => write!(f, "threshold"), + AlertType::Anomaly(_) => write!(f, "anomaly"), + AlertType::Forecast(_) => write!(f, "forecast"), + } + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub enum AlertOperator { + #[serde(rename = ">")] + GreaterThan, + #[serde(rename = "<")] + LessThan, + #[serde(rename = "=")] + Equal, + #[serde(rename = "!=")] + NotEqual, + #[serde(rename = ">=")] + GreaterThanOrEqual, + #[serde(rename = "<=")] + LessThanOrEqual, +} + +impl Display for AlertOperator { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + AlertOperator::GreaterThan => write!(f, ">"), + AlertOperator::LessThan => write!(f, "<"), + AlertOperator::Equal => write!(f, "="), + AlertOperator::NotEqual => write!(f, "!="), + AlertOperator::GreaterThanOrEqual => write!(f, ">="), + AlertOperator::LessThanOrEqual => write!(f, "<="), + } + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, FromStr, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub enum WhereConfigOperator { + #[serde(rename = "=")] + Equal, + #[serde(rename = "!=")] + NotEqual, + #[serde(rename = "<")] + LessThan, + #[serde(rename = ">")] + GreaterThan, + #[serde(rename = "<=")] + LessThanOrEqual, + #[serde(rename = ">=")] + GreaterThanOrEqual, + #[serde(rename = "is null")] + IsNull, + #[serde(rename = "is not null")] + IsNotNull, + #[serde(rename = "ilike")] + ILike, + #[serde(rename = "contains")] + Contains, + #[serde(rename = "begins with")] + BeginsWith, + #[serde(rename = "ends with")] + EndsWith, + #[serde(rename = "does not contain")] + DoesNotContain, + #[serde(rename = "does not begin with")] + DoesNotBeginWith, + #[serde(rename = "does not end with")] + DoesNotEndWith, +} + +impl WhereConfigOperator { + /// Convert the enum value to its string representation + pub fn as_str(&self) -> &'static str { + match self { + Self::Equal => "=", + Self::NotEqual => "!=", + Self::LessThan => "<", + Self::GreaterThan => ">", + Self::LessThanOrEqual => "<=", + Self::GreaterThanOrEqual => ">=", + Self::IsNull => "is null", + Self::IsNotNull => "is not null", + Self::ILike => "ilike", + Self::Contains => "contains", + Self::BeginsWith => "begins with", + Self::EndsWith => "ends with", + Self::DoesNotContain => "does not contain", + Self::DoesNotBeginWith => "does not begin with", + Self::DoesNotEndWith => "does not end with", + } + } +} + +impl Display for WhereConfigOperator { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // We can reuse our as_str method to get the string representation + write!(f, "{}", self.as_str()) + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub enum AggregateFunction { + Avg, + Count, + CountDistinct, + Min, + Max, + Sum, +} + +impl Display for AggregateFunction { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + AggregateFunction::Avg => write!(f, "Avg"), + AggregateFunction::Count => write!(f, "Count"), + AggregateFunction::CountDistinct => write!(f, "CountDistinct"), + AggregateFunction::Min => write!(f, "Min"), + AggregateFunction::Max => write!(f, "Max"), + AggregateFunction::Sum => write!(f, "Sum"), + } + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub enum EvalConfig { + RollingWindow(RollingWindow), +} + +#[derive( + Debug, + serde::Serialize, + serde::Deserialize, + Clone, + Copy, + PartialEq, + Default, + FromStr, + Eq, + PartialOrd, + Ord, +)] +#[serde(rename_all = "camelCase")] +pub enum AlertState { + Triggered, + #[default] + #[serde(rename = "not-triggered")] + NotTriggered, + Disabled, +} + +impl Display for AlertState { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + AlertState::Triggered => write!(f, "triggered"), + AlertState::Disabled => write!(f, "disabled"), + AlertState::NotTriggered => write!(f, "not-triggered"), + } + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Default)] +#[serde(rename_all = "camelCase")] +pub enum NotificationState { + #[default] + Notify, + /// Mute means the alert will evaluate but no notifications will be sent out + /// + /// It is a state which can only be set manually + /// + /// user needs to pass the timestamp or the duration (in human time) till which the alert is silenced + Mute(String), +} + +impl Display for NotificationState { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + NotificationState::Notify => write!(f, "notify"), + NotificationState::Mute(till_time) => { + let till = match till_time.as_str() { + "indefinite" => &DateTime::::MAX_UTC.to_rfc3339(), + _ => till_time, + }; + write!(f, "{till}") + } + } + } +} diff --git a/src/alerts/alert_structs.rs b/src/alerts/alert_structs.rs new file mode 100644 index 000000000..0a367ed12 --- /dev/null +++ b/src/alerts/alert_structs.rs @@ -0,0 +1,516 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use std::{collections::HashMap, time::Duration}; + +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use tokio::sync::{RwLock, mpsc}; +use ulid::Ulid; + +use crate::{ + alerts::{ + AlertError, CURRENT_ALERTS_VERSION, + alert_enums::{ + AlertOperator, AlertState, AlertTask, AlertType, AlertVersion, EvalConfig, + LogicalOperator, NotificationState, Severity, WhereConfigOperator, + }, + alert_traits::AlertTrait, + target::{NotificationConfig, TARGETS}, + }, + query::resolve_stream_names, +}; + +/// Helper struct for basic alert fields during migration +pub struct BasicAlertFields { + pub id: Ulid, + pub title: String, + pub severity: Severity, +} + +#[derive(Debug)] +pub struct Alerts { + pub alerts: RwLock>>, + pub sender: mpsc::Sender, +} + +#[derive(Debug, Clone)] +pub struct Context { + pub alert_info: AlertInfo, + pub deployment_info: DeploymentInfo, + pub message: String, + pub notification_config: NotificationConfig, +} + +impl Context { + pub fn new( + alert_info: AlertInfo, + deployment_info: DeploymentInfo, + notification_config: NotificationConfig, + message: String, + ) -> Self { + Self { + alert_info, + deployment_info, + message, + notification_config, + } + } + + pub(crate) fn default_alert_string(&self) -> String { + format!( + "AlertName: {}\nTriggered TimeStamp: {}\nSeverity: {}\n{}", + self.alert_info.alert_name, + Utc::now().to_rfc3339(), + self.alert_info.severity, + self.message + ) + } + + pub(crate) fn default_resolved_string(&self) -> String { + format!("{} is now `not-triggered` ", self.alert_info.alert_name) + } + + pub(crate) fn default_disabled_string(&self) -> String { + format!( + "{} is now `disabled`. No more evals will be run till the sate is `disabled`.", + self.alert_info.alert_name + ) + } +} + +#[derive(Debug, Clone)] +pub struct AlertInfo { + pub alert_id: Ulid, + pub alert_name: String, + // message: String, + // reason: String, + pub alert_state: AlertState, + pub notification_state: NotificationState, + pub severity: String, +} + +impl AlertInfo { + pub fn new( + alert_id: Ulid, + alert_name: String, + alert_state: AlertState, + notification_state: NotificationState, + severity: String, + ) -> Self { + Self { + alert_id, + alert_name, + alert_state, + notification_state, + severity, + } + } +} + +#[derive(Debug, Clone)] +pub struct DeploymentInfo { + pub deployment_instance: String, + pub deployment_id: Ulid, + pub deployment_mode: String, +} + +impl DeploymentInfo { + pub fn new(deployment_instance: String, deployment_id: Ulid, deployment_mode: String) -> Self { + Self { + deployment_instance, + deployment_id, + deployment_mode, + } + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +pub struct OperationConfig { + pub column: String, + pub operator: Option, + pub value: Option, +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct FilterConfig { + pub conditions: Vec, +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +pub struct ConditionConfig { + pub column: String, + pub operator: WhereConfigOperator, + pub value: Option, +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct Conditions { + pub operator: Option, + pub condition_config: Vec, +} + +impl Conditions { + pub fn generate_filter_message(&self) -> String { + match &self.operator { + Some(op) => match op { + LogicalOperator::And | LogicalOperator::Or => { + let expr1 = &self.condition_config[0]; + let expr2 = &self.condition_config[1]; + let expr1_msg = if expr1.value.as_ref().is_some_and(|v| !v.is_empty()) { + format!( + "{} {} {}", + expr1.column, + expr1.operator, + expr1.value.as_ref().unwrap() + ) + } else { + format!("{} {}", expr1.column, expr1.operator) + }; + + let expr2_msg = if expr2.value.as_ref().is_some_and(|v| !v.is_empty()) { + format!( + "{} {} {}", + expr2.column, + expr2.operator, + expr2.value.as_ref().unwrap() + ) + } else { + format!("{} {}", expr2.column, expr2.operator) + }; + + format!("[{expr1_msg} {op} {expr2_msg}]") + } + }, + None => { + let expr = &self.condition_config[0]; + if let Some(val) = &expr.value { + format!("{} {} {}", expr.column, expr.operator, val) + } else { + format!("{} {}", expr.column, expr.operator) + } + } + } + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct GroupBy { + pub columns: Vec, +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct ThresholdConfig { + pub operator: AlertOperator, + pub value: f64, +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct RollingWindow { + // x minutes (25m) + pub eval_start: String, + // should always be "now" + pub eval_end: String, + // x minutes (5m) + pub eval_frequency: u64, +} + +impl Default for RollingWindow { + fn default() -> Self { + Self { + eval_start: "10m".into(), + eval_end: "now".into(), + eval_frequency: 10, + } + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct AlertRequest { + #[serde(default = "Severity::default")] + pub severity: Severity, + pub title: String, + pub query: String, + pub alert_type: String, + pub anomaly_config: Option, + pub forecast_config: Option, + pub threshold_config: ThresholdConfig, + #[serde(default)] + pub notification_config: NotificationConfig, + pub eval_config: EvalConfig, + pub targets: Vec, + pub tags: Option>, +} + +impl AlertRequest { + pub async fn into(self) -> Result { + // Validate that all target IDs exist + for id in &self.targets { + TARGETS.get_target_by_id(id).await?; + } + let datasets = resolve_stream_names(&self.query)?; + let config = AlertConfig { + version: AlertVersion::from(CURRENT_ALERTS_VERSION), + id: Ulid::new(), + severity: self.severity, + title: self.title, + query: self.query, + datasets, + alert_type: { + match self.alert_type.as_str() { + "anomaly" => { + if let Some(conf) = self.anomaly_config { + AlertType::Anomaly(conf) + } else { + return Err(AlertError::Metadata( + "anomalyConfig is required for anomaly type alerts", + )); + } + } + "forecast" => { + if let Some(conf) = self.forecast_config { + AlertType::Forecast(conf) + } else { + return Err(AlertError::Metadata( + "forecastConfig is required for forecast type alerts", + )); + } + } + "threshold" => AlertType::Threshold, + _ => return Err(AlertError::Metadata("Invalid alert type provided")), + } + }, + threshold_config: self.threshold_config, + eval_config: self.eval_config, + targets: self.targets, + state: AlertState::default(), + notification_state: NotificationState::Notify, + notification_config: self.notification_config, + created: Utc::now(), + tags: self.tags, + }; + Ok(config) + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct AlertConfig { + pub version: AlertVersion, + #[serde(default)] + pub id: Ulid, + pub severity: Severity, + pub title: String, + pub query: String, + pub datasets: Vec, + pub alert_type: AlertType, + pub threshold_config: ThresholdConfig, + pub eval_config: EvalConfig, + pub targets: Vec, + // for new alerts, state should be resolved + #[serde(default)] + pub state: AlertState, + pub notification_state: NotificationState, + pub notification_config: NotificationConfig, + pub created: DateTime, + pub tags: Option>, +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct AlertConfigResponse { + pub version: AlertVersion, + #[serde(default)] + pub id: Ulid, + pub severity: Severity, + pub title: String, + pub query: String, + pub datasets: Vec, + pub alert_type: &'static str, + pub anomaly_config: Option, + pub forecast_config: Option, + pub threshold_config: ThresholdConfig, + pub eval_config: EvalConfig, + pub targets: Vec, + // for new alerts, state should be resolved + #[serde(default)] + pub state: AlertState, + pub notification_state: NotificationState, + pub notification_config: NotificationConfig, + pub created: DateTime, + pub tags: Option>, +} + +impl AlertConfig { + pub fn to_response(self) -> AlertConfigResponse { + AlertConfigResponse { + version: self.version, + id: self.id, + severity: self.severity, + title: self.title, + query: self.query, + datasets: self.datasets, + alert_type: { + match self.alert_type { + AlertType::Threshold => "threshold", + AlertType::Anomaly(_) => "anomaly", + AlertType::Forecast(_) => "forecast", + } + }, + anomaly_config: { + match &self.alert_type { + AlertType::Anomaly(conf) => Some(conf.clone()), + _ => None, + } + }, + forecast_config: { + match self.alert_type { + AlertType::Forecast(conf) => Some(conf), + _ => None, + } + }, + threshold_config: self.threshold_config, + eval_config: self.eval_config, + targets: self.targets, + state: self.state, + notification_state: self.notification_state, + notification_config: self.notification_config, + created: self.created, + tags: self.tags, + } + } +} + +#[derive(Debug, Serialize)] +pub struct AlertsSummary { + pub total: u64, + pub triggered: AlertsInfoByState, + pub disabled: AlertsInfoByState, + #[serde(rename = "not-triggered")] + pub not_triggered: AlertsInfoByState, +} + +#[derive(Debug, Serialize)] +pub struct AlertsInfoByState { + pub total: u64, + pub alert_info: Vec, +} + +#[derive(Debug, Serialize)] +pub struct AlertsInfo { + pub title: String, + pub id: Ulid, + pub severity: Severity, +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct ForecastConfig { + pub historic_duration: String, + pub forecast_duration: String, +} + +impl ForecastConfig { + pub fn calculate_eval_window(&self) -> Result { + let parsed_historic_duration = + if let Ok(historic_duration) = humantime::parse_duration(&self.historic_duration) { + historic_duration + } else { + return Err(AlertError::Metadata( + "historicDuration should be of type humantime", + )); + }; + + let eval_window = if parsed_historic_duration.lt(&Duration::from_secs(60 * 60 * 24 * 3)) { + // less than 3 days = 10 mins + "10m" + } else { + "30m" + }; + + Ok(eval_window.into()) + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct AnomalyConfig { + pub historic_duration: String, +} + +impl AnomalyConfig { + pub fn calculate_eval_window(&self) -> Result { + let parsed_historic_duration = + if let Ok(historic_duration) = humantime::parse_duration(&self.historic_duration) { + historic_duration + } else { + return Err(AlertError::Metadata( + "historicDuration should be of type humantime", + )); + }; + + let eval_window = if parsed_historic_duration.lt(&Duration::from_secs(60 * 60 * 24 * 3)) { + // less than 3 days = 10 mins + "10m" + } else { + "30m" + }; + + Ok(eval_window.into()) + } +} + +/// Result structure for alert query execution with group support +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AlertQueryResult { + /// List of group results, each containing group values and the aggregate value + pub groups: Vec, + /// True if this is a simple query without GROUP BY (single group with empty group_values) + pub is_simple_query: bool, +} + +/// Result for a single group in a GROUP BY query +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GroupResult { + /// The group-by column values (empty for non-GROUP BY queries) + pub group_values: HashMap, + /// The aggregate function value for this group + pub aggregate_value: f64, +} + +impl AlertQueryResult { + /// Get the single aggregate value for simple queries (backward compatibility) + pub fn get_single_value(&self) -> f64 { + if self.is_simple_query && !self.groups.is_empty() { + self.groups[0].aggregate_value + } else { + 0.0 + } + } +} + +#[derive(serde::Deserialize)] +pub struct NotificationStateRequest { + pub state: String, +} diff --git a/src/alerts/traits.rs b/src/alerts/alert_traits.rs similarity index 67% rename from src/alerts/traits.rs rename to src/alerts/alert_traits.rs index 303f6d42f..be0156cd9 100644 --- a/src/alerts/traits.rs +++ b/src/alerts/alert_traits.rs @@ -18,22 +18,45 @@ use crate::{ alerts::{ - AlertConfig, AlertError, AlertState, AlertType, EvalConfig, Severity, ThresholdConfig, + AlertConfig, AlertError, AlertState, AlertType, EvalConfig, Severity, + alert_enums::NotificationState, + alert_structs::{Context, ThresholdConfig}, }, rbac::map::SessionKey, }; +use chrono::{DateTime, Utc}; use std::{collections::HashMap, fmt::Debug}; use tonic::async_trait; use ulid::Ulid; +/// A trait to handle different types of messages built by different alert types +pub trait MessageCreation { + fn create_threshold_message(&self, actual_value: f64) -> Result; + fn create_anomaly_message( + &self, + actual_value: f64, + lower_bound: f64, + upper_bound: f64, + ) -> Result; + fn create_forecast_message( + &self, + forecasted_time: DateTime, + forecasted_value: f64, + ) -> Result; + fn get_message_header(&self) -> Result; +} + #[async_trait] pub trait AlertTrait: Debug + Send + Sync { - async fn eval_alert(&self) -> Result<(bool, f64), AlertError>; + async fn eval_alert(&self) -> Result, AlertError>; async fn validate(&self, session_key: &SessionKey) -> Result<(), AlertError>; + async fn update_notification_state( + &mut self, + new_notification_state: NotificationState, + ) -> Result<(), AlertError>; async fn update_state( &mut self, - is_manual: bool, - new_state: AlertState, + alert_state: AlertState, trigger_notif: Option, ) -> Result<(), AlertError>; fn get_id(&self) -> &Ulid; @@ -43,13 +66,13 @@ pub trait AlertTrait: Debug + Send + Sync { fn get_alert_type(&self) -> &AlertType; fn get_threshold_config(&self) -> &ThresholdConfig; fn get_eval_config(&self) -> &EvalConfig; - fn get_targets(&self) -> &Vec; + fn get_targets(&self) -> &[Ulid]; fn get_state(&self) -> &AlertState; - fn get_eval_window(&self) -> String; + fn get_eval_window(&self) -> &str; fn get_eval_frequency(&self) -> u64; fn get_created(&self) -> String; fn get_tags(&self) -> &Option>; - fn get_datasets(&self) -> &Vec; + fn get_datasets(&self) -> &[String]; fn to_alert_config(&self) -> AlertConfig; fn clone_box(&self) -> Box; } @@ -70,6 +93,11 @@ pub trait AlertManagerTrait: Send + Sync { new_state: AlertState, trigger_notif: Option, ) -> Result<(), AlertError>; + async fn update_notification_state( + &self, + alert_id: Ulid, + new_notification_state: NotificationState, + ) -> Result<(), AlertError>; async fn delete(&self, alert_id: Ulid) -> Result<(), AlertError>; async fn get_state(&self, alert_id: Ulid) -> Result; async fn start_task(&self, alert: Box) -> Result<(), AlertError>; @@ -77,3 +105,8 @@ pub trait AlertManagerTrait: Send + Sync { async fn list_tags(&self) -> Vec; async fn get_all_alerts(&self) -> HashMap>; } + +#[async_trait] +pub trait CallableTarget { + async fn call(&self, payload: &Context); +} diff --git a/src/alerts/alert_types.rs b/src/alerts/alert_types.rs index 81fbcbfb9..aeb0f4042 100644 --- a/src/alerts/alert_types.rs +++ b/src/alerts/alert_types.rs @@ -16,24 +16,25 @@ * */ -use std::time::Duration; +use std::{str::FromStr, time::Duration}; use chrono::{DateTime, Utc}; use tonic::async_trait; -use tracing::trace; +use tracing::{info, trace, warn}; use ulid::Ulid; use crate::{ alerts::{ AlertConfig, AlertError, AlertState, AlertType, AlertVersion, EvalConfig, Severity, ThresholdConfig, + alert_enums::NotificationState, + alert_structs::GroupResult, + alert_traits::{AlertTrait, MessageCreation}, alerts_utils::{evaluate_condition, execute_alert_query, extract_time_range}, - is_query_aggregate, - target::{self, TARGETS}, - traits::AlertTrait, + get_number_of_agg_exprs, + target::{self, NotificationConfig}, }, handlers::http::query::create_streams_for_distributed, - option::Mode, parseable::PARSEABLE, query::resolve_stream_names, rbac::map::SessionKey, @@ -56,6 +57,8 @@ pub struct ThresholdAlert { // for new alerts, state should be resolved #[serde(default)] pub state: AlertState, + pub notification_state: NotificationState, + pub notification_config: NotificationConfig, pub created: DateTime, pub tags: Option>, pub datasets: Vec, @@ -63,26 +66,51 @@ pub struct ThresholdAlert { #[async_trait] impl AlertTrait for ThresholdAlert { - async fn eval_alert(&self) -> Result<(bool, f64), AlertError> { + async fn eval_alert(&self) -> Result, AlertError> { let time_range = extract_time_range(&self.eval_config)?; - let final_value = execute_alert_query(self.get_query(), &time_range).await?; - let result = evaluate_condition( - &self.threshold_config.operator, - final_value, - self.threshold_config.value, - ); - Ok((result, final_value)) - } + let query_result = execute_alert_query(self.get_query(), &time_range).await?; + + if query_result.is_simple_query { + // Handle simple queries + let final_value = query_result.get_single_value(); + let result = evaluate_condition( + &self.threshold_config.operator, + final_value, + self.threshold_config.value, + ); + + let message = if result { + Some(self.create_threshold_message(final_value)?) + } else { + None + }; + Ok(message) + } else { + // Handle GROUP BY queries - evaluate each group + let mut breached_groups = Vec::new(); + + for group in &query_result.groups { + let result = evaluate_condition( + &self.threshold_config.operator, + group.aggregate_value, + self.threshold_config.value, + ); + + if result { + breached_groups.push(group.clone()); + } + } - async fn validate(&self, session_key: &SessionKey) -> Result<(), AlertError> { - // validate alert type - // Anomaly is only allowed in Prism - if self.alert_type.eq(&AlertType::Anomaly) && PARSEABLE.options.mode != Mode::Prism { - return Err(AlertError::CustomError( - "Anomaly alert is only allowed on Prism mode".into(), - )); + let message = if !breached_groups.is_empty() { + Some(self.create_group_message(&breached_groups)?) + } else { + None + }; + Ok(message) } + } + async fn validate(&self, session_key: &SessionKey) -> Result<(), AlertError> { // validate evalType let eval_frequency = match &self.eval_config { EvalConfig::RollingWindow(rolling_window) => { @@ -96,42 +124,110 @@ impl AlertTrait for ThresholdAlert { }; // validate that target repeat notifs !> eval_frequency - for target_id in &self.targets { - let target = TARGETS.get_target_by_id(target_id).await?; - match &target.notification_config.times { - target::Retry::Infinite => {} - target::Retry::Finite(repeat) => { - let notif_duration = - Duration::from_secs(60 * target.notification_config.interval) - * *repeat as u32; - if (notif_duration.as_secs_f64()).gt(&((eval_frequency * 60) as f64)) { - return Err(AlertError::Metadata( - "evalFrequency should be greater than target repetition interval", - )); - } + match &self.notification_config.times { + target::Retry::Infinite => {} + target::Retry::Finite(repeat) => { + let notif_duration = + Duration::from_secs(60 * self.notification_config.interval) * *repeat as u32; + if (notif_duration.as_secs_f64()).gt(&((eval_frequency * 60) as f64)) { + return Err(AlertError::Metadata( + "evalFrequency should be greater than target repetition interval", + )); } } } // validate that the query is valid if self.query.is_empty() { - return Err(AlertError::InvalidAlertQuery); + return Err(AlertError::InvalidAlertQuery("Empty query".into())); } let tables = resolve_stream_names(&self.query)?; if tables.is_empty() { - return Err(AlertError::InvalidAlertQuery); + return Err(AlertError::InvalidAlertQuery( + "No tables found in query".into(), + )); } create_streams_for_distributed(tables) .await - .map_err(|_| AlertError::InvalidAlertQuery)?; + .map_err(|_| AlertError::InvalidAlertQuery("Invalid tables".into()))?; // validate that the user has access to the tables mentioned in the query user_auth_for_query(session_key, &self.query).await?; // validate that the alert query is valid and can be evaluated - if !is_query_aggregate(&self.query).await? { - return Err(AlertError::InvalidAlertQuery); + let num_aggrs = get_number_of_agg_exprs(&self.query).await?; + if num_aggrs != 1 { + return Err(AlertError::InvalidAlertQuery(format!( + "Found {num_aggrs} aggregate expressions, only 1 allowed" + ))); + } + Ok(()) + } + + async fn update_notification_state( + &mut self, + new_notification_state: NotificationState, + ) -> Result<(), AlertError> { + let store = PARSEABLE.storage.get_object_store(); + // update state in memory + self.notification_state = new_notification_state; + // update on disk + store.put_alert(self.id, &self.to_alert_config()).await?; + + Ok(()) + } + + async fn update_state( + &mut self, + new_state: AlertState, + trigger_notif: Option, + ) -> Result<(), AlertError> { + let store = PARSEABLE.storage.get_object_store(); + if self.state.eq(&AlertState::Disabled) { + warn!( + "Alert- {} is currently Disabled. Updating state to {new_state}.", + self.id + ); + // update state in memory + self.state = new_state; + + // update on disk + store.put_alert(self.id, &self.to_alert_config()).await?; + // The task should have already been removed from the list of running tasks + return Ok(()); + } + + match &mut self.notification_state { + NotificationState::Notify => {} + NotificationState::Mute(till_time) => { + // if now > till_time, modify notif state to notify and proceed + let now = Utc::now(); + let till = match till_time.as_str() { + "indefinite" => DateTime::::MAX_UTC, + _ => DateTime::::from_str(till_time) + .map_err(|e| AlertError::CustomError(e.to_string()))?, + }; + if now > till { + info!( + "Modifying alert notif state from snoozed to notify- Now= {now}, Snooze till= {till}" + ); + self.notification_state = NotificationState::Notify; + } + } + } + + // update state in memory + self.state = new_state; + + // update on disk + store.put_alert(self.id, &self.to_alert_config()).await?; + + if trigger_notif.is_some() && self.notification_state.eq(&NotificationState::Notify) { + trace!("trigger notif on-\n{}", self.state); + self.to_alert_config() + .trigger_notifications(trigger_notif.unwrap()) + .await?; } Ok(()) } @@ -164,7 +260,7 @@ impl AlertTrait for ThresholdAlert { &self.eval_config } - fn get_targets(&self) -> &Vec { + fn get_targets(&self) -> &[Ulid] { &self.targets } @@ -178,21 +274,21 @@ impl AlertTrait for ThresholdAlert { } } - fn get_eval_window(&self) -> String { + fn get_eval_window(&self) -> &str { match &self.eval_config { - EvalConfig::RollingWindow(rolling_window) => rolling_window.eval_start.clone(), + EvalConfig::RollingWindow(rolling_window) => rolling_window.eval_start.as_str(), } } fn get_created(&self) -> String { - self.created.to_string() + self.created.to_rfc3339() } fn get_tags(&self) -> &Option> { &self.tags } - fn get_datasets(&self) -> &Vec { + fn get_datasets(&self) -> &[String] { &self.datasets } @@ -204,49 +300,52 @@ impl AlertTrait for ThresholdAlert { fn clone_box(&self) -> Box { Box::new(self.clone()) } +} - async fn update_state( - &mut self, - is_manual: bool, - new_state: AlertState, - trigger_notif: Option, - ) -> Result<(), AlertError> { - let store = PARSEABLE.storage.get_object_store(); - match self.state { - AlertState::Triggered => { - if is_manual - && new_state != AlertState::Resolved - && new_state != AlertState::Silenced - { - let msg = format!("Not allowed to manually go from Triggered to {new_state}"); - return Err(AlertError::InvalidStateChange(msg)); - } - } - AlertState::Silenced => { - if is_manual && new_state != AlertState::Resolved { - let msg = format!("Not allowed to manually go from Silenced to {new_state}"); - return Err(AlertError::InvalidStateChange(msg)); - } - } - AlertState::Resolved => { - if is_manual { - let msg = format!("Not allowed to go manually from Resolved to {new_state}"); - return Err(AlertError::InvalidStateChange(msg)); - } - } - } - // update state in memory - self.state = new_state; - // update on disk - store.put_alert(self.id, &self.to_alert_config()).await?; +impl MessageCreation for ThresholdAlert { + fn get_message_header(&self) -> Result { + Ok(format!( + "Alert Name: {}\nAlert Type: Threshold alert\nSeverity: {}\nTriggered at: {}\nThreshold: {}\nAlert ID: {}\nEvaluation Window: {}\nFrequency: {}\n\nValues crossing the threshold:", + self.title, + self.severity, + Utc::now().to_rfc3339(), + format_args!( + "{} {}", + self.threshold_config.operator, self.threshold_config.value + ), + self.id, + self.get_eval_window(), + self.get_eval_frequency() + )) + } + fn create_threshold_message(&self, actual_value: f64) -> Result { + let header = self.get_message_header()?; + Ok(format!( + "{header}\nValue: {}\n\nQuery:\n{}", + actual_value, + self.get_query() + )) + } - if trigger_notif.is_some() { - trace!("trigger notif on-\n{}", self.state); - self.to_alert_config() - .trigger_notifications(trigger_notif.unwrap()) - .await?; - } - Ok(()) + fn create_anomaly_message( + &self, + _forecast_value: f64, + _lower_bound: f64, + _upper_bound: f64, + ) -> Result { + Err(AlertError::Unimplemented( + "Anomaly message creation is not allowed for Threshold alert".into(), + )) + } + + fn create_forecast_message( + &self, + _forecast_time: DateTime, + _forecast_value: f64, + ) -> Result { + Err(AlertError::Unimplemented( + "Forecast message creation is not allowed for Threshold alert".into(), + )) } } @@ -263,6 +362,8 @@ impl From for ThresholdAlert { eval_config: value.eval_config, targets: value.targets, state: value.state, + notification_state: value.notification_state, + notification_config: value.notification_config, created: value.created, tags: value.tags, datasets: value.datasets, @@ -283,9 +384,45 @@ impl From for AlertConfig { eval_config: val.eval_config, targets: val.targets, state: val.state, + notification_state: val.notification_state, + notification_config: val.notification_config, created: val.created, tags: val.tags, datasets: val.datasets, } } } + +impl ThresholdAlert { + fn create_group_message(&self, breached_groups: &[GroupResult]) -> Result { + let header = self.get_message_header()?; + let mut message = format!("{header}\n"); + + message.push_str(&format!( + "Alerting Groups ({} total):\n", + breached_groups.len() + )); + + for (index, group) in breached_groups.iter().enumerate() { + message.push_str(&format!("{}. ", index + 1)); + + if group.group_values.is_empty() { + message.push_str("[No GROUP BY]"); + } else { + let group_desc = group + .group_values + .iter() + .map(|(key, value)| format!("{}: {}", key, value)) + .collect::>() + .join(", "); + message.push_str(&group_desc); + } + + message.push_str(&format!(" → Value: {}\n", group.aggregate_value)); + } + + message.push_str(&format!("\nQuery:\n{}", self.get_query())); + + Ok(message) + } +} diff --git a/src/alerts/alerts_utils.rs b/src/alerts/alerts_utils.rs index be9676d37..d038f8224 100644 --- a/src/alerts/alerts_utils.rs +++ b/src/alerts/alerts_utils.rs @@ -16,19 +16,22 @@ * */ -use std::fmt::Display; +use std::{collections::HashMap, fmt::Display}; use actix_web::Either; -use arrow_array::{Float64Array, Int64Array, RecordBatch}; +use arrow_array::{Array, Float64Array, Int64Array, RecordBatch}; use datafusion::{ - logical_expr::Literal, + logical_expr::{Literal, LogicalPlan}, prelude::{Expr, lit}, }; -use itertools::Itertools; use tracing::trace; use crate::{ - alerts::{AlertTrait, Conditions, LogicalOperator, WhereConfigOperator}, + alerts::{ + AlertTrait, LogicalOperator, WhereConfigOperator, + alert_structs::{AlertQueryResult, Conditions, GroupResult}, + extract_aggregate_aliases, + }, handlers::http::{ cluster::send_query_request, query::{Query, create_streams_for_distributed}, @@ -55,9 +58,9 @@ use super::{ALERTS, AlertError, AlertOperator, AlertState}; pub async fn evaluate_alert(alert: &dyn AlertTrait) -> Result<(), AlertError> { trace!("RUNNING EVAL TASK FOR- {alert:?}"); - let (result, final_value) = alert.eval_alert().await?; + let message = alert.eval_alert().await?; - update_alert_state(alert, result, final_value).await + update_alert_state(alert, message).await } /// Extract time range from alert evaluation configuration @@ -70,8 +73,11 @@ pub fn extract_time_range(eval_config: &super::EvalConfig) -> Result Result { +/// Execute the alert query based on the current mode and return structured group results +pub async fn execute_alert_query( + query: &str, + time_range: &TimeRange, +) -> Result { match PARSEABLE.options.mode { Mode::All | Mode::Query => execute_local_query(query, time_range).await, Mode::Prism => execute_remote_query(query, time_range).await, @@ -83,7 +89,10 @@ pub async fn execute_alert_query(query: &str, time_range: &TimeRange) -> Result< } /// Execute alert query locally (Query/All mode) -async fn execute_local_query(query: &str, time_range: &TimeRange) -> Result { +async fn execute_local_query( + query: &str, + time_range: &TimeRange, +) -> Result { let session_state = QUERY_SESSION.state(); let tables = resolve_stream_names(query)?; @@ -93,7 +102,7 @@ async fn execute_local_query(query: &str, time_range: &TimeRange) -> Result Result Result { +async fn execute_remote_query( + query: &str, + time_range: &TimeRange, +) -> Result { + let session_state = QUERY_SESSION.state(); + let raw_logical_plan = session_state.create_logical_plan(query).await?; + let query_request = Query { query: query.to_string(), start_time: time_range.start.to_rfc3339(), @@ -130,24 +145,111 @@ async fn execute_remote_query(query: &str, time_range: &TimeRange) -> Result Result { - // due to the previous validations, we can be sure that we get an array of objects with just one entry - // [{"countField": Number(1120.251)}] - if let Some(array_val) = result_value.as_array() - && !array_val.is_empty() - && let Some(object) = array_val[0].as_object() - { - let values = object.values().map(|v| v.as_f64().unwrap()).collect_vec(); - Ok(values[0]) +/// Convert JSON result value to AlertQueryResult +/// Handles both simple queries and GROUP BY queries with multiple rows +fn convert_result_to_group_results( + result_value: serde_json::Value, + plan: LogicalPlan, +) -> Result { + let array_val = result_value + .as_array() + .ok_or_else(|| AlertError::CustomError("Expected array in query result".to_string()))?; + + let aggregate_aliases = extract_aggregate_aliases(&plan); + + if array_val.is_empty() || aggregate_aliases.is_empty() { + return Ok(AlertQueryResult { + groups: vec![], + is_simple_query: true, + }); + } + + // take the first entry and extract the column name / alias + let (agg_condition, alias) = &aggregate_aliases[0]; + + let aggregate_key = if let Some(alias) = alias { + alias } else { - Err(AlertError::CustomError( - "Query result is not a number or response is empty".to_string(), - )) + agg_condition + }; + + // Find the aggregate column from the first row + let first_row = array_val[0] + .as_object() + .ok_or_else(|| AlertError::CustomError("Expected object in query result".to_string()))?; + + let is_simple_query = first_row.len() == 1; + let mut groups = Vec::new(); + + // Process each row as a separate group + for row in array_val { + if let Some(object) = row.as_object() { + let mut group_values = HashMap::new(); + let mut aggregate_value = 0.0; + + for (key, value) in object { + if key == aggregate_key { + aggregate_value = value.as_f64().ok_or_else(|| { + AlertError::CustomError(format!( + "Non-numeric value found in aggregate column '{}'", + aggregate_key + )) + })?; + } else { + // This is a GROUP BY column + group_values + .insert(key.clone(), value.to_string().trim_matches('"').to_string()); + } + } + + groups.push(GroupResult { + group_values, + aggregate_value, + }); + } + } + + Ok(AlertQueryResult { + groups, + is_simple_query, + }) +} + +/// Extract numeric value from an Arrow array at the given row index +fn extract_numeric_value(column: &dyn Array, row_index: usize) -> f64 { + if let Some(float_array) = column.as_any().downcast_ref::() { + if !float_array.is_null(row_index) { + return float_array.value(row_index); + } + } else if let Some(int_array) = column.as_any().downcast_ref::() + && !int_array.is_null(row_index) + { + return int_array.value(row_index) as f64; } + 0.0 +} + +/// Extract string value from an Arrow array at the given row index +fn extract_string_value(column: &dyn Array, row_index: usize) -> String { + use arrow_array::StringArray; + + if let Some(string_array) = column.as_any().downcast_ref::() { + if !string_array.is_null(row_index) { + return string_array.value(row_index).to_string(); + } + } else if let Some(int_array) = column.as_any().downcast_ref::() { + if !int_array.is_null(row_index) { + return int_array.value(row_index).to_string(); + } + } else if let Some(float_array) = column.as_any().downcast_ref::() + && !float_array.is_null(row_index) + { + return float_array.value(row_index).to_string(); + } + "null".to_string() } pub fn evaluate_condition(operator: &AlertOperator, actual: f64, expected: f64) -> bool { @@ -163,30 +265,22 @@ pub fn evaluate_condition(operator: &AlertOperator, actual: f64, expected: f64) async fn update_alert_state( alert: &dyn AlertTrait, - final_res: bool, - actual_value: f64, + message: Option, ) -> Result<(), AlertError> { - let guard = ALERTS.write().await; - let alerts = if let Some(alerts) = guard.as_ref() { - alerts - } else { - return Err(AlertError::CustomError("No AlertManager set".into())); + // Get the alert manager reference while holding the lock briefly + let alerts = { + let guard = ALERTS.read().await; + if let Some(alerts) = guard.as_ref() { + alerts.clone() + } else { + return Err(AlertError::CustomError("No AlertManager set".into())); + } }; - if final_res { - let message = format!( - "Alert Triggered: {}\n\nThreshold: ({} {})\nCurrent Value: {}\nEvaluation Window: {} | Frequency: {}\n\nQuery:\n{}", - alert.get_id(), - alert.get_threshold_config().operator, - alert.get_threshold_config().value, - actual_value, - alert.get_eval_window(), - alert.get_eval_frequency(), - alert.get_query() - ); - + // Now perform the state update + if let Some(msg) = message { alerts - .update_state(*alert.get_id(), AlertState::Triggered, Some(message)) + .update_state(*alert.get_id(), AlertState::Triggered, Some(msg)) .await } else if alerts .get_state(*alert.get_id()) @@ -194,42 +288,73 @@ async fn update_alert_state( .eq(&AlertState::Triggered) { alerts - .update_state(*alert.get_id(), AlertState::Resolved, Some("".into())) + .update_state(*alert.get_id(), AlertState::NotTriggered, Some("".into())) .await } else { alerts - .update_state(*alert.get_id(), AlertState::Resolved, None) + .update_state(*alert.get_id(), AlertState::NotTriggered, None) .await } } -fn get_final_value(records: Vec) -> f64 { +/// Extract group results from record batches, supporting both simple and GROUP BY queries +fn extract_group_results(records: Vec, plan: LogicalPlan) -> AlertQueryResult { trace!("records-\n{records:?}"); - if let Some(f) = records - .first() - .and_then(|batch| { - trace!("batch.column(0)-\n{:?}", batch.column(0)); - batch.column(0).as_any().downcast_ref::() - }) - .map(|array| { - trace!("array-\n{array:?}"); - array.value(0) - }) - { - f + let aggregate_aliases = extract_aggregate_aliases(&plan); + + // since there is going to be only one aggregate, we'll check if it is empty + if aggregate_aliases.is_empty() || records.is_empty() { + return AlertQueryResult { + groups: vec![], + is_simple_query: true, + }; + } + + // take the first entry and extract the column name / alias + let (agg_condition, alias) = &aggregate_aliases[0]; + + let alias = if let Some(alias) = alias { + alias } else { - records - .first() - .and_then(|batch| { - trace!("batch.column(0)-\n{:?}", batch.column(0)); - batch.column(0).as_any().downcast_ref::() - }) - .map(|array| { - trace!("array-\n{array:?}"); - array.value(0) - }) - .unwrap_or_default() as f64 + agg_condition + }; + + let first_batch = &records[0]; + let schema = first_batch.schema(); + + // Determine if this is a simple query (no GROUP BY) or a grouped query + let is_simple_query = schema.fields().len() == 1; + + let mut groups = Vec::new(); + + for batch in &records { + for row_index in 0..batch.num_rows() { + let mut group_values = HashMap::new(); + let mut aggregate_value = 0.0; + + // Extract values for each column + for (col_index, field) in schema.fields().iter().enumerate() { + let column = batch.column(col_index); + if field.name().eq(alias) { + aggregate_value = extract_numeric_value(column, row_index) + } else { + // This is a GROUP BY column + let value = extract_string_value(column, row_index); + group_values.insert(field.name().clone(), value); + } + } + + groups.push(GroupResult { + group_values, + aggregate_value, + }); + } + } + + AlertQueryResult { + groups, + is_simple_query, } } diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs index 5597f71c3..ceaa3ecca 100644 --- a/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -19,37 +19,45 @@ use actix_web::http::header::ContentType; use arrow_schema::{ArrowError, DataType, Schema}; use async_trait::async_trait; -use chrono::{DateTime, Utc}; +use chrono::Utc; use datafusion::logical_expr::{LogicalPlan, Projection}; +use datafusion::prelude::Expr; use datafusion::sql::sqlparser::parser::ParserError; use derive_more::FromStrError; -use derive_more::derive::FromStr; use http::StatusCode; -// use once_cell::sync::Lazy; -use serde::Serialize; use serde_json::{Error as SerdeError, Value as JsonValue}; use std::collections::HashMap; -use std::fmt::{self, Debug, Display}; +use std::fmt::Debug; use std::sync::Arc; use std::thread; -use std::time::Duration; +// use std::time::Duration; use tokio::sync::oneshot::{Receiver, Sender}; use tokio::sync::{RwLock, mpsc}; use tokio::task::JoinHandle; use tracing::{error, trace, warn}; use ulid::Ulid; +pub mod alert_enums; +pub mod alert_structs; +pub mod alert_traits; pub mod alert_types; pub mod alerts_utils; pub mod target; -pub mod traits; +pub use crate::alerts::alert_enums::{ + AggregateFunction, AlertOperator, AlertState, AlertTask, AlertType, AlertVersion, EvalConfig, + LogicalOperator, NotificationState, Severity, WhereConfigOperator, +}; +pub use crate::alerts::alert_structs::{ + AlertConfig, AlertInfo, AlertRequest, Alerts, AlertsInfo, AlertsInfoByState, AlertsSummary, + BasicAlertFields, Context, DeploymentInfo, RollingWindow, ThresholdConfig, +}; +use crate::alerts::alert_traits::{AlertManagerTrait, AlertTrait}; use crate::alerts::alert_types::ThresholdAlert; -use crate::alerts::target::TARGETS; -use crate::alerts::traits::{AlertManagerTrait, AlertTrait}; +use crate::alerts::target::{NotificationConfig, TARGETS}; use crate::handlers::http::fetch_schema; -use crate::handlers::http::query::create_streams_for_distributed; -use crate::option::Mode; +// use crate::handlers::http::query::create_streams_for_distributed; +// use crate::option::Mode; use crate::parseable::{PARSEABLE, StreamNotFound}; use crate::query::{QUERY_SESSION, resolve_stream_names}; use crate::rbac::map::SessionKey; @@ -58,13 +66,6 @@ use crate::storage::{ALERTS_ROOT_DIRECTORY, ObjectStorageError}; use crate::sync::alert_runtime; use crate::utils::user_auth_for_query; -/// Helper struct for basic alert fields during migration -struct BasicAlertFields { - id: Ulid, - title: String, - severity: Severity, -} - // these types describe the scheduled task for an alert pub type ScheduledTaskHandlers = (JoinHandle<()>, Receiver<()>, Sender<()>); @@ -91,7 +92,7 @@ pub async fn set_alert_manager(manager: Arc) { } pub fn create_default_alerts_manager() -> Alerts { - let (tx, rx) = mpsc::channel::(10); + let (tx, rx) = mpsc::channel::(1000); let alerts = Alerts { alerts: RwLock::new(HashMap::new()), sender: tx, @@ -100,516 +101,6 @@ pub fn create_default_alerts_manager() -> Alerts { alerts } -// pub static ALERTS: Lazy = Lazy::new(|| { -// let (tx, rx) = mpsc::channel::(10); -// let alerts = Alerts { -// alerts: RwLock::new(HashMap::new()), -// sender: tx, -// }; - -// thread::spawn(|| alert_runtime(rx)); - -// alerts -// }); - -#[derive(Debug)] -pub struct Alerts { - pub alerts: RwLock>>, - pub sender: mpsc::Sender, -} - -pub enum AlertTask { - Create(Box), - Delete(Ulid), -} - -#[derive(Default, Debug, serde::Serialize, serde::Deserialize, Clone)] -#[serde(rename_all = "lowercase")] -pub enum AlertVersion { - V1, - #[default] - V2, -} - -impl From<&str> for AlertVersion { - fn from(value: &str) -> Self { - match value { - "v1" => Self::V1, - "v2" => Self::V2, - _ => Self::V2, // default to v2 - } - } -} - -#[async_trait] -pub trait CallableTarget { - async fn call(&self, payload: &Context); -} - -#[derive(Debug, Clone)] -pub struct Context { - alert_info: AlertInfo, - deployment_info: DeploymentInfo, - message: String, -} - -impl Context { - pub fn new(alert_info: AlertInfo, deployment_info: DeploymentInfo, message: String) -> Self { - Self { - alert_info, - deployment_info, - message, - } - } - - fn default_alert_string(&self) -> String { - format!( - "AlertName: {}\nTriggered TimeStamp: {}\nSeverity: {}\n{}", - self.alert_info.alert_name, - Utc::now().to_rfc3339(), - self.alert_info.severity, - self.message - ) - } - - fn default_resolved_string(&self) -> String { - format!("{} is now resolved ", self.alert_info.alert_name) - } - - fn default_silenced_string(&self) -> String { - format!( - "Notifications for {} have been silenced ", - self.alert_info.alert_name - ) - } -} - -#[derive(Debug, Clone)] -pub struct AlertInfo { - alert_id: Ulid, - alert_name: String, - // message: String, - // reason: String, - alert_state: AlertState, - severity: String, -} - -impl AlertInfo { - pub fn new( - alert_id: Ulid, - alert_name: String, - alert_state: AlertState, - severity: String, - ) -> Self { - Self { - alert_id, - alert_name, - alert_state, - severity, - } - } -} - -#[derive(Debug, Clone)] -pub struct DeploymentInfo { - deployment_instance: String, - deployment_id: Ulid, - deployment_mode: String, -} - -impl DeploymentInfo { - pub fn new(deployment_instance: String, deployment_id: Ulid, deployment_mode: String) -> Self { - Self { - deployment_instance, - deployment_id, - deployment_mode, - } - } -} - -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq)] -#[serde(rename_all = "camelCase")] -pub enum AlertType { - Threshold, - Anomaly, - Forecast, -} - -impl Display for AlertType { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - AlertType::Threshold => write!(f, "threshold"), - AlertType::Anomaly => write!(f, "anomaly"), - AlertType::Forecast => write!(f, "forecast"), - } - } -} - -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub enum AlertOperator { - #[serde(rename = ">")] - GreaterThan, - #[serde(rename = "<")] - LessThan, - #[serde(rename = "=")] - Equal, - #[serde(rename = "!=")] - NotEqual, - #[serde(rename = ">=")] - GreaterThanOrEqual, - #[serde(rename = "<=")] - LessThanOrEqual, -} - -impl Display for AlertOperator { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - AlertOperator::GreaterThan => write!(f, ">"), - AlertOperator::LessThan => write!(f, "<"), - AlertOperator::Equal => write!(f, "="), - AlertOperator::NotEqual => write!(f, "!="), - AlertOperator::GreaterThanOrEqual => write!(f, ">="), - AlertOperator::LessThanOrEqual => write!(f, "<="), - } - } -} - -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, FromStr, PartialEq, Eq)] -#[serde(rename_all = "camelCase")] -pub enum WhereConfigOperator { - #[serde(rename = "=")] - Equal, - #[serde(rename = "!=")] - NotEqual, - #[serde(rename = "<")] - LessThan, - #[serde(rename = ">")] - GreaterThan, - #[serde(rename = "<=")] - LessThanOrEqual, - #[serde(rename = ">=")] - GreaterThanOrEqual, - #[serde(rename = "is null")] - IsNull, - #[serde(rename = "is not null")] - IsNotNull, - #[serde(rename = "ilike")] - ILike, - #[serde(rename = "contains")] - Contains, - #[serde(rename = "begins with")] - BeginsWith, - #[serde(rename = "ends with")] - EndsWith, - #[serde(rename = "does not contain")] - DoesNotContain, - #[serde(rename = "does not begin with")] - DoesNotBeginWith, - #[serde(rename = "does not end with")] - DoesNotEndWith, -} - -impl WhereConfigOperator { - /// Convert the enum value to its string representation - pub fn as_str(&self) -> &'static str { - match self { - Self::Equal => "=", - Self::NotEqual => "!=", - Self::LessThan => "<", - Self::GreaterThan => ">", - Self::LessThanOrEqual => "<=", - Self::GreaterThanOrEqual => ">=", - Self::IsNull => "is null", - Self::IsNotNull => "is not null", - Self::ILike => "ilike", - Self::Contains => "contains", - Self::BeginsWith => "begins with", - Self::EndsWith => "ends with", - Self::DoesNotContain => "does not contain", - Self::DoesNotBeginWith => "does not begin with", - Self::DoesNotEndWith => "does not end with", - } - } -} - -impl Display for WhereConfigOperator { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - // We can reuse our as_str method to get the string representation - write!(f, "{}", self.as_str()) - } -} - -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub enum AggregateFunction { - Avg, - Count, - CountDistinct, - Min, - Max, - Sum, -} - -impl Display for AggregateFunction { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - AggregateFunction::Avg => write!(f, "Avg"), - AggregateFunction::Count => write!(f, "Count"), - AggregateFunction::CountDistinct => write!(f, "CountDistinct"), - AggregateFunction::Min => write!(f, "Min"), - AggregateFunction::Max => write!(f, "Max"), - AggregateFunction::Sum => write!(f, "Sum"), - } - } -} - -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] -pub struct OperationConfig { - pub column: String, - pub operator: Option, - pub value: Option, -} - -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct FilterConfig { - pub conditions: Vec, -} - -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] -pub struct ConditionConfig { - pub column: String, - pub operator: WhereConfigOperator, - pub value: Option, -} - -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct Conditions { - pub operator: Option, - pub condition_config: Vec, -} - -impl Conditions { - pub fn generate_filter_message(&self) -> String { - match &self.operator { - Some(op) => match op { - LogicalOperator::And | LogicalOperator::Or => { - let expr1 = &self.condition_config[0]; - let expr2 = &self.condition_config[1]; - let expr1_msg = if expr1.value.as_ref().is_some_and(|v| !v.is_empty()) { - format!( - "{} {} {}", - expr1.column, - expr1.operator, - expr1.value.as_ref().unwrap() - ) - } else { - format!("{} {}", expr1.column, expr1.operator) - }; - - let expr2_msg = if expr2.value.as_ref().is_some_and(|v| !v.is_empty()) { - format!( - "{} {} {}", - expr2.column, - expr2.operator, - expr2.value.as_ref().unwrap() - ) - } else { - format!("{} {}", expr2.column, expr2.operator) - }; - - format!("[{expr1_msg} {op} {expr2_msg}]") - } - }, - None => { - let expr = &self.condition_config[0]; - if let Some(val) = &expr.value { - format!("{} {} {}", expr.column, expr.operator, val) - } else { - format!("{} {}", expr.column, expr.operator) - } - } - } - } -} - -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct GroupBy { - pub columns: Vec, -} - -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct ThresholdConfig { - pub operator: AlertOperator, - pub value: f64, -} - -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct RollingWindow { - // x minutes (25m) - pub eval_start: String, - // should always be "now" - pub eval_end: String, - // x minutes (5m) - pub eval_frequency: u64, -} - -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub enum EvalConfig { - RollingWindow(RollingWindow), -} - -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct AlertEval {} - -#[derive( - Debug, - serde::Serialize, - serde::Deserialize, - Clone, - Copy, - PartialEq, - Eq, - PartialOrd, - Ord, - Default, - FromStr, -)] -#[serde(rename_all = "camelCase")] -pub enum AlertState { - Triggered, - Silenced, - #[default] - Resolved, -} - -impl Display for AlertState { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - AlertState::Triggered => write!(f, "Triggered"), - AlertState::Silenced => write!(f, "Silenced"), - AlertState::Resolved => write!(f, "Resolved"), - } - } -} - -#[derive( - Debug, - serde::Serialize, - serde::Deserialize, - Clone, - Copy, - PartialEq, - Eq, - PartialOrd, - Ord, - Default, - FromStr, -)] -#[serde(rename_all = "camelCase")] -pub enum Severity { - Critical, - High, - #[default] - Medium, - Low, -} - -impl Display for Severity { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Severity::Critical => write!(f, "Critical"), - Severity::High => write!(f, "High"), - Severity::Medium => write!(f, "Medium"), - Severity::Low => write!(f, "Low"), - } - } -} - -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub enum LogicalOperator { - And, - Or, -} - -impl Display for LogicalOperator { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - LogicalOperator::And => write!(f, "AND"), - LogicalOperator::Or => write!(f, "OR"), - } - } -} - -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct AlertRequest { - #[serde(default = "Severity::default")] - pub severity: Severity, - pub title: String, - pub query: String, - pub alert_type: AlertType, - pub threshold_config: ThresholdConfig, - pub eval_config: EvalConfig, - pub targets: Vec, - pub tags: Option>, -} - -impl AlertRequest { - pub async fn into(self) -> Result { - // Validate that all target IDs exist - for id in &self.targets { - TARGETS.get_target_by_id(id).await?; - } - let datasets = resolve_stream_names(&self.query)?; - let config = AlertConfig { - version: AlertVersion::from(CURRENT_ALERTS_VERSION), - id: Ulid::new(), - severity: self.severity, - title: self.title, - query: self.query, - datasets, - alert_type: self.alert_type, - threshold_config: self.threshold_config, - eval_config: self.eval_config, - targets: self.targets, - state: AlertState::default(), - created: Utc::now(), - tags: self.tags, - }; - Ok(config) - } -} - -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct AlertConfig { - pub version: AlertVersion, - #[serde(default)] - pub id: Ulid, - pub severity: Severity, - pub title: String, - pub query: String, - pub datasets: Vec, - pub alert_type: AlertType, - pub threshold_config: ThresholdConfig, - pub eval_config: EvalConfig, - pub targets: Vec, - // for new alerts, state should be resolved - #[serde(default)] - pub state: AlertState, - pub created: DateTime, - pub tags: Option>, -} - impl AlertConfig { /// Migration function to convert v1 alerts to v2 structure pub async fn migrate_from_v1( @@ -639,6 +130,8 @@ impl AlertConfig { eval_config, targets, state, + notification_state: NotificationState::Notify, + notification_config: NotificationConfig::default(), created: Utc::now(), tags: None, }; @@ -1068,88 +561,9 @@ impl AlertConfig { let state_str = alert_json["state"].as_str().unwrap_or("resolved"); match state_str.to_lowercase().as_str() { "triggered" => AlertState::Triggered, - "silenced" => AlertState::Silenced, - "resolved" => AlertState::Resolved, - _ => AlertState::Resolved, - } - } - - pub async fn modify(&mut self, alert: AlertRequest) -> Result<(), AlertError> { - // Validate that all target IDs exist - for id in &alert.targets { - TARGETS.get_target_by_id(id).await?; - } - self.title = alert.title; - self.query = alert.query; - self.alert_type = alert.alert_type; - self.threshold_config = alert.threshold_config; - self.eval_config = alert.eval_config; - self.targets = alert.targets; - self.state = AlertState::default(); - Ok(()) - } - - /// Validations - pub async fn validate(&self, session_key: SessionKey) -> Result<(), AlertError> { - // validate alert type - // Anomaly is only allowed in Prism - if self.alert_type.eq(&AlertType::Anomaly) && PARSEABLE.options.mode != Mode::Prism { - return Err(AlertError::CustomError( - "Anomaly alert is only allowed on Prism mode".into(), - )); - } - - // validate evalType - let eval_frequency = match &self.eval_config { - EvalConfig::RollingWindow(rolling_window) => { - if humantime::parse_duration(&rolling_window.eval_start).is_err() { - return Err(AlertError::Metadata( - "evalStart should be of type humantime", - )); - } - rolling_window.eval_frequency - } - }; - - // validate that target repeat notifs !> eval_frequency - for target_id in &self.targets { - let target = TARGETS.get_target_by_id(target_id).await?; - match &target.notification_config.times { - target::Retry::Infinite => {} - target::Retry::Finite(repeat) => { - let notif_duration = - Duration::from_secs(60 * target.notification_config.interval) - * *repeat as u32; - if (notif_duration.as_secs_f64()).gt(&((eval_frequency * 60) as f64)) { - return Err(AlertError::Metadata( - "evalFrequency should be greater than target repetition interval", - )); - } - } - } + "resolved" => AlertState::NotTriggered, + _ => AlertState::NotTriggered, } - - // validate that the query is valid - if self.query.is_empty() { - return Err(AlertError::InvalidAlertQuery); - } - - let tables = resolve_stream_names(&self.query)?; - if tables.is_empty() { - return Err(AlertError::InvalidAlertQuery); - } - create_streams_for_distributed(tables) - .await - .map_err(|_| AlertError::InvalidAlertQuery)?; - - // validate that the user has access to the tables mentioned in the query - user_auth_for_query(&session_key, &self.query).await?; - - // validate that the alert query is valid and can be evaluated - if !is_query_aggregate(&self.query).await? { - return Err(AlertError::InvalidAlertQuery); - } - Ok(()) } pub fn get_eval_frequency(&self) -> u64 { @@ -1180,9 +594,11 @@ impl AlertConfig { self.id, self.title.clone(), self.state, - self.severity.to_string(), + alert_enums::NotificationState::Notify, + self.severity.clone().to_string(), ), DeploymentInfo::new(deployment_instance, deployment_id, deployment_mode), + self.notification_config.clone(), String::default(), ) } @@ -1218,6 +634,11 @@ impl AlertConfig { serde_json::Value::String(self.alert_type.to_string()), ); + map.insert( + "notificationState".to_string(), + serde_json::Value::String(self.notification_state.to_string()), + ); + map.insert( "id".to_string(), serde_json::Value::String(self.id.to_string()), @@ -1259,7 +680,21 @@ impl AlertConfig { } /// Check if a query is an aggregate query that returns a single value without executing it -pub async fn is_query_aggregate(query: &str) -> Result { +pub async fn get_number_of_agg_exprs(query: &str) -> Result { + let session_state = QUERY_SESSION.state(); + + // Parse the query into a logical plan + let logical_plan = session_state + .create_logical_plan(query) + .await + .map_err(|err| AlertError::CustomError(format!("Failed to parse query: {err}")))?; + + // Check if the plan structure indicates an aggregate query + _get_number_of_agg_exprs(&logical_plan) +} + +/// Extract the projection which deals with aggregation +pub async fn get_aggregate_projection(query: &str) -> Result { let session_state = QUERY_SESSION.state(); // Parse the query into a logical plan @@ -1269,21 +704,185 @@ pub async fn is_query_aggregate(query: &str) -> Result { .map_err(|err| AlertError::CustomError(format!("Failed to parse query: {err}")))?; // Check if the plan structure indicates an aggregate query - Ok(is_logical_plan_aggregate(&logical_plan)) + _get_aggregate_projection(&logical_plan) +} + +fn _get_aggregate_projection(plan: &LogicalPlan) -> Result { + match plan { + LogicalPlan::Aggregate(agg) => { + // let fields = exprlist_to_fields(&agg.aggr_expr, &agg.input)?; + match &agg.aggr_expr[0] { + datafusion::prelude::Expr::Alias(alias) => Ok(alias.name.clone()), + _ => Ok(agg.aggr_expr[0].name_for_alias()?), + } + } + // Projection over aggregate: SELECT COUNT(*) as total, SELECT AVG(col) as average + LogicalPlan::Projection(Projection { input, .. }) => _get_aggregate_projection(input), + // Do not consider any aggregates inside a subquery or recursive CTEs + LogicalPlan::Subquery(_) | LogicalPlan::RecursiveQuery(_) => { + Err(AlertError::InvalidAlertQuery("Subquery not allowed".into())) + } + // Recursively check wrapped plans (Filter, Limit, Sort, etc.) + _ => { + // Use inputs() method to get all input plans and recursively search + for input in plan.inputs() { + if let Ok(result) = _get_aggregate_projection(input) { + return Ok(result); + } + } + Err(AlertError::InvalidAlertQuery( + "No aggregate projection found".into(), + )) + } + } +} + +/// Extracts aliases for aggregate functions from a DataFusion logical plan +pub fn extract_aggregate_aliases(plan: &LogicalPlan) -> Vec<(String, Option)> { + let mut aliases = Vec::new(); + + // Handle different logical plan node types + match plan { + LogicalPlan::Projection(projection) => { + for expr in &projection.expr { + if let Some((agg_name, alias)) = extract_alias_from_expr(expr) { + aliases.push((agg_name, alias)); + } + } + } + LogicalPlan::Aggregate(aggregate) => { + // Check aggregate expressions directly + for expr in &aggregate.aggr_expr { + if let Some((agg_name, alias)) = extract_alias_from_expr(expr) { + aliases.push((agg_name, alias)); + } + } + + // Also check group expressions in case they contain aggregates + for expr in &aggregate.group_expr { + if let Some((agg_name, alias)) = extract_alias_from_expr(expr) { + aliases.push((agg_name, alias)); + } + } + } + _ => { + // For other node types, continue traversal + } + } + + // Recursively check child plans + for input in plan.inputs() { + aliases.extend(extract_aggregate_aliases(input)); + } + + aliases +} + +/// Extracts aggregate function name and alias from an expression +fn extract_alias_from_expr(expr: &Expr) -> Option<(String, Option)> { + match expr { + Expr::Alias(alias_expr) => { + let alias_name = alias_expr.name.clone(); + + // Check if the aliased expression is an aggregate + if let Some((agg_name, _)) = extract_alias_from_expr(&alias_expr.expr) { + Some((agg_name, Some(alias_name))) + } else { + None + } + } + Expr::AggregateFunction(agg_func) => { + let agg_name = format!("{:?}", agg_func.func); + Some((agg_name, None)) + } + // Handle specific aggregate function variants + Expr::WindowFunction(window_func) => { + // Some aggregates might appear as window functions + let func_name = format!("{:?}", window_func.fun); + if is_aggregate_function(&func_name) { + Some((func_name, None)) + } else { + None + } + } + // Handle built-in aggregate functions that might not be AggregateFunction + Expr::ScalarFunction(scalar_func) => { + let func_name = format!("{:?}", scalar_func.func); + if is_aggregate_function(&func_name) { + Some((func_name, None)) + } else { + None + } + } + Expr::Column(column_expr) => { + // Check if column name suggests it's an aggregate result + let column_name = column_expr.name(); + if is_likely_aggregate_column(column_name) { + Some((column_name.to_owned(), None)) + } else { + None + } + } + _ => None, + } +} + +/// Helper function to determine if a function name represents an aggregate +fn is_aggregate_function(func_name: &str) -> bool { + let lower_func = func_name.to_lowercase(); + matches!( + lower_func.as_str(), + "count" + | "sum" + | "avg" + | "mean" + | "min" + | "max" + | "stddev" + | "variance" + | "first" + | "last" + | "array_agg" + | "string_agg" + | "bit_and" + | "bit_or" + | "bit_xor" + ) || lower_func.contains("count") + || lower_func.contains("sum") + || lower_func.contains("avg") + || lower_func.contains("min") + || lower_func.contains("max") +} + +/// Helper function to determine if a column name suggests it's an aggregate result +fn is_likely_aggregate_column(column_name: &str) -> bool { + let lower_name = column_name.to_lowercase(); + lower_name.starts_with("count_") + || lower_name.starts_with("sum_") + || lower_name.starts_with("avg_") + || lower_name.starts_with("min_") + || lower_name.starts_with("max_") + || lower_name.contains("count(") + || lower_name.contains("sum(") + || lower_name.contains("avg(") + || lower_name.contains("min(") + || lower_name.contains("max(") } /// Analyze a logical plan to determine if it represents an aggregate query -pub fn is_logical_plan_aggregate(plan: &LogicalPlan) -> bool { +/// +/// Returns the number of aggregate expressions found in the plan +fn _get_number_of_agg_exprs(plan: &LogicalPlan) -> Result { match plan { // Direct aggregate: SELECT COUNT(*), AVG(col), etc. - LogicalPlan::Aggregate(_) => true, + LogicalPlan::Aggregate(agg) => Ok(agg.aggr_expr.len()), // Projection over aggregate: SELECT COUNT(*) as total, SELECT AVG(col) as average - LogicalPlan::Projection(Projection { input, expr, .. }) => { - // Check if input contains an aggregate and we have exactly one expression - let is_aggregate_input = is_logical_plan_aggregate(input); - let single_expr = expr.len() == 1; - is_aggregate_input && single_expr + LogicalPlan::Projection(Projection { input, .. }) => _get_number_of_agg_exprs(input), + + // Do not consider any aggregates inside a subquery or recursive CTEs + LogicalPlan::Subquery(_) | LogicalPlan::RecursiveQuery(_) => { + Err(AlertError::InvalidAlertQuery("Subquery not allowed".into())) } // Recursively check wrapped plans (Filter, Limit, Sort, etc.) @@ -1291,7 +890,8 @@ pub fn is_logical_plan_aggregate(plan: &LogicalPlan) -> bool { // Use inputs() method to get all input plans plan.inputs() .iter() - .any(|input| is_logical_plan_aggregate(input)) + .map(|input| _get_number_of_agg_exprs(input)) + .sum() } } } @@ -1318,7 +918,7 @@ pub enum AlertError { StreamNotFound(#[from] StreamNotFound), #[error("{0}")] Anyhow(#[from] anyhow::Error), - #[error("No alert request body provided")] + #[error("Invalid alert modification request")] InvalidAlertModifyRequest, #[error("{0}")] FromStrError(#[from] FromStrError), @@ -1330,14 +930,18 @@ pub enum AlertError { TargetInUse, #[error("{0}")] ParserError(#[from] ParserError), - #[error("Invalid alert query")] - InvalidAlertQuery, + #[error("Invalid alert query: {0}")] + InvalidAlertQuery(String), #[error("Invalid query parameter: {0}")] InvalidQueryParameter(String), #[error("{0}")] ArrowError(#[from] ArrowError), #[error("Upgrade to Parseable Enterprise for {0} type alerts")] - NotPresentInOSS(String), + NotPresentInOSS(&'static str), + #[error("{0}")] + Unimplemented(String), + #[error("{0}")] + ValidationFailure(String), } impl actix_web::ResponseError for AlertError { @@ -1359,9 +963,11 @@ impl actix_web::ResponseError for AlertError { Self::InvalidTargetModification(_) => StatusCode::BAD_REQUEST, Self::TargetInUse => StatusCode::CONFLICT, Self::ParserError(_) => StatusCode::BAD_REQUEST, - Self::InvalidAlertQuery => StatusCode::BAD_REQUEST, + Self::InvalidAlertQuery(_) => StatusCode::BAD_REQUEST, Self::InvalidQueryParameter(_) => StatusCode::BAD_REQUEST, + Self::ValidationFailure(_) => StatusCode::BAD_REQUEST, Self::ArrowError(_) => StatusCode::INTERNAL_SERVER_ERROR, + Self::Unimplemented(_) => StatusCode::INTERNAL_SERVER_ERROR, Self::NotPresentInOSS(_) => StatusCode::BAD_REQUEST, } } @@ -1441,19 +1047,24 @@ impl AlertManagerTrait for Alerts { AlertType::Threshold => { Box::new(ThresholdAlert::from(alert)) as Box } - AlertType::Anomaly => { + AlertType::Anomaly(_) => { return Err(anyhow::Error::msg( - AlertError::NotPresentInOSS("anomaly".into()).to_string(), + AlertError::NotPresentInOSS("anomaly").to_string(), )); } - AlertType::Forecast => { + AlertType::Forecast(_) => { return Err(anyhow::Error::msg( - AlertError::NotPresentInOSS("forecast".into()).to_string(), + AlertError::NotPresentInOSS("forecast").to_string(), )); } }; - // Create alert task + // Create alert task iff alert's state is not paused + if alert.get_state().eq(&AlertState::Disabled) { + map.insert(*alert.get_id(), alert); + continue; + } + match self.sender.send(AlertTask::Create(alert.clone_box())).await { Ok(_) => {} Err(e) => { @@ -1481,29 +1092,65 @@ impl AlertManagerTrait for Alerts { session: SessionKey, tags: Vec, ) -> Result, AlertError> { - let mut alerts: Vec = Vec::new(); - for (_, alert) in self.alerts.read().await.iter() { - // filter based on whether the user can execute this query or not - if user_auth_for_query(&session, alert.get_query()) + // First, collect all alerts without performing auth checks to avoid holding the lock + let all_alerts: Vec = { + let alerts_guard = self.alerts.read().await; + alerts_guard + .values() + .map(|alert| alert.to_alert_config()) + .collect() + }; + // Lock is released here, now perform expensive auth checks + + let authorized_alerts = if tags.is_empty() { + // Parallelize authorization checks + let futures: Vec<_> = all_alerts + .into_iter() + .map(|alert| async { + if user_auth_for_query(&session.clone(), &alert.query) + .await + .is_ok() + { + Some(alert) + } else { + None + } + }) + .collect(); + + futures::future::join_all(futures) .await - .is_ok() - { - alerts.push(alert.to_alert_config()); - } - } - if tags.is_empty() { - return Ok(alerts); - } - // filter alerts based on tags - alerts.retain(|alert| { - if let Some(alert_tags) = &alert.tags { - alert_tags.iter().any(|tag| tags.contains(tag)) - } else { - false - } - }); + .into_iter() + .flatten() + .collect() + } else { + // Parallelize authorization checks and then filter by tags + let futures: Vec<_> = all_alerts + .into_iter() + .map(|alert| async { + if user_auth_for_query(&session, &alert.query).await.is_ok() { + Some(alert) + } else { + None + } + }) + .collect(); + + futures::future::join_all(futures) + .await + .into_iter() + .flatten() + .filter(|alert| { + if let Some(alert_tags) = &alert.tags { + alert_tags.iter().any(|tag| tags.contains(tag)) + } else { + false + } + }) + .collect() + }; - Ok(alerts) + Ok(authorized_alerts) } /// Returns a single alert that the user has access to (based on query auth) @@ -1532,6 +1179,71 @@ impl AlertManagerTrait for Alerts { alert_id: Ulid, new_state: AlertState, trigger_notif: Option, + ) -> Result<(), AlertError> { + let (mut alert, should_delete_task, should_create_task) = { + let read_access = self.alerts.read().await; + let alert = if let Some(alert) = read_access.get(&alert_id) { + match &alert.get_alert_type() { + AlertType::Threshold => Box::new(ThresholdAlert::from(alert.to_alert_config())) + as Box, + AlertType::Anomaly(_) => { + return Err(AlertError::NotPresentInOSS("anomaly")); + } + AlertType::Forecast(_) => { + return Err(AlertError::NotPresentInOSS("forecast")); + } + } + } else { + return Err(AlertError::CustomError(format!( + "No alert found for the given ID- {alert_id}" + ))); + }; + + let current_state = *alert.get_state(); + let should_delete_task = + new_state.eq(&AlertState::Disabled) && !current_state.eq(&AlertState::Disabled); + let should_create_task = + current_state.eq(&AlertState::Disabled) && new_state.eq(&AlertState::NotTriggered); + + if new_state.eq(&AlertState::Disabled) && current_state.eq(&AlertState::Disabled) { + return Err(AlertError::InvalidStateChange( + "Can't disable an alert which is currently disabled".into(), + )); + } + + (alert, should_delete_task, should_create_task) + }; // Read lock released here + + // Handle task operations without holding any locks + if should_delete_task { + self.sender + .send(AlertTask::Delete(alert_id)) + .await + .map_err(|e| AlertError::CustomError(e.to_string()))?; + } else if should_create_task { + self.sender + .send(AlertTask::Create(alert.clone_box())) + .await + .map_err(|e| AlertError::CustomError(e.to_string()))?; + } + + // Update the alert state + alert.update_state(new_state, trigger_notif).await?; + + // Finally, update the in-memory state with a brief write lock + { + let mut write_access = self.alerts.write().await; + write_access.insert(*alert.get_id(), alert.clone_box()); + } + + Ok(()) + } + + /// Update the notification state of alert + async fn update_notification_state( + &self, + alert_id: Ulid, + new_notification_state: NotificationState, ) -> Result<(), AlertError> { // let store = PARSEABLE.storage.get_object_store(); @@ -1542,11 +1254,11 @@ impl AlertManagerTrait for Alerts { AlertType::Threshold => { Box::new(ThresholdAlert::from(alert.to_alert_config())) as Box } - AlertType::Anomaly => { - return Err(AlertError::NotPresentInOSS("anomaly".into())); + AlertType::Anomaly(_) => { + return Err(AlertError::NotPresentInOSS("anomaly")); } - AlertType::Forecast => { - return Err(AlertError::NotPresentInOSS("forecast".into())); + AlertType::Forecast(_) => { + return Err(AlertError::NotPresentInOSS("forecast")); } } } else { @@ -1555,12 +1267,10 @@ impl AlertManagerTrait for Alerts { ))); }; - let current_state = alert.get_state(); - - if current_state.ne(&new_state) { - alert.update_state(false, new_state, trigger_notif).await?; - write_access.insert(*alert.get_id(), alert.clone_box()); - } + alert + .update_notification_state(new_notification_state) + .await?; + write_access.insert(*alert.get_id(), alert.clone_box()); Ok(()) } @@ -1626,27 +1336,6 @@ impl AlertManagerTrait for Alerts { } } -#[derive(Debug, Serialize)] -pub struct AlertsSummary { - total: u64, - triggered: AlertsInfoByState, - silenced: AlertsInfoByState, - resolved: AlertsInfoByState, -} - -#[derive(Debug, Serialize)] -pub struct AlertsInfoByState { - total: u64, - alert_info: Vec, -} - -#[derive(Debug, Serialize)] -pub struct AlertsInfo { - title: String, - id: Ulid, - severity: Severity, -} - // TODO: add RBAC pub async fn get_alerts_summary() -> Result { let guard = ALERTS.read().await; @@ -1659,11 +1348,11 @@ pub async fn get_alerts_summary() -> Result { let total = alerts.len() as u64; let mut triggered = 0; - let mut resolved = 0; - let mut silenced = 0; + let mut not_triggered = 0; + let mut disabled = 0; let mut triggered_alerts: Vec = Vec::new(); - let mut silenced_alerts: Vec = Vec::new(); - let mut resolved_alerts: Vec = Vec::new(); + let mut disabled_alerts: Vec = Vec::new(); + let mut not_triggered_alerts: Vec = Vec::new(); // find total alerts for each state // get title, id and state of each alert for that state @@ -1674,23 +1363,23 @@ pub async fn get_alerts_summary() -> Result { triggered_alerts.push(AlertsInfo { title: alert.get_title().to_string(), id: *alert.get_id(), - severity: *alert.get_severity(), + severity: alert.get_severity().clone(), }); } - AlertState::Silenced => { - silenced += 1; - silenced_alerts.push(AlertsInfo { + AlertState::Disabled => { + disabled += 1; + disabled_alerts.push(AlertsInfo { title: alert.get_title().to_string(), id: *alert.get_id(), - severity: *alert.get_severity(), + severity: alert.get_severity().clone(), }); } - AlertState::Resolved => { - resolved += 1; - resolved_alerts.push(AlertsInfo { + AlertState::NotTriggered => { + not_triggered += 1; + not_triggered_alerts.push(AlertsInfo { title: alert.get_title().to_string(), id: *alert.get_id(), - severity: *alert.get_severity(), + severity: alert.get_severity().clone(), }); } } @@ -1700,11 +1389,11 @@ pub async fn get_alerts_summary() -> Result { triggered_alerts.sort_by_key(|alert| get_severity_priority(&alert.severity)); triggered_alerts.truncate(5); - silenced_alerts.sort_by_key(|alert| get_severity_priority(&alert.severity)); - silenced_alerts.truncate(5); + disabled_alerts.sort_by_key(|alert| get_severity_priority(&alert.severity)); + disabled_alerts.truncate(5); - resolved_alerts.sort_by_key(|alert| get_severity_priority(&alert.severity)); - resolved_alerts.truncate(5); + not_triggered_alerts.sort_by_key(|alert| get_severity_priority(&alert.severity)); + not_triggered_alerts.truncate(5); let alert_summary = AlertsSummary { total, @@ -1712,13 +1401,13 @@ pub async fn get_alerts_summary() -> Result { total: triggered, alert_info: triggered_alerts, }, - silenced: AlertsInfoByState { - total: silenced, - alert_info: silenced_alerts, + disabled: AlertsInfoByState { + total: disabled, + alert_info: disabled_alerts, }, - resolved: AlertsInfoByState { - total: resolved, - alert_info: resolved_alerts, + not_triggered: AlertsInfoByState { + total: not_triggered, + alert_info: not_triggered_alerts, }, }; Ok(alert_summary) diff --git a/src/alerts/target.rs b/src/alerts/target.rs index 12b7ded54..7e72acd4e 100644 --- a/src/alerts/target.rs +++ b/src/alerts/target.rs @@ -36,12 +36,14 @@ use tracing::{error, trace, warn}; use ulid::Ulid; use url::Url; -use crate::{alerts::AlertError, parseable::PARSEABLE, storage::object_storage::target_json_path}; +use crate::{ + alerts::{AlertError, AlertState, Context, alert_traits::CallableTarget}, + parseable::PARSEABLE, + storage::object_storage::target_json_path, +}; use super::ALERTS; -use super::{AlertState, CallableTarget, Context}; - pub static TARGETS: Lazy = Lazy::new(|| TargetConfigs { target_configs: RwLock::new(HashMap::new()), }); @@ -147,7 +149,6 @@ pub struct Target { pub name: String, #[serde(flatten)] pub target: TargetType, - pub notification_config: Timeout, #[serde(default = "Ulid::new")] pub id: Ulid, } @@ -166,7 +167,6 @@ impl Target { "name":self.name, "type":"slack", "endpoint":masked_endpoint, - "notificationConfig":self.notification_config, "id":self.id }) } @@ -183,7 +183,6 @@ impl Target { "endpoint":masked_endpoint, "headers":other_web_hook.headers, "skipTlsCheck":other_web_hook.skip_tls_check, - "notificationConfig":self.notification_config, "id":self.id }) } @@ -203,7 +202,6 @@ impl Target { "username":auth.username, "password":password, "skipTlsCheck":alert_manager.skip_tls_check, - "notificationConfig":self.notification_config, "id":self.id }) } else { @@ -214,7 +212,6 @@ impl Target { "username":Value::Null, "password":Value::Null, "skipTlsCheck":alert_manager.skip_tls_check, - "notificationConfig":self.notification_config, "id":self.id }) } @@ -224,7 +221,7 @@ impl Target { pub fn call(&self, context: Context) { trace!("target.call context- {context:?}"); - let timeout = &self.notification_config; + let timeout = context.notification_config.clone(); let resolves = context.alert_info.alert_state; let mut state = timeout.state.lock().unwrap(); trace!("target.call state- {state:?}"); @@ -236,15 +233,14 @@ impl Target { // call once and then start sleeping // reduce repeats by 1 call_target(self.target.clone(), context.clone()); - trace!("state not timed out- {state:?}"); // set state state.timed_out = true; state.awaiting_resolve = true; drop(state); - self.spawn_timeout_task(timeout, context.clone()); + self.spawn_timeout_task(&timeout, context.clone()); } } - alert_state @ (AlertState::Resolved | AlertState::Silenced) => { + alert_state @ AlertState::NotTriggered => { state.alert_state = alert_state; if state.timed_out { // if in timeout and resolve came in, only process if it's the first one ( awaiting resolve ) @@ -258,10 +254,13 @@ impl Target { call_target(self.target.clone(), context); } + // do not send out any notifs + // (an eval should not have run!) + AlertState::Disabled => {} } } - fn spawn_timeout_task(&self, target_timeout: &Timeout, alert_context: Context) { + fn spawn_timeout_task(&self, target_timeout: &NotificationConfig, alert_context: Context) { trace!("repeat-\n{target_timeout:?}"); let state = Arc::clone(&target_timeout.state); let retry = target_timeout.times; @@ -270,33 +269,34 @@ impl Target { let alert_id = alert_context.alert_info.alert_id; let sleep_and_check_if_call = - move |timeout_state: Arc>, current_state: AlertState| { - async move { - tokio::time::sleep(Duration::from_secs(timeout * 60)).await; + move |timeout_state: Arc>, current_state: AlertState| async move { + tokio::time::sleep(Duration::from_secs(timeout * 60)).await; - let mut state = timeout_state.lock().unwrap(); + let mut state = timeout_state.lock().unwrap(); - if current_state == AlertState::Triggered { - // it is still firing .. sleep more and come back - state.awaiting_resolve = true; - true - } else { - state.timed_out = false; - false - } + if current_state == AlertState::Triggered { + state.awaiting_resolve = true; + true + } else { + state.timed_out = false; + false } }; trace!("Spawning retry task"); tokio::spawn(async move { - let guard = ALERTS.read().await; - let alerts = if let Some(alerts) = guard.as_ref() { - alerts - } else { - error!("No AlertManager set for alert_id: {alert_id}, stopping timeout task"); - *state.lock().unwrap() = TimeoutState::default(); - return; - }; + // Get alerts manager reference once at the start + let alerts = { + let guard = ALERTS.read().await; + if let Some(alerts) = guard.as_ref() { + alerts.clone() + } else { + error!("No AlertManager set for alert_id: {alert_id}, stopping timeout task"); + *state.lock().unwrap() = TimeoutState::default(); + return; + } + }; // Lock released immediately + match retry { Retry::Infinite => loop { let current_state = if let Ok(state) = alerts.get_state(alert_id).await { @@ -333,15 +333,6 @@ impl Target { call_target(target.clone(), alert_context.clone()) } } - // // fallback for if this task only observed FIRING on all RETRIES - // // Stream might be dead and sending too many alerts is not great - // // Send and alert stating that this alert will only work once it has seen a RESOLVE - // state.lock().unwrap().timed_out = false; - // let context = alert_context; - // // context.alert_info.message = format!( - // // "Triggering alert did not resolve itself after {times} retries, This alert is paused until it resolves"); - // // Send and exit this task. - // call_target(target, context); } } *state.lock().unwrap() = TimeoutState::default(); @@ -376,7 +367,7 @@ impl TryFrom for Target { type Error = String; fn try_from(value: TargetVerifier) -> Result { - let mut timeout = Timeout::default(); + let mut timeout = NotificationConfig::default(); // Default is Infinite in case of alertmanager if matches!(value.target, TargetType::AlertManager(_)) { @@ -398,7 +389,6 @@ impl TryFrom for Target { Ok(Target { name: value.name, target: value.target, - notification_config: timeout, id: value.id, }) } @@ -447,11 +437,11 @@ impl CallableTarget for SlackWebHook { AlertState::Triggered => { serde_json::json!({ "text": payload.default_alert_string() }) } - AlertState::Resolved => { + AlertState::NotTriggered => { serde_json::json!({ "text": payload.default_resolved_string() }) } - AlertState::Silenced => { - serde_json::json!({ "text": payload.default_silenced_string() }) + AlertState::Disabled => { + serde_json::json!({ "text": payload.default_disabled_string() }) } }; @@ -485,8 +475,8 @@ impl CallableTarget for OtherWebHook { let alert = match payload.alert_info.alert_state { AlertState::Triggered => payload.default_alert_string(), - AlertState::Resolved => payload.default_resolved_string(), - AlertState::Silenced => payload.default_silenced_string(), + AlertState::NotTriggered => payload.default_resolved_string(), + AlertState::Disabled => payload.default_disabled_string(), }; let request = client @@ -548,36 +538,18 @@ impl CallableTarget for AlertManager { let alert = &mut alerts[0]; - // alert["labels"].as_object_mut().expect("is object").extend( - // payload - // .additional_labels - // .as_object() - // .expect("is object") - // .iter() - // // filter non null values for alertmanager and only pass strings - // .filter(|(_, value)| !value.is_null()) - // .map(|(k, value)| (k.to_owned(), json::convert_to_string(value))), - // ); - // fill in status label accordingly match payload.alert_info.alert_state { AlertState::Triggered => alert["labels"]["status"] = "triggered".into(), - AlertState::Resolved => { - alert["labels"]["status"] = "resolved".into(); + AlertState::NotTriggered => { + alert["labels"]["status"] = "not-triggered".into(); alert["annotations"]["reason"] = serde_json::Value::String(payload.default_resolved_string()); alert["endsAt"] = Utc::now() .to_rfc3339_opts(chrono::SecondsFormat::Millis, true) .into(); } - AlertState::Silenced => { - alert["labels"]["status"] = "silenced".into(); - alert["annotations"]["reason"] = - serde_json::Value::String(payload.default_silenced_string()); - // alert["endsAt"] = Utc::now() - // .to_rfc3339_opts(chrono::SecondsFormat::Millis, true) - // .into(); - } + AlertState::Disabled => alert["labels"]["status"] = "disabled".into(), }; if let Err(e) = client @@ -592,15 +564,15 @@ impl CallableTarget for AlertManager { } #[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] -pub struct Timeout { +pub struct NotificationConfig { pub interval: u64, - #[serde(default = "Retry::default")] + #[serde(skip)] pub times: Retry, #[serde(skip)] pub state: Arc>, } -impl Default for Timeout { +impl Default for NotificationConfig { fn default() -> Self { Self { interval: 1, diff --git a/src/handlers/http/alerts.rs b/src/handlers/http/alerts.rs index c86b2041c..a63cfe32f 100644 --- a/src/handlers/http/alerts.rs +++ b/src/handlers/http/alerts.rs @@ -19,7 +19,14 @@ use std::{collections::HashMap, str::FromStr}; use crate::{ - alerts::{AlertType, alert_types::ThresholdAlert, traits::AlertTrait}, + alerts::{ + ALERTS, AlertError, AlertState, Severity, + alert_enums::{AlertType, NotificationState}, + alert_structs::{AlertConfig, AlertRequest, NotificationStateRequest}, + alert_traits::AlertTrait, + alert_types::ThresholdAlert, + target::Retry, + }, parseable::PARSEABLE, storage::object_storage::alert_json_path, utils::{actix::extract_session_key_from_req, user_auth_for_query}, @@ -29,17 +36,12 @@ use actix_web::{ web::{self, Json, Path}, }; use bytes::Bytes; +use chrono::{DateTime, Utc}; use ulid::Ulid; -use crate::alerts::{ALERTS, AlertConfig, AlertError, AlertRequest, AlertState, Severity}; - // GET /alerts /// User needs at least a read access to the stream(s) that is being referenced in an alert /// Read all alerts then return alerts which satisfy the condition -/// Supports pagination with optional query parameters: -/// - tags: comma-separated list of tags to filter alerts -/// - offset: number of alerts to skip (default: 0) -/// - limit: maximum number of alerts to return (default: 100, max: 1000) pub async fn list(req: HttpRequest) -> Result { let session_key = extract_session_key_from_req(&req)?; let query_map = web::Query::>::from_query(req.query_string()) @@ -87,7 +89,6 @@ pub async fn list(req: HttpRequest) -> Result { } } } - let guard = ALERTS.read().await; let alerts = if let Some(alerts) = guard.as_ref() { alerts @@ -100,21 +101,20 @@ pub async fn list(req: HttpRequest) -> Result { .iter() .map(|alert| alert.to_summary()) .collect::>(); - - // Sort by state priority (Triggered > Silenced > Resolved) then by severity (Critical > High > Medium > Low) + // Sort by state priority (Triggered > NotTriggered) then by severity (Critical > High > Medium > Low) alerts_summary.sort_by(|a, b| { // Parse state and severity from JSON values back to enums let state_a = a .get("state") .and_then(|v| v.as_str()) .and_then(|s| s.parse::().ok()) - .unwrap_or(AlertState::Resolved); // Default to lowest priority + .unwrap_or(AlertState::NotTriggered); // Default to lowest priority let state_b = b .get("state") .and_then(|v| v.as_str()) .and_then(|s| s.parse::().ok()) - .unwrap_or(AlertState::Resolved); + .unwrap_or(AlertState::NotTriggered); let severity_a = a .get("severity") @@ -128,10 +128,15 @@ pub async fn list(req: HttpRequest) -> Result { .and_then(|s| s.parse::().ok()) .unwrap_or(Severity::Low); + let title_a = a.get("title").and_then(|v| v.as_str()).unwrap_or(""); + + let title_b = b.get("title").and_then(|v| v.as_str()).unwrap_or(""); + // First sort by state, then by severity state_a .cmp(&state_b) .then_with(|| severity_a.cmp(&severity_b)) + .then_with(|| title_a.cmp(title_b)) }); let paginated_alerts = alerts_summary @@ -148,7 +153,35 @@ pub async fn post( req: HttpRequest, Json(alert): Json, ) -> Result { - let alert: AlertConfig = alert.into().await?; + let mut alert: AlertConfig = alert.into().await?; + + if alert.notification_config.interval > alert.get_eval_frequency() { + return Err(AlertError::ValidationFailure( + "Notification interval cannot exceed evaluation frequency".into(), + )); + } + + if alert.get_eval_frequency().eq(&0) { + return Err(AlertError::ValidationFailure( + "Eval frequency cannot be 0".into(), + )); + } + if alert.notification_config.interval.eq(&0) { + return Err(AlertError::ValidationFailure( + "Notification interval cannot be 0".into(), + )); + } + + // calculate the `times` for notification config + let eval_freq = alert.get_eval_frequency(); + let notif_freq = alert.notification_config.interval; + let times = if (eval_freq / notif_freq) == 0 { + 1 + } else { + (eval_freq / notif_freq) as usize + }; + + alert.notification_config.times = Retry::Finite(times); let threshold_alert; let alert: &dyn AlertTrait = match &alert.alert_type { @@ -156,11 +189,11 @@ pub async fn post( threshold_alert = ThresholdAlert::from(alert); &threshold_alert } - AlertType::Anomaly => { - return Err(AlertError::NotPresentInOSS("anomaly".into())); + AlertType::Anomaly(_) => { + return Err(AlertError::NotPresentInOSS("anomaly")); } - AlertType::Forecast => { - return Err(AlertError::NotPresentInOSS("forecast".into())); + AlertType::Forecast(_) => { + return Err(AlertError::NotPresentInOSS("forecast")); } }; @@ -190,7 +223,7 @@ pub async fn post( // start the task alerts.start_task(alert.clone_box()).await?; - Ok(web::Json(alert.to_alert_config())) + Ok(web::Json(alert.to_alert_config().to_response())) } // GET /alerts/{alert_id} @@ -209,7 +242,7 @@ pub async fn get(req: HttpRequest, alert_id: Path) -> Result) -> Result, + Json(new_notification_state): Json, ) -> Result { let session_key = extract_session_key_from_req(&req)?; let alert_id = alert_id.into_inner(); + let new_notification_state = match new_notification_state.state.as_str() { + "notify" => NotificationState::Notify, + "indefinite" => NotificationState::Mute("indefinite".into()), + _ => { + // either human time or datetime in UTC + let till_time = if let Ok(duration) = + humantime::parse_duration(&new_notification_state.state) + { + (Utc::now() + duration).to_rfc3339() + } else if let Ok(timestamp) = DateTime::::from_str(&new_notification_state.state) { + // must be datetime utc then + timestamp.to_rfc3339() + } else { + return Err(AlertError::InvalidStateChange(format!( + "Invalid notification state change request. Expected `notify` or human-time or UTC datetime. Got `{}`", + &new_notification_state.state + ))); + }; + NotificationState::Mute(till_time) + } + }; + let guard = ALERTS.write().await; let alerts = if let Some(alerts) = guard.as_ref() { alerts @@ -266,33 +322,196 @@ pub async fn update_state( }; // check if alert id exists in map - let mut alert = alerts.get_alert_by_id(alert_id).await?; + let alert = alerts.get_alert_by_id(alert_id).await?; // validate that the user has access to the tables mentioned in the query user_auth_for_query(&session_key, alert.get_query()).await?; - let query_string = req.query_string(); + alerts + .update_notification_state(alert_id, new_notification_state) + .await?; + let alert = alerts.get_alert_by_id(alert_id).await?; + + Ok(web::Json(alert.to_alert_config().to_response())) +} + +// PATCH /alerts/{alert_id}/disable +/// first save on disk, then in memory +/// then modify scheduled task +pub async fn disable_alert( + req: HttpRequest, + alert_id: Path, +) -> Result { + let session_key = extract_session_key_from_req(&req)?; + let alert_id = alert_id.into_inner(); + + let guard = ALERTS.write().await; + let alerts = if let Some(alerts) = guard.as_ref() { + alerts + } else { + return Err(AlertError::CustomError("No AlertManager set".into())); + }; + + // check if alert id exists in map + let alert = alerts.get_alert_by_id(alert_id).await?; + // validate that the user has access to the tables mentioned in the query + user_auth_for_query(&session_key, alert.get_query()).await?; + + alerts + .update_state(alert_id, AlertState::Disabled, Some("".into())) + .await?; + let alert = alerts.get_alert_by_id(alert_id).await?; + + Ok(web::Json(alert.to_alert_config().to_response())) +} + +// PATCH /alerts/{alert_id}/enable +/// first save on disk, then in memory +/// then modify scheduled task +pub async fn enable_alert( + req: HttpRequest, + alert_id: Path, +) -> Result { + let session_key = extract_session_key_from_req(&req)?; + let alert_id = alert_id.into_inner(); + + let guard = ALERTS.write().await; + let alerts = if let Some(alerts) = guard.as_ref() { + alerts + } else { + return Err(AlertError::CustomError("No AlertManager set".into())); + }; + + // check if alert id exists in map + let alert = alerts.get_alert_by_id(alert_id).await?; - if query_string.is_empty() { + // only run if alert is disabled + if alert.get_state().ne(&AlertState::Disabled) { return Err(AlertError::InvalidStateChange( - "No query string provided".to_string(), + "Can't enable an alert which is not currently disabled".into(), )); } - let tokens = query_string.split('=').collect::>(); - let state_key = tokens[0]; - let state_value = tokens[1]; - if state_key != "state" { - return Err(AlertError::InvalidStateChange( - "Invalid query parameter".to_string(), - )); + // validate that the user has access to the tables mentioned in the query + user_auth_for_query(&session_key, alert.get_query()).await?; + + alerts + .update_state(alert_id, AlertState::NotTriggered, Some("".into())) + .await?; + let alert = alerts.get_alert_by_id(alert_id).await?; + + Ok(web::Json(alert.to_alert_config().to_response())) +} + +// PUT /alerts/{alert_id} +/// first save on disk, then in memory +/// then modify scheduled task +pub async fn modify_alert( + req: HttpRequest, + alert_id: Path, + Json(alert_request): Json, +) -> Result { + let session_key = extract_session_key_from_req(&req)?; + let alert_id = alert_id.into_inner(); + + // Get alerts manager reference without holding the global lock + let alerts = { + let guard = ALERTS.read().await; + if let Some(alerts) = guard.as_ref() { + alerts.clone() + } else { + return Err(AlertError::CustomError("No AlertManager set".into())); + } + }; + + // Validate and prepare the new alert + let alert = alerts.get_alert_by_id(alert_id).await?; + user_auth_for_query(&session_key, alert.get_query()).await?; + + let mut new_config = alert_request.into().await?; + if &new_config.alert_type != alert.get_alert_type() { + return Err(AlertError::InvalidAlertModifyRequest); } - let new_state = AlertState::from_str(state_value)?; - alert.update_state(true, new_state, Some("".into())).await?; + user_auth_for_query(&session_key, &new_config.query).await?; + + // Calculate notification config + let eval_freq = new_config.get_eval_frequency(); + let notif_freq = new_config.notification_config.interval; + let times = if (eval_freq / notif_freq) == 0 { + 1 + } else { + (eval_freq / notif_freq) as usize + }; + new_config.notification_config.times = Retry::Finite(times); + + // Prepare the updated config + let mut old_config = alert.to_alert_config(); + old_config.threshold_config = new_config.threshold_config; + old_config.datasets = new_config.datasets; + old_config.eval_config = new_config.eval_config; + old_config.notification_config = new_config.notification_config; + old_config.query = new_config.query; + old_config.severity = new_config.severity; + old_config.tags = new_config.tags; + old_config.targets = new_config.targets; + old_config.title = new_config.title; + + let new_alert: Box = match &new_config.alert_type { + AlertType::Threshold => Box::new(ThresholdAlert::from(old_config)) as Box, + AlertType::Anomaly(_) => { + return Err(AlertError::NotPresentInOSS("anomaly")); + } + AlertType::Forecast(_) => { + return Err(AlertError::NotPresentInOSS("forecast")); + } + }; + + new_alert.validate(&session_key).await?; + + // Perform I/O operations + let path = alert_json_path(*new_alert.get_id()); + let store = PARSEABLE.storage.get_object_store(); + let alert_bytes = serde_json::to_vec(&new_alert.to_alert_config())?; + store.put_object(&path, Bytes::from(alert_bytes)).await?; + + // Now perform the atomic operations + alerts.delete_task(alert_id).await?; + alerts.delete(alert_id).await?; + alerts.update(&*new_alert).await; + alerts.start_task(new_alert.clone_box()).await?; + + let config = new_alert.to_alert_config().to_response(); + Ok(web::Json(config)) +} + +// PUT /alerts/{alert_id}/evaluate_alert +pub async fn evaluate_alert( + req: HttpRequest, + alert_id: Path, +) -> Result { + let session_key = extract_session_key_from_req(&req)?; + let alert_id = alert_id.into_inner(); + + let guard = ALERTS.write().await; + let alerts = if let Some(alerts) = guard.as_ref() { + alerts + } else { + return Err(AlertError::CustomError("No AlertManager set".into())); + }; + + let alert = alerts.get_alert_by_id(alert_id).await?; + + user_auth_for_query(&session_key, alert.get_query()).await?; + + let config = alert.to_alert_config().to_response(); + + // remove task + alerts.delete_task(alert_id).await?; - alerts.update(&*alert).await; + // add the task back again so that it evaluates right now + alerts.start_task(alert).await?; - Ok(web::Json(alert.to_alert_config())) + Ok(Json(config)) } pub async fn list_tags() -> Result { diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 3e8170e41..b629bbab1 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -267,7 +267,7 @@ impl Server { .route(web::get().to(alerts::get).authorize(Action::GetAlert)) .route( web::put() - .to(alerts::update_state) + .to(alerts::modify_alert) .authorize(Action::PutAlert), ) .route( @@ -276,6 +276,34 @@ impl Server { .authorize(Action::DeleteAlert), ), ) + .service( + web::resource("/{alert_id}/disable").route( + web::patch() + .to(alerts::disable_alert) + .authorize(Action::PutAlert), + ), + ) + .service( + web::resource("/{alert_id}/enable").route( + web::patch() + .to(alerts::enable_alert) + .authorize(Action::PutAlert), + ), + ) + .service( + web::resource("/{alert_id}/update_notification_state").route( + web::patch() + .to(alerts::update_notification_state) + .authorize(Action::PutAlert), + ), + ) + .service( + web::resource("/{alert_id}/evaluate_alert").route( + web::put() + .to(alerts::evaluate_alert) + .authorize(Action::PutAlert), + ), + ) } pub fn get_targets_webscope() -> Scope { diff --git a/src/handlers/http/targets.rs b/src/handlers/http/targets.rs index 41cad80c7..3c5958a56 100644 --- a/src/handlers/http/targets.rs +++ b/src/handlers/http/targets.rs @@ -19,7 +19,8 @@ pub async fn post( // add to the map TARGETS.update(target.clone()).await?; - Ok(web::Json(target.mask())) + // Ok(web::Json(target.mask())) + Ok(web::Json(target)) } // GET /targets @@ -29,7 +30,7 @@ pub async fn list(_req: HttpRequest) -> Result { .list() .await? .into_iter() - .map(|t| t.mask()) + // .map(|t| t.mask()) .collect_vec(); Ok(web::Json(list)) @@ -41,7 +42,8 @@ pub async fn get(_req: HttpRequest, target_id: Path) -> Result JsonValue { for privilege in privileges.iter_mut() { let privilege_value = privilege.get_mut("privilege"); if let Some(value) = privilege_value - && value.as_str().unwrap() == "ingester" + && matches!(value.as_str(), Some("ingester")) { *value = JsonValue::String("ingestor".to_string()); } @@ -125,7 +125,7 @@ pub fn v3_v4(mut storage_metadata: JsonValue) -> JsonValue { for privilege in privileges.iter_mut() { let privilege_value = privilege.get_mut("privilege"); if let Some(value) = privilege_value - && value.as_str().unwrap() == "ingester" + && matches!(value.as_str(), Some("ingester")) { *value = JsonValue::String("ingestor".to_string()); } diff --git a/src/prism/home/mod.rs b/src/prism/home/mod.rs index 2d9801d56..87bac25d6 100644 --- a/src/prism/home/mod.rs +++ b/src/prism/home/mod.rs @@ -27,7 +27,7 @@ use serde::Serialize; use tracing::error; use crate::{ - alerts::{ALERTS, AlertError, AlertsSummary, get_alerts_summary}, + alerts::{ALERTS, AlertError, alert_structs::AlertsSummary, get_alerts_summary}, correlation::{CORRELATIONS, CorrelationError}, handlers::{ TelemetryType, diff --git a/src/query/mod.rs b/src/query/mod.rs index 16ec3571e..dea173db2 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -50,7 +50,7 @@ use tokio::runtime::Runtime; use self::error::ExecuteError; use self::stream_schema_provider::GlobalSchemaProvider; pub use self::stream_schema_provider::PartialTimeFilter; -use crate::alerts::Conditions; +use crate::alerts::alert_structs::Conditions; use crate::alerts::alerts_utils::get_filter_string; use crate::catalog::Snapshot as CatalogSnapshot; use crate::catalog::column::{Int64Type, TypedStatistics}; diff --git a/src/sync.rs b/src/sync.rs index bfc6ba88d..aaea1d2dc 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -27,7 +27,8 @@ use tokio::time::{Duration, Instant, interval_at, sleep}; use tokio::{select, task}; use tracing::{error, info, trace, warn}; -use crate::alerts::{AlertTask, alerts_utils}; +use crate::alerts::alert_enums::AlertTask; +use crate::alerts::alerts_utils; use crate::parseable::PARSEABLE; use crate::storage::object_storage::sync_all_streams; use crate::{LOCAL_SYNC_INTERVAL, STORAGE_UPLOAD_INTERVAL}; @@ -339,9 +340,12 @@ pub async fn alert_runtime(mut rx: mpsc::Receiver) -> Result<(), anyh if let Some(handle) = alert_tasks.remove(&ulid) { // cancel the task handle.abort(); - warn!("Alert with id {} deleted", ulid); + trace!("Alert with id {} deleted from evaluation tasks list", ulid); } else { - error!("Alert with id {} does not exist", ulid); + error!( + "Alert with id {} does not exist in evaluation tasks list", + ulid + ); } } }