Skip to content

Commit ece7953

Browse files
committed
Multiple updates
- Added helper methods to fetch base query, eval details - Ensured DataFusion accepts columns and table names as is - Removed clockwerk scheduler - Modifed alert config JSON - Modified triggered alert message
1 parent 74fa378 commit ece7953

File tree

10 files changed

+317
-346
lines changed

10 files changed

+317
-346
lines changed

src/alerts/alerts_utils.rs

Lines changed: 85 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,15 @@
1818

1919
use arrow_array::{Float64Array, Int64Array, RecordBatch};
2020
use datafusion::{
21-
common::tree_node::TreeNode, functions_aggregate::{
21+
common::tree_node::TreeNode,
22+
functions_aggregate::{
2223
count::count,
2324
expr_fn::avg,
2425
min_max::{max, min},
2526
sum::sum,
26-
}, logical_expr::{BinaryExpr, Operator}, prelude::{col, lit, DataFrame, Expr}
27+
},
28+
logical_expr::{BinaryExpr, Literal, Operator},
29+
prelude::{col, lit, DataFrame, Expr},
2730
};
2831
use tracing::trace;
2932

@@ -40,7 +43,8 @@ use crate::{
4043
};
4144

4245
use super::{
43-
AggregateConfig, AggregateFunction, AggregateResult, Aggregates, AlertConfig, AlertError, AlertOperator, AlertState, ConditionConfig, Conditions, WhereConfigOperator, ALERTS
46+
AggregateConfig, AggregateFunction, AggregateResult, Aggregates, AlertConfig, AlertError,
47+
AlertOperator, AlertState, ConditionConfig, Conditions, WhereConfigOperator, ALERTS,
4448
};
4549

4650
async fn get_tables_from_query(query: &str) -> Result<TableScanVisitor, AlertError> {
@@ -99,7 +103,7 @@ pub async fn evaluate_alert(alert: &AlertConfig) -> Result<(), AlertError> {
99103
trace!("RUNNING EVAL TASK FOR- {alert:?}");
100104

101105
let query = prepare_query(alert).await?;
102-
let select_query = format!("SELECT * FROM {}", alert.stream);
106+
let select_query = alert.get_base_query();
103107
let base_df = execute_base_query(&query, &select_query).await?;
104108
let agg_results = evaluate_aggregates(&alert.aggregates, &base_df).await?;
105109
let final_res = calculate_final_result(&alert.aggregates, &agg_results);
@@ -114,7 +118,7 @@ async fn prepare_query(alert: &AlertConfig) -> Result<crate::query::Query, Alert
114118
};
115119

116120
let session_state = QUERY_SESSION.state();
117-
let select_query = format!("SELECT * FROM {}", alert.stream);
121+
let select_query = alert.get_base_query();
118122
let raw_logical_plan = session_state.create_logical_plan(&select_query).await?;
119123

120124
let time_range = TimeRange::parse_human_time(start_time, end_time)
@@ -186,7 +190,7 @@ async fn evaluate_single_aggregate(
186190
agg.conditions
187191
.as_ref()
188192
.map(|config| config.generate_filter_message())
189-
.or(Some(String::default()))
193+
.or(None)
190194
} else {
191195
None
192196
};
@@ -207,7 +211,6 @@ fn evaluate_condition(operator: &AlertOperator, actual: f64, expected: f64) -> b
207211
AlertOperator::NotEqual => actual != expected,
208212
AlertOperator::GreaterThanOrEqual => actual >= expected,
209213
AlertOperator::LessThanOrEqual => actual <= expected,
210-
_ => unreachable!(),
211214
}
212215
}
213216

