1919use actix_web:: http:: header:: ContentType ;
2020use arrow_schema:: { DataType , Schema } ;
2121use async_trait:: async_trait;
22- use chrono:: Utc ;
22+ use chrono:: { DateTime , Utc } ;
2323use datafusion:: logical_expr:: { LogicalPlan , Projection } ;
2424use datafusion:: sql:: sqlparser:: parser:: ParserError ;
2525use derive_more:: FromStrError ;
@@ -197,6 +197,14 @@ pub enum AlertType {
197197 Threshold ,
198198}
199199
200+ impl Display for AlertType {
201+ fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
202+ match self {
203+ AlertType :: Threshold => write ! ( f, "threshold" ) ,
204+ }
205+ }
206+ }
207+
200208#[ derive( Debug , serde:: Serialize , serde:: Deserialize , Clone ) ]
201209#[ serde( rename_all = "camelCase" ) ]
202210pub enum AlertOperator {
@@ -528,6 +536,7 @@ pub struct AlertRequest {
528536 pub threshold_config : ThresholdConfig ,
529537 pub eval_config : EvalConfig ,
530538 pub targets : Vec < Ulid > ,
539+ pub tags : Option < Vec < String > > ,
531540}
532541
533542impl AlertRequest {
@@ -536,17 +545,21 @@ impl AlertRequest {
536545 for id in & self . targets {
537546 TARGETS . get_target_by_id ( id) . await ?;
538547 }
548+ let datasets = resolve_stream_names ( & self . query ) ?;
539549 let config = AlertConfig {
540550 version : AlertVerison :: from ( CURRENT_ALERTS_VERSION ) ,
541551 id : Ulid :: new ( ) ,
542552 severity : self . severity ,
543553 title : self . title ,
544554 query : self . query ,
555+ datasets,
545556 alert_type : self . alert_type ,
546557 threshold_config : self . threshold_config ,
547558 eval_config : self . eval_config ,
548559 targets : self . targets ,
549560 state : AlertState :: default ( ) ,
561+ created : Utc :: now ( ) ,
562+ tags : self . tags ,
550563 } ;
551564 Ok ( config)
552565 }
@@ -561,13 +574,16 @@ pub struct AlertConfig {
561574 pub severity : Severity ,
562575 pub title : String ,
563576 pub query : String ,
577+ pub datasets : Vec < String > ,
564578 pub alert_type : AlertType ,
565579 pub threshold_config : ThresholdConfig ,
566580 pub eval_config : EvalConfig ,
567581 pub targets : Vec < Ulid > ,
568582 // for new alerts, state should be resolved
569583 #[ serde( default ) ]
570584 pub state : AlertState ,
585+ pub created : DateTime < Utc > ,
586+ pub tags : Option < Vec < String > > ,
571587}
572588
573589impl AlertConfig {
@@ -580,6 +596,7 @@ impl AlertConfig {
580596 let alert_info = format ! ( "Alert '{}' (ID: {})" , basic_fields. title, basic_fields. id) ;
581597
582598 let query = Self :: build_query_from_v1 ( alert_json, & alert_info) . await ?;
599+ let datasets = resolve_stream_names ( & query) ?;
583600 let threshold_config = Self :: extract_threshold_config ( alert_json, & alert_info) ?;
584601 let eval_config = Self :: extract_eval_config ( alert_json, & alert_info) ?;
585602 let targets = Self :: extract_targets ( alert_json, & alert_info) ?;
@@ -592,11 +609,14 @@ impl AlertConfig {
592609 severity : basic_fields. severity ,
593610 title : basic_fields. title ,
594611 query,
612+ datasets,
595613 alert_type : AlertType :: Threshold ,
596614 threshold_config,
597615 eval_config,
598616 targets,
599617 state,
618+ created : Utc :: now ( ) ,
619+ tags : None ,
600620 } ;
601621
602622 // Save the migrated alert back to storage
@@ -1183,6 +1203,65 @@ impl AlertConfig {
11831203 }
11841204 Ok ( ( ) )
11851205 }
1206+
1207+ /// create a summary of the dashboard
1208+ /// used for listing dashboards
1209+ pub fn to_summary ( & self ) -> serde_json:: Map < String , serde_json:: Value > {
1210+ let mut map = serde_json:: Map :: new ( ) ;
1211+
1212+ map. insert (
1213+ "title" . to_string ( ) ,
1214+ serde_json:: Value :: String ( self . title . clone ( ) ) ,
1215+ ) ;
1216+
1217+ map. insert (
1218+ "created" . to_string ( ) ,
1219+ serde_json:: Value :: String ( self . created . to_string ( ) ) ,
1220+ ) ;
1221+
1222+ map. insert (
1223+ "alertType" . to_string ( ) ,
1224+ serde_json:: Value :: String ( self . alert_type . to_string ( ) ) ,
1225+ ) ;
1226+
1227+ map. insert (
1228+ "id" . to_string ( ) ,
1229+ serde_json:: Value :: String ( self . id . to_string ( ) ) ,
1230+ ) ;
1231+
1232+ map. insert (
1233+ "severity" . to_string ( ) ,
1234+ serde_json:: Value :: String ( self . severity . to_string ( ) ) ,
1235+ ) ;
1236+
1237+ map. insert (
1238+ "state" . to_string ( ) ,
1239+ serde_json:: Value :: String ( self . state . to_string ( ) ) ,
1240+ ) ;
1241+
1242+ if let Some ( tags) = & self . tags {
1243+ map. insert (
1244+ "tags" . to_string ( ) ,
1245+ serde_json:: Value :: Array (
1246+ tags. iter ( )
1247+ . map ( |tag| serde_json:: Value :: String ( tag. clone ( ) ) )
1248+ . collect ( ) ,
1249+ ) ,
1250+ ) ;
1251+ }
1252+
1253+ map. insert (
1254+ "datasets" . to_string ( ) ,
1255+ serde_json:: Value :: Array (
1256+ self . datasets
1257+ . iter ( )
1258+ . map ( |dataset| serde_json:: Value :: String ( dataset. clone ( ) ) )
1259+ . collect ( ) ,
1260+ ) ,
1261+ ) ;
1262+
1263+ map
1264+ }
11861265}
11871266
11881267#[ derive( Debug , thiserror:: Error ) ]
@@ -1221,6 +1300,8 @@ pub enum AlertError {
12211300 ParserError ( #[ from] ParserError ) ,
12221301 #[ error( "Invalid alert query" ) ]
12231302 InvalidAlertQuery ,
1303+ #[ error( "Invalid query parameter" ) ]
1304+ InvalidQueryParameter ,
12241305}
12251306
12261307impl actix_web:: ResponseError for AlertError {
@@ -1243,6 +1324,7 @@ impl actix_web::ResponseError for AlertError {
12431324 Self :: TargetInUse => StatusCode :: CONFLICT ,
12441325 Self :: ParserError ( _) => StatusCode :: BAD_REQUEST ,
12451326 Self :: InvalidAlertQuery => StatusCode :: BAD_REQUEST ,
1327+ Self :: InvalidQueryParameter => StatusCode :: BAD_REQUEST ,
12461328 }
12471329 }
12481330
@@ -1350,6 +1432,7 @@ impl Alerts {
13501432 pub async fn list_alerts_for_user (
13511433 & self ,
13521434 session : SessionKey ,
1435+ tags : Vec < String > ,
13531436 ) -> Result < Vec < AlertConfig > , AlertError > {
13541437 let mut alerts: Vec < AlertConfig > = Vec :: new ( ) ;
13551438 for ( _, alert) in self . alerts . read ( ) . await . iter ( ) {
@@ -1358,6 +1441,17 @@ impl Alerts {
13581441 alerts. push ( alert. to_owned ( ) ) ;
13591442 }
13601443 }
1444+ if tags. is_empty ( ) {
1445+ return Ok ( alerts) ;
1446+ }
1447+ // filter alerts based on tags
1448+ alerts. retain ( |alert| {
1449+ if let Some ( alert_tags) = & alert. tags {
1450+ alert_tags. iter ( ) . any ( |tag| tags. contains ( tag) )
1451+ } else {
1452+ false
1453+ }
1454+ } ) ;
13611455
13621456 Ok ( alerts)
13631457 }
@@ -1456,6 +1550,20 @@ impl Alerts {
14561550
14571551 Ok ( ( ) )
14581552 }
1553+
1554+ /// List tags from all alerts
1555+ /// This function returns a list of unique tags from all alerts
1556+ pub async fn list_tags ( & self ) -> Vec < String > {
1557+ let alerts = self . alerts . read ( ) . await ;
1558+ let mut tags = alerts
1559+ . iter ( )
1560+ . filter_map ( |( _, alert) | alert. tags . as_ref ( ) )
1561+ . flat_map ( |t| t. iter ( ) . cloned ( ) )
1562+ . collect :: < Vec < String > > ( ) ;
1563+ tags. sort ( ) ;
1564+ tags. dedup ( ) ;
1565+ tags
1566+ }
14591567}
14601568
14611569#[ derive( Debug , Serialize ) ]
0 commit comments