Skip to content

Commit 856be6a

Browse files
committed
changes for alerts
1 parent a68bd8d commit 856be6a

File tree

2 files changed

+251
-139
lines changed

2 files changed

+251
-139
lines changed

src/alerts/alerts_utils.rs

Lines changed: 89 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,17 @@
1818

1919
use arrow_array::{Float64Array, Int64Array, RecordBatch};
2020
use datafusion::{
21-
common::tree_node::TreeNode,
22-
functions_aggregate::{
21+
common::tree_node::TreeNode, functions_aggregate::{
2322
count::count,
2423
expr_fn::avg,
2524
min_max::{max, min},
2625
sum::sum,
27-
},
28-
prelude::{col, lit, DataFrame, Expr},
26+
}, logical_expr::{BinaryExpr, Operator}, prelude::{col, lit, DataFrame, Expr}
2927
};
3028
use tracing::trace;
3129

3230
use crate::{
33-
alerts::AggregateCondition,
31+
alerts::LogicalOperator,
3432
parseable::PARSEABLE,
3533
query::{TableScanVisitor, QUERY_SESSION},
3634
rbac::{
@@ -42,8 +40,7 @@ use crate::{
4240
};
4341

4442
use super::{
45-
AggregateConfig, AggregateOperation, AggregateResult, Aggregations, AlertConfig, AlertError,
46-
AlertOperator, AlertState, ConditionConfig, Conditions, ALERTS,
43+
AggregateConfig, AggregateFunction, AggregateResult, Aggregates, AlertConfig, AlertError, AlertOperator, AlertState, ConditionConfig, Conditions, WhereConfigOperator, ALERTS
4744
};
4845

4946
async fn get_tables_from_query(query: &str) -> Result<TableScanVisitor, AlertError> {
@@ -104,18 +101,16 @@ pub async fn evaluate_alert(alert: &AlertConfig) -> Result<(), AlertError> {
104101
let query = prepare_query(alert).await?;
105102
let select_query = format!("SELECT * FROM {}", alert.stream);
106103
let base_df = execute_base_query(&query, &select_query).await?;
107-
let agg_results = evaluate_aggregates(&alert.aggregate_config, &base_df).await?;
108-
let final_res = calculate_final_result(&alert.aggregate_config, &agg_results);
104+
let agg_results = evaluate_aggregates(&alert.aggregates, &base_df).await?;
105+
let final_res = calculate_final_result(&alert.aggregates, &agg_results);
109106

110107
update_alert_state(alert, final_res, &agg_results).await?;
111108
Ok(())
112109
}
113110

114111
async fn prepare_query(alert: &AlertConfig) -> Result<crate::query::Query, AlertError> {
115-
let (start_time, end_time) = match &alert.eval_type {
116-
super::EvalConfig::RollingWindow(rolling_window) => {
117-
(&rolling_window.eval_start, &rolling_window.eval_end)
118-
}
112+
let (start_time, end_time) = match &alert.eval_config {
113+
super::EvalConfig::RollingWindow(rolling_window) => (&rolling_window.eval_start, "now"),
119114
};
120115

121116
let session_state = QUERY_SESSION.state();
@@ -148,15 +143,15 @@ async fn execute_base_query(
148143
}
149144

150145
async fn evaluate_aggregates(
151-
agg_config: &Aggregations,
146+
agg_config: &Aggregates,
152147
base_df: &DataFrame,
153148
) -> Result<Vec<AggregateResult>, AlertError> {
154149
let agg_filter_exprs = get_exprs(agg_config);
155150
let mut results = Vec::new();
156151

157152
let conditions = match &agg_config.operator {
158-
Some(_) => &agg_config.aggregate_conditions[0..2],
159-
None => &agg_config.aggregate_conditions[0..1],
153+
Some(_) => &agg_config.aggregate_config[0..2],
154+
None => &agg_config.aggregate_config[0..1],
160155
};
161156

162157
for ((agg_expr, filter), agg) in agg_filter_exprs.into_iter().zip(conditions) {
@@ -188,7 +183,7 @@ async fn evaluate_single_aggregate(
188183
let result = evaluate_condition(&agg.operator, final_value, agg.value);
189184

190185
let message = if result {
191-
agg.condition_config
186+
agg.conditions
192187
.as_ref()
193188
.map(|config| config.generate_filter_message())
194189
.or(Some(String::default()))
@@ -208,18 +203,18 @@ fn evaluate_condition(operator: &AlertOperator, actual: f64, expected: f64) -> b
208203
match operator {
209204
AlertOperator::GreaterThan => actual > expected,
210205
AlertOperator::LessThan => actual < expected,
211-
AlertOperator::EqualTo => actual == expected,
212-
AlertOperator::NotEqualTo => actual != expected,
213-
AlertOperator::GreaterThanEqualTo => actual >= expected,
214-
AlertOperator::LessThanEqualTo => actual <= expected,
206+
AlertOperator::Equal => actual == expected,
207+
AlertOperator::NotEqual => actual != expected,
208+
AlertOperator::GreaterThanOrEqual => actual >= expected,
209+
AlertOperator::LessThanOrEqual => actual <= expected,
215210
_ => unreachable!(),
216211
}
217212
}
218213

219-
fn calculate_final_result(agg_config: &Aggregations, results: &[AggregateResult]) -> bool {
214+
fn calculate_final_result(agg_config: &Aggregates, results: &[AggregateResult]) -> bool {
220215
match &agg_config.operator {
221-
Some(AggregateCondition::And) => results.iter().all(|r| r.result),
222-
Some(AggregateCondition::Or) => results.iter().any(|r| r.result),
216+
Some(LogicalOperator::And) => results.iter().all(|r| r.result),
217+
Some(LogicalOperator::Or) => results.iter().any(|r| r.result),
223218
None => results.first().is_some_and(|r| r.result),
224219
}
225220
}
@@ -252,7 +247,7 @@ fn format_alert_message(agg_results: &[AggregateResult]) -> String {
252247
if let Some(msg) = &result.message {
253248
message.extend([format!(
254249
"|{}({}) WHERE ({}) {} {} (ActualValue: {})|",
255-
result.config.agg,
250+
result.config.aggregate_function,
256251
result.config.column,
257252
msg,
258253
result.config.operator,
@@ -262,7 +257,7 @@ fn format_alert_message(agg_results: &[AggregateResult]) -> String {
262257
} else {
263258
message.extend([format!(
264259
"|{}({}) {} {} (ActualValue: {})",
265-
result.config.agg,
260+
result.config.aggregate_function,
266261
result.config.column,
267262
result.config.operator,
268263
result.config.value,
@@ -307,17 +302,17 @@ fn get_final_value(aggregated_rows: Vec<RecordBatch>) -> f64 {
307302
/// returns a tuple of (aggregate expressions, filter expressions)
308303
///
309304
/// It calls get_filter_expr() to get filter expressions
310-
fn get_exprs(aggregate_config: &Aggregations) -> Vec<(Expr, Option<Expr>)> {
305+
fn get_exprs(aggregate_config: &Aggregates) -> Vec<(Expr, Option<Expr>)> {
311306
let mut agg_expr = Vec::new();
312307

313308
match &aggregate_config.operator {
314309
Some(op) => match op {
315-
AggregateCondition::And | AggregateCondition::Or => {
316-
let agg1 = &aggregate_config.aggregate_conditions[0];
317-
let agg2 = &aggregate_config.aggregate_conditions[1];
310+
LogicalOperator::And | LogicalOperator::Or => {
311+
let agg1 = &aggregate_config.aggregate_config[0];
312+
let agg2 = &aggregate_config.aggregate_config[1];
318313

319314
for agg in [agg1, agg2] {
320-
let filter_expr = if let Some(where_clause) = &agg.condition_config {
315+
let filter_expr = if let Some(where_clause) = &agg.conditions {
321316
let fe = get_filter_expr(where_clause);
322317

323318
trace!("filter_expr-\n{fe:?}");
@@ -333,9 +328,9 @@ fn get_exprs(aggregate_config: &Aggregations) -> Vec<(Expr, Option<Expr>)> {
333328
}
334329
},
335330
None => {
336-
let agg = &aggregate_config.aggregate_conditions[0];
331+
let agg = &aggregate_config.aggregate_config[0];
337332

338-
let filter_expr = if let Some(where_clause) = &agg.condition_config {
333+
let filter_expr = if let Some(where_clause) = &agg.conditions {
339334
let fe = get_filter_expr(where_clause);
340335

341336
trace!("filter_expr-\n{fe:?}");
@@ -355,23 +350,23 @@ fn get_exprs(aggregate_config: &Aggregations) -> Vec<(Expr, Option<Expr>)> {
355350
fn get_filter_expr(where_clause: &Conditions) -> Expr {
356351
match &where_clause.operator {
357352
Some(op) => match op {
358-
AggregateCondition::And => {
353+
LogicalOperator::And => {
359354
let mut expr = Expr::Literal(datafusion::scalar::ScalarValue::Boolean(Some(true)));
360355

361-
let expr1 = &where_clause.conditions[0];
362-
let expr2 = &where_clause.conditions[1];
356+
let expr1 = &where_clause.condition_config[0];
357+
let expr2 = &where_clause.condition_config[1];
363358

364359
for e in [expr1, expr2] {
365360
let ex = match_alert_operator(e);
366361
expr = expr.and(ex);
367362
}
368363
expr
369364
}
370-
AggregateCondition::Or => {
365+
LogicalOperator::Or => {
371366
let mut expr = Expr::Literal(datafusion::scalar::ScalarValue::Boolean(Some(false)));
372367

373-
let expr1 = &where_clause.conditions[0];
374-
let expr2 = &where_clause.conditions[1];
368+
let expr1 = &where_clause.condition_config[0];
369+
let expr2 = &where_clause.condition_config[1];
375370

376371
for e in [expr1, expr2] {
377372
let ex = match_alert_operator(e);
@@ -381,30 +376,70 @@ fn get_filter_expr(where_clause: &Conditions) -> Expr {
381376
}
382377
},
383378
None => {
384-
let expr = &where_clause.conditions[0];
379+
let expr = &where_clause.condition_config[0];
385380
match_alert_operator(expr)
386381
}
387382
}
388383
}
389384

390385
fn match_alert_operator(expr: &ConditionConfig) -> Expr {
391386
match expr.operator {
392-
AlertOperator::GreaterThan => col(&expr.column).gt(lit(&expr.value)),
393-
AlertOperator::LessThan => col(&expr.column).lt(lit(&expr.value)),
394-
AlertOperator::EqualTo => col(&expr.column).eq(lit(&expr.value)),
395-
AlertOperator::NotEqualTo => col(&expr.column).not_eq(lit(&expr.value)),
396-
AlertOperator::GreaterThanEqualTo => col(&expr.column).gt_eq(lit(&expr.value)),
397-
AlertOperator::LessThanEqualTo => col(&expr.column).lt_eq(lit(&expr.value)),
398-
AlertOperator::Like => col(&expr.column).like(lit(&expr.value)),
399-
AlertOperator::NotLike => col(&expr.column).not_like(lit(&expr.value)),
387+
WhereConfigOperator::Equal => col(&expr.column).eq(lit(&expr.value)),
388+
WhereConfigOperator::NotEqual => col(&expr.column).not_eq(lit(&expr.value)),
389+
WhereConfigOperator::LessThan => col(&expr.column).lt(lit(&expr.value)),
390+
WhereConfigOperator::GreaterThan => col(&expr.column).gt(lit(&expr.value)),
391+
WhereConfigOperator::LessThanOrEqual => col(&expr.column).lt_eq(lit(&expr.value)),
392+
WhereConfigOperator::GreaterThanOrEqual => col(&expr.column).gt_eq(lit(&expr.value)),
393+
WhereConfigOperator::IsNull => col(&expr.column).is_null(),
394+
WhereConfigOperator::IsNotNull => col(&expr.column).is_not_null(),
395+
WhereConfigOperator::ILike => col(&expr.column).ilike(lit(&expr.value)),
396+
WhereConfigOperator::Contains => col(&expr.column).like(lit(&expr.value)),
397+
WhereConfigOperator::BeginsWith => {
398+
Expr::BinaryExpr(
399+
BinaryExpr::new(
400+
Box::new(col(&expr.column)),
401+
Operator::RegexIMatch,
402+
Box::new(lit(format!("^{}", expr.value)))
403+
)
404+
)
405+
},
406+
WhereConfigOperator::EndsWith => {
407+
Expr::BinaryExpr(
408+
BinaryExpr::new(
409+
Box::new(col(&expr.column)),
410+
Operator::RegexIMatch,
411+
Box::new(lit(format!("{}$", expr.value)))
412+
)
413+
)
414+
},
415+
WhereConfigOperator::DoesNotContain => col(&expr.column).not_ilike(lit(&expr.value)),
416+
WhereConfigOperator::DoesNotBeginWith => {
417+
Expr::BinaryExpr(
418+
BinaryExpr::new(
419+
Box::new(col(&expr.column)),
420+
Operator::RegexNotIMatch,
421+
Box::new(lit(format!("^{}", expr.value)))
422+
)
423+
)
424+
},
425+
WhereConfigOperator::DoesNotEndWith => {
426+
Expr::BinaryExpr(
427+
BinaryExpr::new(
428+
Box::new(col(&expr.column)),
429+
Operator::RegexNotIMatch,
430+
Box::new(lit(format!("{}$", expr.value)))
431+
)
432+
)
433+
},
400434
}
401435
}
436+
402437
fn match_aggregate_operation(agg: &AggregateConfig) -> Expr {
403-
match agg.agg {
404-
AggregateOperation::Avg => avg(col(&agg.column)),
405-
AggregateOperation::Count => count(col(&agg.column)),
406-
AggregateOperation::Min => min(col(&agg.column)),
407-
AggregateOperation::Max => max(col(&agg.column)),
408-
AggregateOperation::Sum => sum(col(&agg.column)),
438+
match agg.aggregate_function {
439+
AggregateFunction::Avg => avg(col(&agg.column)),
440+
AggregateFunction::Count => count(col(&agg.column)),
441+
AggregateFunction::Min => min(col(&agg.column)),
442+
AggregateFunction::Max => max(col(&agg.column)),
443+
AggregateFunction::Sum => sum(col(&agg.column)),
409444
}
410445
}

0 commit comments

Comments
 (0)