@@ -225,8 +228,12 @@ async fn update_alert_state(
225228
agg_results: &[AggregateResult],
226229
) -> Result<(), AlertError> {
227230
if final_res {
228-
trace!("ALERT!!!!!!");
229231
let message = format_alert_message(agg_results);
232+
let message = format!(
233+
"{message}\nEvaluation Window: {}\nEvaluation Frequency: {}m",
234+
alert.get_eval_window(),
235+
alert.get_eval_frequency()
236+
);
230237
ALERTS
231238
.update_state(alert.id, AlertState::Triggered, Some(message))
232239
.await
@@ -246,7 +253,7 @@ fn format_alert_message(agg_results: &[AggregateResult]) -> String {
246253
for result in agg_results {
247254
if let Some(msg) = &result.message {
248255
message.extend([format!(
249-
"|{}({}) WHERE ({}) {} {} (ActualValue: {})|",
256+
"\nCondition: {}({}) WHERE ({}) {} {}\nActualValue: {}\n",
250257
result.config.aggregate_function,
251258
result.config.column,
252259
msg,
@@ -256,7 +263,7 @@ fn format_alert_message(agg_results: &[AggregateResult]) -> String {
256263
)]);
257264
} else {
258265
message.extend([format!(
259-
"|{}({}) {} {} (ActualValue: {})",
266+
"\nCondition: {}({}) {} {}\nActualValue: {}\n",
260267
result.config.aggregate_function,
261268
result.config.column,
262269
result.config.operator,
@@ -383,63 +390,79 @@ fn get_filter_expr(where_clause: &Conditions) -> Expr {
383390
}
384391

385392
fn match_alert_operator(expr: &ConditionConfig) -> Expr {
393+
// the form accepts value as a string
394+
// if it can be parsed as a number, then parse it
395+
// else keep it as a string
396+
let value = NumberOrString::from_string(expr.value.clone());
397+
398+
// for maintaining column case
399+
let column = format!(r#""{}""#, expr.column);
386400
match expr.operator {
387-
WhereConfigOperator::Equal => col(&expr.column).eq(lit(&expr.value)),
388-
WhereConfigOperator::NotEqual => col(&expr.column).not_eq(lit(&expr.value)),
389-
WhereConfigOperator::LessThan => col(&expr.column).lt(lit(&expr.value)),
390-
WhereConfigOperator::GreaterThan => col(&expr.column).gt(lit(&expr.value)),
391-
WhereConfigOperator::LessThanOrEqual => col(&expr.column).lt_eq(lit(&expr.value)),
392-
WhereConfigOperator::GreaterThanOrEqual => col(&expr.column).gt_eq(lit(&expr.value)),
393-
WhereConfigOperator::IsNull => col(&expr.column).is_null(),
394-
WhereConfigOperator::IsNotNull => col(&expr.column).is_not_null(),
395-
WhereConfigOperator::ILike => col(&expr.column).ilike(lit(&expr.value)),
396-
WhereConfigOperator::Contains => col(&expr.column).like(lit(&expr.value)),
397-
WhereConfigOperator::BeginsWith => {
398-
Expr::BinaryExpr(
399-
BinaryExpr::new(
400-
Box::new(col(&expr.column)),
401-
Operator::RegexIMatch,
402-
Box::new(lit(format!("^{}", expr.value)))
403-
)
404-
)
405-
},
406-
WhereConfigOperator::EndsWith => {
407-
Expr::BinaryExpr(
408-
BinaryExpr::new(
409-
Box::new(col(&expr.column)),
410-
Operator::RegexIMatch,
411-
Box::new(lit(format!("{}$", expr.value)))
412-
)
413-
)
414-
},
415-
WhereConfigOperator::DoesNotContain => col(&expr.column).not_ilike(lit(&expr.value)),
416-
WhereConfigOperator::DoesNotBeginWith => {
417-
Expr::BinaryExpr(
418-
BinaryExpr::new(
419-
Box::new(col(&expr.column)),
420-
Operator::RegexNotIMatch,
421-
Box::new(lit(format!("^{}", expr.value)))
422-
)
423-
)
424-
},
425-
WhereConfigOperator::DoesNotEndWith => {
426-
Expr::BinaryExpr(
427-
BinaryExpr::new(
428-
Box::new(col(&expr.column)),
429-
Operator::RegexNotIMatch,
430-
Box::new(lit(format!("{}$", expr.value)))
431-
)
432-
)
433-
},
401+
WhereConfigOperator::Equal => col(column).eq(lit(value)),
402+
WhereConfigOperator::NotEqual => col(column).not_eq(lit(value)),
403+
WhereConfigOperator::LessThan => col(column).lt(lit(value)),
404+
WhereConfigOperator::GreaterThan => col(column).gt(lit(value)),
405+
WhereConfigOperator::LessThanOrEqual => col(column).lt_eq(lit(value)),
406+
WhereConfigOperator::GreaterThanOrEqual => col(column).gt_eq(lit(value)),
407+
WhereConfigOperator::IsNull => col(column).is_null(),
408+
WhereConfigOperator::IsNotNull => col(column).is_not_null(),
409+
WhereConfigOperator::ILike => col(column).ilike(lit(&expr.value)),
410+
WhereConfigOperator::Contains => col(column).like(lit(&expr.value)),
411+
WhereConfigOperator::BeginsWith => Expr::BinaryExpr(BinaryExpr::new(
412+
Box::new(col(column)),
413+
Operator::RegexIMatch,
414+
Box::new(lit(format!("^{}", expr.value))),
415+
)),
416+
WhereConfigOperator::EndsWith => Expr::BinaryExpr(BinaryExpr::new(
417+
Box::new(col(column)),
418+
Operator::RegexIMatch,
419+
Box::new(lit(format!("{}$", expr.value))),
420+
)),
421+
WhereConfigOperator::DoesNotContain => col(column).not_ilike(lit(&expr.value)),
422+
WhereConfigOperator::DoesNotBeginWith => Expr::BinaryExpr(BinaryExpr::new(
423+
Box::new(col(column)),
424+
Operator::RegexNotIMatch,
425+
Box::new(lit(format!("^{}", expr.value))),
426+
)),
427+
WhereConfigOperator::DoesNotEndWith => Expr::BinaryExpr(BinaryExpr::new(
428+
Box::new(col(column)),
429+
Operator::RegexNotIMatch,
430+
Box::new(lit(format!("{}$", expr.value))),
431+
)),
434432
}
435433
}
436434

437435
fn match_aggregate_operation(agg: &AggregateConfig) -> Expr {
436+
// for maintaining column case
437+
let column = format!(r#""{}""#, agg.column);
438438
match agg.aggregate_function {
439-
AggregateFunction::Avg => avg(col(&agg.column)),
440-
AggregateFunction::Count => count(col(&agg.column)),
441-
AggregateFunction::Min => min(col(&agg.column)),
442-
AggregateFunction::Max => max(col(&agg.column)),
443-
AggregateFunction::Sum => sum(col(&agg.column)),
439+
AggregateFunction::Avg => avg(col(column)),
440+
AggregateFunction::Count => count(col(column)),
441+
AggregateFunction::Min => min(col(column)),
442+
AggregateFunction::Max => max(col(column)),
443+
AggregateFunction::Sum => sum(col(column)),
444+
}
445+
}
446+
447+
enum NumberOrString {
448+
Number(f64),
449+
String(String),
450+
}
451+
452+
impl Literal for NumberOrString {
453+
fn lit(&self) -> Expr {
454+
match self {
455+
NumberOrString::Number(expr) => lit(*expr),
456+
NumberOrString::String(expr) => lit(expr.clone()),
457+
}
458+
}
459+
}
460+
impl NumberOrString {
461+
fn from_string(value: String) -> Self {
462+
if let Ok(num) = value.parse::<f64>() {
463+
NumberOrString::Number(num)
464+
} else {
465+
NumberOrString::String(value)
466+
}
444467
}
445468
}

0 commit comments

Comments
 (0)