From 12aff338187068dbcb6fe33ce14f12f66ad4436d Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 11 Aug 2022 22:46:55 +0530 Subject: [PATCH 1/9] Alerting foundations --- server/src/alerts.rs | 113 +++++++++++++++++++++++++++++++ server/src/event.rs | 3 + server/src/handlers/logstream.rs | 50 +++++++------- server/src/main.rs | 1 + server/src/metadata.rs | 39 ++++++++--- server/src/s3.rs | 15 ++-- server/src/storage.rs | 5 +- server/src/validator.rs | 72 -------------------- 8 files changed, 184 insertions(+), 114 deletions(-) create mode 100644 server/src/alerts.rs diff --git a/server/src/alerts.rs b/server/src/alerts.rs new file mode 100644 index 000000000..2b772b3d0 --- /dev/null +++ b/server/src/alerts.rs @@ -0,0 +1,113 @@ +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +use crate::{error::Error, event::Event}; + +#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Alerts { + pub alerts: Vec, +} + +#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Alert { + pub name: String, + pub message: String, + pub rule: Rule, + pub targets: Vec, +} + +impl Alert { + // TODO: spawn async tasks to call webhooks if alert rules are met + // This is done to ensure that threads aren't blocked by calls to the webhook + pub async fn parse_event(&self, event: &Event) -> Result<(), Error> { + if self.rule.matches(&event) { + for _ in self.targets.clone() { + actix_web::rt::spawn(async move {}); + } + } + + Ok(()) + } +} + +#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Rule { + pub field: String, + /// Field that determines what comparison operator is to be used + #[serde(default)] + pub comparator: Comparator, + pub value: Value, + pub repeats: u32, + pub within: String, +} + +impl Rule { + pub fn matches(&self, _event: &Event) -> bool { + true + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum Comparator { + EqualTo, + GreaterThan, + LessThan, +} + +impl Default for Comparator { + fn default() -> Self { + Self::EqualTo + } +} + +#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Target { + pub name: String, + #[serde(rename = "server_url")] + pub server_url: String, + #[serde(rename = "api_key")] + pub api_key: String, +} + +pub fn alert(body: String) -> Result<(), Error> { + let alerts: Alerts = serde_json::from_str(body.as_str())?; + for alert in alerts.alerts { + if alert.name.is_empty() { + return Err(Error::InvalidAlert( + "alert name cannot be empty".to_string(), + )); + } + if alert.message.is_empty() { + return Err(Error::InvalidAlert( + "alert message cannot be empty".to_string(), + )); + } + if alert.rule.value.is_number() { + return Err(Error::InvalidAlert( + "rule.value must be a numerical value".to_string(), + )); + } + if alert.rule.field.is_empty() { + return Err(Error::InvalidAlert("rule.field must be set".to_string())); + } + if alert.rule.within.is_empty() { + return Err(Error::InvalidAlert("rule.within must be set".to_string())); + } + if alert.rule.repeats == 0 { + return Err(Error::InvalidAlert( + "rule.repeats can't be set to 0".to_string(), + )); + } + if alert.targets.is_empty() { + return Err(Error::InvalidAlert( + "alert must have at least one target".to_string(), + )); + } + } + Ok(()) +} diff --git a/server/src/event.rs b/server/src/event.rs index ca63f967c..fb5fc1c51 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -35,6 +35,7 @@ use crate::response; use crate::storage::ObjectStorage; use crate::Error; +#[derive(Clone)] pub struct Event { pub body: String, pub stream_name: String, @@ -60,6 +61,8 @@ impl Event { storage: &impl ObjectStorage, ) -> Result { let schema = metadata::STREAM_INFO.schema(self.stream_name.clone())?; + metadata::STREAM_INFO.parse_event(self).await?; + if schema.is_empty() { self.first_event(storage).await } else { diff --git a/server/src/handlers/logstream.rs b/server/src/handlers/logstream.rs index 9f2ff9883..cb67e328b 100644 --- a/server/src/handlers/logstream.rs +++ b/server/src/handlers/logstream.rs @@ -19,11 +19,11 @@ use actix_web::http::StatusCode; use actix_web::{web, HttpRequest, HttpResponse, Responder}; -use crate::metadata; +use crate::alerts::Alerts; use crate::response; use crate::s3::S3; use crate::storage::ObjectStorage; -use crate::validator; +use crate::{alerts, metadata, validator}; pub async fn delete(req: HttpRequest) -> HttpResponse { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); @@ -117,25 +117,22 @@ pub async fn get_alert(req: HttpRequest) -> HttpResponse { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); match metadata::STREAM_INFO.alert(stream_name.clone()) { - Ok(alert) => response::ServerResponse { - msg: alert, + Ok(alerts) => response::ServerResponse { + msg: serde_json::to_string(&alerts).unwrap(), code: StatusCode::OK, } .to_http(), - Err(_) => match S3::new().get_alert(&stream_name).await { - Ok(alert) if alert.is_empty() => response::ServerResponse { + Err(_) => match S3::new().get_alerts(&stream_name).await { + Ok(alerts) if alerts.alerts.is_empty() => response::ServerResponse { msg: "alert configuration not set for log stream {}".to_string(), code: StatusCode::BAD_REQUEST, } .to_http(), - Ok(alert) => { - let buf = alert.as_ref(); - response::ServerResponse { - msg: String::from_utf8(buf.to_vec()).unwrap(), - code: StatusCode::OK, - } - .to_http() + Ok(alerts) => response::ServerResponse { + msg: serde_json::to_string(&alerts).unwrap(), + code: StatusCode::OK, } + .to_http(), Err(_) => response::ServerResponse { msg: "alert doesn't exist".to_string(), code: StatusCode::BAD_REQUEST, @@ -164,7 +161,7 @@ pub async fn put(req: HttpRequest) -> HttpResponse { if let Err(e) = metadata::STREAM_INFO.add_stream( stream_name.to_string(), "".to_string(), - "".to_string(), + Default::default(), ) { return response::ServerResponse { msg: format!( @@ -210,16 +207,23 @@ pub async fn put(req: HttpRequest) -> HttpResponse { pub async fn put_alert(req: HttpRequest, body: web::Json) -> HttpResponse { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - let alert_config = body.clone(); - match validator::alert(serde_json::to_string(&body.as_object()).unwrap()) { - Ok(_) => match S3::new() - .create_alert(&stream_name, alert_config.to_string()) - .await - { + let alerts: Alerts = match serde_json::from_value(body.clone()) { + Ok(alerts) => alerts, + Err(e) => { + return response::ServerResponse { + msg: format!( + "failed to set alert configuration for log stream {} due to err: {}", + stream_name, e + ), + code: StatusCode::INTERNAL_SERVER_ERROR, + } + .to_http() + } + }; + match alerts::alert(serde_json::to_string(&body.as_object()).unwrap()) { + Ok(_) => match S3::new().put_alerts(&stream_name, alerts.clone()).await { Ok(_) => { - if let Err(e) = metadata::STREAM_INFO - .set_alert(stream_name.to_string(), alert_config.to_string()) - { + if let Err(e) = metadata::STREAM_INFO.set_alert(stream_name.to_string(), alerts) { return response::ServerResponse { msg: format!( "failed to set alert configuration for log stream {} due to err: {}", diff --git a/server/src/main.rs b/server/src/main.rs index 1b90f68ad..180f31ba2 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -30,6 +30,7 @@ include!(concat!(env!("OUT_DIR"), "/generated.rs")); use std::thread; +mod alerts; mod banner; mod error; mod event; diff --git a/server/src/metadata.rs b/server/src/metadata.rs index a8f3f77c4..e0c082849 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -23,13 +23,15 @@ use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::RwLock; +use crate::alerts::Alerts; use crate::error::Error; +use crate::event::Event; use crate::storage::ObjectStorage; #[derive(Debug, Default)] pub struct LogStreamMetadata { pub schema: String, - pub alert_config: String, + pub alerts: Alerts, pub stats: Stats, } @@ -66,9 +68,22 @@ lazy_static! { // 5. When set alert API is called (update the alert) #[allow(clippy::all)] impl STREAM_INFO { + pub async fn parse_event(&self, event: &Event) -> Result<(), Error> { + let map = self.read().unwrap(); + let meta = map + .get(&event.stream_name) + .ok_or(Error::StreamMetaNotFound(event.stream_name.to_owned()))?; + + for alert in &meta.alerts.alerts { + alert.parse_event(event).await?; + } + + Ok(()) + } + pub fn set_schema(&self, stream_name: String, schema: String) -> Result<(), Error> { - let alert_config = self.alert(stream_name.clone())?; - self.add_stream(stream_name, schema, alert_config) + let alerts = self.alert(stream_name.clone())?; + self.add_stream(stream_name, schema, alerts) } pub fn schema(&self, stream_name: String) -> Result { @@ -80,30 +95,30 @@ impl STREAM_INFO { Ok(meta.schema.clone()) } - pub fn set_alert(&self, stream_name: String, alert_config: String) -> Result<(), Error> { + pub fn set_alert(&self, stream_name: String, alerts: Alerts) -> Result<(), Error> { let schema = self.schema(stream_name.clone())?; - self.add_stream(stream_name, schema, alert_config) + self.add_stream(stream_name, schema, alerts) } - pub fn alert(&self, stream_name: String) -> Result { + pub fn alert(&self, stream_name: String) -> Result { let map = self.read().unwrap(); let meta = map .get(&stream_name) .ok_or(Error::StreamMetaNotFound(stream_name))?; - Ok(meta.alert_config.clone()) + Ok(meta.alerts.clone()) } pub fn add_stream( &self, stream_name: String, schema: String, - alert_config: String, + alerts: Alerts, ) -> Result<(), Error> { let mut map = self.write().unwrap(); let metadata = LogStreamMetadata { schema, - alert_config, + alerts, ..Default::default() }; // TODO: Add check to confirm data insertion @@ -126,13 +141,15 @@ impl STREAM_INFO { // to load the stream metadata based on whatever is available. // // TODO: ignore failure(s) if any and skip to next stream - let alert_config = parse_string(storage.get_alert(&stream.name).await) + let alerts = storage + .get_alerts(&stream.name) + .await .map_err(|_| Error::AlertNotInStore(stream.name.to_owned()))?; let schema = parse_string(storage.get_schema(&stream.name).await) .map_err(|_| Error::SchemaNotInStore(stream.name.to_owned()))?; let metadata = LogStreamMetadata { schema, - alert_config, + alerts, ..Default::default() }; let mut map = self.write().unwrap(); diff --git a/server/src/s3.rs b/server/src/s3.rs index 6c5e43c3b..d7e21fccb 100644 --- a/server/src/s3.rs +++ b/server/src/s3.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use structopt::StructOpt; use tokio_stream::StreamExt; +use crate::alerts::Alerts; use crate::error::Error; use crate::metadata::Stats; use crate::option::{StorageOpt, CONFIG}; @@ -199,13 +200,13 @@ impl S3 { Ok(()) } - async fn _create_alert(&self, stream_name: &str, body: String) -> Result<(), AwsSdkError> { + async fn _put_alerts(&self, stream_name: &str, body: Vec) -> Result<(), AwsSdkError> { let _resp = self .client .put_object() .bucket(&S3_CONFIG.s3_bucket_name) .key(format!("{}/.alert.json", stream_name)) - .body(body.into_bytes().into()) + .body(body.into()) .send() .await?; @@ -323,8 +324,9 @@ impl ObjectStorage for S3 { Ok(()) } - async fn create_alert(&self, stream_name: &str, body: String) -> Result<(), Error> { - self._create_alert(stream_name, body).await?; + async fn put_alerts(&self, stream_name: &str, alerts: Alerts) -> Result<(), Error> { + let body = serde_json::to_vec(&alerts)?; + self._put_alerts(stream_name, body).await?; Ok(()) } @@ -335,10 +337,11 @@ impl ObjectStorage for S3 { Ok(body_bytes) } - async fn get_alert(&self, stream_name: &str) -> Result { + async fn get_alerts(&self, stream_name: &str) -> Result { let body_bytes = self._alert_exists(stream_name).await?; + let alerts = serde_json::from_slice(&body_bytes)?; - Ok(body_bytes) + Ok(alerts) } async fn get_stats(&self, stream_name: &str) -> Result { diff --git a/server/src/storage.rs b/server/src/storage.rs index 26348622e..ce36fb903 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -16,6 +16,7 @@ * */ +use crate::alerts::Alerts; use crate::error::Error; use crate::metadata::Stats; use crate::option::CONFIG; @@ -52,9 +53,9 @@ pub trait ObjectStorage: Sync + 'static { async fn put_schema(&self, stream_name: String, body: String) -> Result<(), Error>; async fn create_stream(&self, stream_name: &str) -> Result<(), Error>; async fn delete_stream(&self, stream_name: &str) -> Result<(), Error>; - async fn create_alert(&self, stream_name: &str, body: String) -> Result<(), Error>; + async fn put_alerts(&self, stream_name: &str, alerts: Alerts) -> Result<(), Error>; async fn get_schema(&self, stream_name: &str) -> Result; - async fn get_alert(&self, stream_name: &str) -> Result; + async fn get_alerts(&self, stream_name: &str) -> Result; async fn get_stats(&self, stream_name: &str) -> Result; async fn list_streams(&self) -> Result, Error>; async fn upload_file(&self, key: &str, path: &str) -> Result<(), Error>; diff --git a/server/src/validator.rs b/server/src/validator.rs index 04e7a4e4d..456682bbb 100644 --- a/server/src/validator.rs +++ b/server/src/validator.rs @@ -17,8 +17,6 @@ */ use chrono::{DateTime, Utc}; -use serde_derive::Deserialize; -use serde_derive::Serialize; use crate::query::Query; use crate::Error; @@ -28,76 +26,6 @@ const DENIED_NAMES: &[&str] = &[ "select", "from", "where", "group", "by", "order", "limit", "offset", "join", "and", ]; -#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct Alerts { - pub alerts: Vec, -} - -#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct Alert { - pub name: String, - pub message: String, - pub rule: Rule, - pub target: Vec, -} - -#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct Rule { - pub field: String, - pub contains: String, - pub repeats: u32, - pub within: String, -} - -#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct Target { - pub name: String, - #[serde(rename = "server_url")] - pub server_url: String, - #[serde(rename = "api_key")] - pub api_key: String, -} - -pub fn alert(body: String) -> Result<(), Error> { - let alerts: Alerts = serde_json::from_str(body.as_str())?; - for alert in alerts.alerts { - if alert.name.is_empty() { - return Err(Error::InvalidAlert( - "alert name cannot be empty".to_string(), - )); - } - if alert.message.is_empty() { - return Err(Error::InvalidAlert( - "alert message cannot be empty".to_string(), - )); - } - if alert.rule.contains.is_empty() { - return Err(Error::InvalidAlert("rule.contains must be set".to_string())); - } - if alert.rule.field.is_empty() { - return Err(Error::InvalidAlert("rule.field must be set".to_string())); - } - if alert.rule.within.is_empty() { - return Err(Error::InvalidAlert("rule.within must be set".to_string())); - } - if alert.rule.repeats == 0 { - return Err(Error::InvalidAlert( - "rule.repeats can't be set to 0".to_string(), - )); - } - if alert.target.is_empty() { - return Err(Error::InvalidAlert( - "alert must have at least one target".to_string(), - )); - } - } - Ok(()) -} - pub fn stream_name(str_name: &str) -> Result<(), Error> { if str_name.is_empty() { return Err(Error::EmptyName); From 952fd4574501e985da43e757d3a9af7d4ff7b6bd Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 16 Aug 2022 08:19:41 +0530 Subject: [PATCH 2/9] Improve alerts implementation with suggestions --- server/src/alerts.rs | 50 ++++++++++++++++++++++++++++++++---------- server/src/event.rs | 9 +++++--- server/src/metadata.rs | 17 +++++++++----- 3 files changed, 56 insertions(+), 20 deletions(-) diff --git a/server/src/alerts.rs b/server/src/alerts.rs index 2b772b3d0..e9cf4f91c 100644 --- a/server/src/alerts.rs +++ b/server/src/alerts.rs @@ -1,7 +1,6 @@ use serde::{Deserialize, Serialize}; -use serde_json::Value; -use crate::{error::Error, event::Event}; +use crate::error::Error; #[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] @@ -21,8 +20,8 @@ pub struct Alert { impl Alert { // TODO: spawn async tasks to call webhooks if alert rules are met // This is done to ensure that threads aren't blocked by calls to the webhook - pub async fn parse_event(&self, event: &Event) -> Result<(), Error> { - if self.rule.matches(&event) { + pub async fn check_alert(&mut self, event: &serde_json::Value) -> Result<(), Error> { + if self.rule.resolves(event).await { for _ in self.targets.clone() { actix_web::rt::spawn(async move {}); } @@ -38,27 +37,54 @@ pub struct Rule { pub field: String, /// Field that determines what comparison operator is to be used #[serde(default)] - pub comparator: Comparator, - pub value: Value, + pub operator: Operator, + pub value: String, pub repeats: u32, + #[serde(skip)] + repeated: u32, pub within: String, } impl Rule { - pub fn matches(&self, _event: &Event) -> bool { - true + // TODO: utilise `within` to set a range for validity of rule to trigger alert + pub async fn resolves(&mut self, event: &serde_json::Value) -> bool { + let comparison = match self.operator { + Operator::EqualTo => event.get(&self.field).unwrap() == &serde_json::json!(self.value), + // TODO: currently this is a hack, ensure checks are performed in the right way + Operator::GreaterThan => { + event.get(&self.field).unwrap().as_f64().unwrap() + > serde_json::json!(self.value).as_f64().unwrap() + } + Operator::LessThan => { + event.get(&self.field).unwrap().as_f64().unwrap() + < serde_json::json!(self.value).as_f64().unwrap() + } + }; + + // If truthy, increment count of repeated + if comparison { + self.repeated += 1; + } + + // If enough repetitions made, return true + if self.repeated >= self.repeats { + self.repeated = 0; + return true; + } + + false } } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] -pub enum Comparator { +pub enum Operator { EqualTo, GreaterThan, LessThan, } -impl Default for Comparator { +impl Default for Operator { fn default() -> Self { Self::EqualTo } @@ -87,9 +113,9 @@ pub fn alert(body: String) -> Result<(), Error> { "alert message cannot be empty".to_string(), )); } - if alert.rule.value.is_number() { + if alert.rule.value.is_empty() { return Err(Error::InvalidAlert( - "rule.value must be a numerical value".to_string(), + "rule.value cannot be empty".to_string(), )); } if alert.rule.field.is_empty() { diff --git a/server/src/event.rs b/server/src/event.rs index fb5fc1c51..062a7cc50 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -61,13 +61,16 @@ impl Event { storage: &impl ObjectStorage, ) -> Result { let schema = metadata::STREAM_INFO.schema(self.stream_name.clone())?; - metadata::STREAM_INFO.parse_event(self).await?; - if schema.is_empty() { + let result = if schema.is_empty() { self.first_event(storage).await } else { self.event() - } + }; + + metadata::STREAM_INFO.check_alerts(self).await?; + + result } // This is called when the first event of a log stream is received. The first event is diff --git a/server/src/metadata.rs b/server/src/metadata.rs index e0c082849..186d43b18 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -18,7 +18,7 @@ use bytes::Bytes; use lazy_static::lazy_static; -use log::warn; +use log::{error, warn}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::RwLock; @@ -68,15 +68,22 @@ lazy_static! { // 5. When set alert API is called (update the alert) #[allow(clippy::all)] impl STREAM_INFO { - pub async fn parse_event(&self, event: &Event) -> Result<(), Error> { + pub async fn check_alerts(&self, event: &Event) -> Result<(), Error> { let map = self.read().unwrap(); let meta = map .get(&event.stream_name) .ok_or(Error::StreamMetaNotFound(event.stream_name.to_owned()))?; - for alert in &meta.alerts.alerts { - alert.parse_event(event).await?; - } + let alerts = meta.alerts.alerts.clone(); + let event: serde_json::Value = serde_json::from_str(&event.body)?; + + actix_web::rt::spawn(async move { + for mut alert in alerts { + if let Err(e) = alert.check_alert(&event).await { + error!("Error while parsing event against alerts: {}", e); + } + } + }); Ok(()) } From 54c9c1af8303429e25174789bb5fef1d7a5bd0e5 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 17 Aug 2022 21:49:59 +0530 Subject: [PATCH 3/9] make post requests to webhook/targets --- server/Cargo.toml | 1 + server/src/alerts.rs | 20 ++++++++++++++++++-- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/server/Cargo.toml b/server/Cargo.toml index 4313ccc22..aa19e296b 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -48,6 +48,7 @@ clokwerk = "0.4.0-rc1" actix-web-static-files = "4.0" static-files = "0.2.1" walkdir = "2" +ureq = "2.5.0" [build-dependencies] static-files = "0.2.1" diff --git a/server/src/alerts.rs b/server/src/alerts.rs index e9cf4f91c..2c55942c0 100644 --- a/server/src/alerts.rs +++ b/server/src/alerts.rs @@ -1,3 +1,4 @@ +use log::error; use serde::{Deserialize, Serialize}; use crate::error::Error; @@ -22,8 +23,11 @@ impl Alert { // This is done to ensure that threads aren't blocked by calls to the webhook pub async fn check_alert(&mut self, event: &serde_json::Value) -> Result<(), Error> { if self.rule.resolves(event).await { - for _ in self.targets.clone() { - actix_web::rt::spawn(async move {}); + for target in self.targets.clone() { + let msg = self.message.clone(); + actix_web::rt::spawn(async move { + target.call(&msg); + }); } } @@ -100,6 +104,18 @@ pub struct Target { pub api_key: String, } +impl Target { + pub fn call(&self, msg: &str) { + if let Err(e) = ureq::post(&self.server_url) + .set("Content-Type", "text/plain; charset=iso-8859-1") + .set("X-API-Key", &self.api_key) + .send_string(msg) + { + error!("Couldn't make call to webhook, error: {}", e) + } + } +} + pub fn alert(body: String) -> Result<(), Error> { let alerts: Alerts = serde_json::from_str(body.as_str())?; for alert in alerts.alerts { From 4b09fb0d55e3272ae0d9ee993b25ca3eb94adc72 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 18 Aug 2022 06:40:36 +0530 Subject: [PATCH 4/9] Add license header --- server/src/alerts.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/server/src/alerts.rs b/server/src/alerts.rs index 2c55942c0..86496301a 100644 --- a/server/src/alerts.rs +++ b/server/src/alerts.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + use log::error; use serde::{Deserialize, Serialize}; From 5ea9d80f82a0a0c4be4371a503583dbc7fdec03d Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 18 Aug 2022 07:12:27 +0530 Subject: [PATCH 5/9] Fix merge issues --- server/src/alerts.rs | 48 ++++---------------------------- server/src/handlers/logstream.rs | 4 +-- server/src/metadata.rs | 27 +++++++++--------- server/src/validator.rs | 9 ++++-- 4 files changed, 27 insertions(+), 61 deletions(-) diff --git a/server/src/alerts.rs b/server/src/alerts.rs index 86496301a..96486f4f5 100644 --- a/server/src/alerts.rs +++ b/server/src/alerts.rs @@ -21,13 +21,13 @@ use serde::{Deserialize, Serialize}; use crate::error::Error; -#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct Alerts { pub alerts: Vec, } -#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct Alert { pub name: String, @@ -53,7 +53,7 @@ impl Alert { } } -#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct Rule { pub field: String, @@ -98,7 +98,7 @@ impl Rule { } } -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub enum Operator { EqualTo, @@ -112,7 +112,7 @@ impl Default for Operator { } } -#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct Target { pub name: String, @@ -133,41 +133,3 @@ impl Target { } } } - -pub fn alert(body: String) -> Result<(), Error> { - let alerts: Alerts = serde_json::from_str(body.as_str())?; - for alert in alerts.alerts { - if alert.name.is_empty() { - return Err(Error::InvalidAlert( - "alert name cannot be empty".to_string(), - )); - } - if alert.message.is_empty() { - return Err(Error::InvalidAlert( - "alert message cannot be empty".to_string(), - )); - } - if alert.rule.value.is_empty() { - return Err(Error::InvalidAlert( - "rule.value cannot be empty".to_string(), - )); - } - if alert.rule.field.is_empty() { - return Err(Error::InvalidAlert("rule.field must be set".to_string())); - } - if alert.rule.within.is_empty() { - return Err(Error::InvalidAlert("rule.within must be set".to_string())); - } - if alert.rule.repeats == 0 { - return Err(Error::InvalidAlert( - "rule.repeats can't be set to 0".to_string(), - )); - } - if alert.targets.is_empty() { - return Err(Error::InvalidAlert( - "alert must have at least one target".to_string(), - )); - } - } - Ok(()) -} diff --git a/server/src/handlers/logstream.rs b/server/src/handlers/logstream.rs index d3ca1ab02..f25ff53a2 100644 --- a/server/src/handlers/logstream.rs +++ b/server/src/handlers/logstream.rs @@ -23,7 +23,7 @@ use crate::alerts::Alerts; use crate::response; use crate::s3::S3; use crate::storage::ObjectStorage; -use crate::{alerts, metadata, validator}; +use crate::{metadata, validator}; pub async fn delete(req: HttpRequest) -> HttpResponse { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); @@ -218,7 +218,7 @@ pub async fn put_alert(req: HttpRequest, body: web::Json) -> .to_http() } }; - match alerts::alert(serde_json::to_string(&body.as_object()).unwrap()) { + match validator::alert(serde_json::to_string(&body.as_object()).unwrap()) { Ok(_) => match S3::new().put_alerts(&stream_name, alerts.clone()).await { Ok(_) => { if let Err(e) = metadata::STREAM_INFO.set_alert(stream_name.to_string(), alerts) { diff --git a/server/src/metadata.rs b/server/src/metadata.rs index d27084012..0218cf9d4 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -103,14 +103,14 @@ impl STREAM_INFO { } pub fn set_alert(&self, stream_name: String, alerts: Alerts) -> Result<(), Error> { - let schema = self.schema(stream_name.clone())?; + let schema = self.schema(&stream_name)?; self.add_stream(stream_name, schema, alerts) } pub fn alert(&self, stream_name: String) -> Result { let map = self.read().unwrap(); let meta = map - .get(stream_name) + .get(&stream_name) .ok_or(Error::StreamMetaNotFound(stream_name.to_owned()))?; Ok(meta.alerts.clone()) @@ -249,24 +249,21 @@ mod tests { } #[rstest] - #[case::stream_schema_alert("teststream", "schema", "alert_config")] - #[case::stream_only("teststream", "", "")] + #[case::stream_schema_alert("teststream", "schema")] + #[case::stream_only("teststream", "")] #[serial] - fn test_add_stream( - #[case] stream_name: String, - #[case] schema: String, - #[case] alert_config: String, - ) { + fn test_add_stream(#[case] stream_name: String, #[case] schema: String) { + let alerts = Alerts { alerts: vec![] }; clear_map(); STREAM_INFO - .add_stream(stream_name.clone(), schema.clone(), alert_config.clone()) + .add_stream(stream_name.clone(), schema.clone(), alerts.clone()) .unwrap(); let left = STREAM_INFO.read().unwrap().clone(); let right = hashmap! { stream_name => LogStreamMetadata { - schema: schema, - alert_config: alert_config, + schema, + alerts, ..Default::default() } }; @@ -279,7 +276,11 @@ mod tests { fn test_delete_stream(#[case] stream_name: String) { clear_map(); STREAM_INFO - .add_stream(stream_name.clone(), "".to_string(), "".to_string()) + .add_stream( + stream_name.clone(), + "".to_string(), + Alerts { alerts: vec![] }, + ) .unwrap(); STREAM_INFO.delete_stream(&stream_name).unwrap(); diff --git a/server/src/validator.rs b/server/src/validator.rs index 8aaf41b74..0240f2c75 100644 --- a/server/src/validator.rs +++ b/server/src/validator.rs @@ -18,6 +18,7 @@ use chrono::{DateTime, Utc}; +use crate::alerts::Alerts; use crate::query::Query; use crate::Error; @@ -39,8 +40,10 @@ pub fn alert(body: String) -> Result<(), Error> { "alert message cannot be empty".to_string(), )); } - if alert.rule.contains.is_empty() { - return Err(Error::InvalidAlert("rule.contains must be set".to_string())); + if alert.rule.value.is_empty() { + return Err(Error::InvalidAlert( + "rule.value cannot be empty".to_string(), + )); } if alert.rule.field.is_empty() { return Err(Error::InvalidAlert("rule.field must be set".to_string())); @@ -53,7 +56,7 @@ pub fn alert(body: String) -> Result<(), Error> { "rule.repeats can't be set to 0".to_string(), )); } - if alert.target.is_empty() { + if alert.targets.is_empty() { return Err(Error::InvalidAlert( "alert must have at least one target".to_string(), )); From 9d530a8b5bd8a0d8fdcd22dacebe8ab0e9d4a834 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 18 Aug 2022 08:42:12 +0530 Subject: [PATCH 6/9] fmt --- server/src/event.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/server/src/event.rs b/server/src/event.rs index b1e01c0ba..0b323c4f8 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -91,9 +91,8 @@ impl Event { { error!("Couldn't update stream stats. {:?}", e); } - - if let Err(e) = metadata::STREAM_INFO.check_alerts(self).await - { + + if let Err(e) = metadata::STREAM_INFO.check_alerts(self).await { error!("Error checking for alerts. {:?}", e); } From ced48657f1903c1a3282da104ef1c0ce03077d91 Mon Sep 17 00:00:00 2001 From: Nitish Tiwari Date: Thu, 18 Aug 2022 13:39:26 +0530 Subject: [PATCH 7/9] Change type of value to serde json value --- server/src/alerts.rs | 10 ++++------ server/src/validator.rs | 3 ++- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/server/src/alerts.rs b/server/src/alerts.rs index 96486f4f5..9ba1a0f28 100644 --- a/server/src/alerts.rs +++ b/server/src/alerts.rs @@ -60,7 +60,7 @@ pub struct Rule { /// Field that determines what comparison operator is to be used #[serde(default)] pub operator: Operator, - pub value: String, + pub value: serde_json::Value, pub repeats: u32, #[serde(skip)] repeated: u32, @@ -71,15 +71,13 @@ impl Rule { // TODO: utilise `within` to set a range for validity of rule to trigger alert pub async fn resolves(&mut self, event: &serde_json::Value) -> bool { let comparison = match self.operator { - Operator::EqualTo => event.get(&self.field).unwrap() == &serde_json::json!(self.value), + Operator::EqualTo => event.get(&self.field).unwrap() == &self.value, // TODO: currently this is a hack, ensure checks are performed in the right way Operator::GreaterThan => { - event.get(&self.field).unwrap().as_f64().unwrap() - > serde_json::json!(self.value).as_f64().unwrap() + event.get(&self.field).unwrap().as_f64().unwrap() > (self.value).as_f64().unwrap() } Operator::LessThan => { - event.get(&self.field).unwrap().as_f64().unwrap() - < serde_json::json!(self.value).as_f64().unwrap() + event.get(&self.field).unwrap().as_f64().unwrap() < (self.value).as_f64().unwrap() } }; diff --git a/server/src/validator.rs b/server/src/validator.rs index 0240f2c75..d1af5bec3 100644 --- a/server/src/validator.rs +++ b/server/src/validator.rs @@ -17,6 +17,7 @@ */ use chrono::{DateTime, Utc}; +use serde_json::json; use crate::alerts::Alerts; use crate::query::Query; @@ -40,7 +41,7 @@ pub fn alert(body: String) -> Result<(), Error> { "alert message cannot be empty".to_string(), )); } - if alert.rule.value.is_empty() { + if alert.rule.value == json!(null) { return Err(Error::InvalidAlert( "rule.value cannot be empty".to_string(), )); From 420e61406c12ce7aa03d04375e80c9d56a546912 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 24 Aug 2022 06:34:07 +0530 Subject: [PATCH 8/9] Fix mutability issues that failed alerts --- server/src/alerts.rs | 3 +- server/src/handlers/logstream.rs | 66 ++++++++++++++++---------------- server/src/metadata.rs | 15 +++----- 3 files changed, 42 insertions(+), 42 deletions(-) diff --git a/server/src/alerts.rs b/server/src/alerts.rs index 9ba1a0f28..13ae789a9 100644 --- a/server/src/alerts.rs +++ b/server/src/alerts.rs @@ -16,7 +16,7 @@ * */ -use log::error; +use log::{error, info}; use serde::{Deserialize, Serialize}; use crate::error::Error; @@ -41,6 +41,7 @@ impl Alert { // This is done to ensure that threads aren't blocked by calls to the webhook pub async fn check_alert(&mut self, event: &serde_json::Value) -> Result<(), Error> { if self.rule.resolves(event).await { + info!("Alert triggered; name: {}", self.name); for target in self.targets.clone() { let msg = self.message.clone(); actix_web::rt::spawn(async move { diff --git a/server/src/handlers/logstream.rs b/server/src/handlers/logstream.rs index f25ff53a2..9a11afcc4 100644 --- a/server/src/handlers/logstream.rs +++ b/server/src/handlers/logstream.rs @@ -205,7 +205,7 @@ pub async fn put(req: HttpRequest) -> HttpResponse { pub async fn put_alert(req: HttpRequest, body: web::Json) -> HttpResponse { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - let alerts: Alerts = match serde_json::from_value(body.clone()) { + let alerts: Alerts = match serde_json::from_value(body.into_inner()) { Ok(alerts) => alerts, Err(e) => { return response::ServerResponse { @@ -213,46 +213,48 @@ pub async fn put_alert(req: HttpRequest, body: web::Json) -> "failed to set alert configuration for log stream {} due to err: {}", stream_name, e ), - code: StatusCode::INTERNAL_SERVER_ERROR, + code: StatusCode::BAD_REQUEST, } .to_http() } }; - match validator::alert(serde_json::to_string(&body.as_object()).unwrap()) { - Ok(_) => match S3::new().put_alerts(&stream_name, alerts.clone()).await { - Ok(_) => { - if let Err(e) = metadata::STREAM_INFO.set_alert(stream_name.to_string(), alerts) { - return response::ServerResponse { - msg: format!( - "failed to set alert configuration for log stream {} due to err: {}", - stream_name, e - ), - code: StatusCode::INTERNAL_SERVER_ERROR, - } - .to_http(); - } - response::ServerResponse { - msg: format!("set alert configuration for log stream {}", stream_name), - code: StatusCode::OK, - } - .to_http() - } - Err(e) => response::ServerResponse { - msg: format!( - "failed to set alert configuration for log stream {} due to err: {}", - stream_name, e - ), - code: StatusCode::INTERNAL_SERVER_ERROR, - } - .to_http(), - }, - Err(e) => response::ServerResponse { + + if let Err(e) = validator::alert(serde_json::to_string(&alerts).unwrap()) { + return response::ServerResponse { msg: format!( "failed to set alert configuration for log stream {} due to err: {}", stream_name, e ), code: StatusCode::BAD_REQUEST, } - .to_http(), + .to_http(); } + + if let Err(e) = S3::new().put_alerts(&stream_name, alerts.clone()).await { + return response::ServerResponse { + msg: format!( + "failed to set alert configuration for log stream {} due to err: {}", + stream_name, e + ), + code: StatusCode::INTERNAL_SERVER_ERROR, + } + .to_http(); + } + + if let Err(e) = metadata::STREAM_INFO.set_alert(stream_name.to_string(), alerts) { + return response::ServerResponse { + msg: format!( + "failed to set alert configuration for log stream {} due to err: {}", + stream_name, e + ), + code: StatusCode::INTERNAL_SERVER_ERROR, + } + .to_http(); + } + + response::ServerResponse { + msg: format!("set alert configuration for log stream {}", stream_name), + code: StatusCode::OK, + } + .to_http() } diff --git a/server/src/metadata.rs b/server/src/metadata.rs index 0218cf9d4..922c0b919 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -69,21 +69,18 @@ lazy_static! { #[allow(clippy::all)] impl STREAM_INFO { pub async fn check_alerts(&self, event: &Event) -> Result<(), Error> { - let map = self.read().unwrap(); + let mut map = self.write().unwrap(); let meta = map - .get(&event.stream_name) + .get_mut(&event.stream_name) .ok_or(Error::StreamMetaNotFound(event.stream_name.to_owned()))?; - let alerts = meta.alerts.alerts.clone(); let event: serde_json::Value = serde_json::from_str(&event.body)?; - actix_web::rt::spawn(async move { - for mut alert in alerts { - if let Err(e) = alert.check_alert(&event).await { - error!("Error while parsing event against alerts: {}", e); - } + for alert in meta.alerts.alerts.iter_mut() { + if let Err(e) = alert.check_alert(&event).await { + error!("Error while parsing event against alerts: {}", e); } - }); + } Ok(()) } From 5999b3d29e2d35bed8b95ee350ccc772c190f453 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 24 Aug 2022 06:59:16 +0530 Subject: [PATCH 9/9] clean up --- server/src/s3.rs | 7 +++++-- server/src/storage.rs | 1 - 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/server/src/s3.rs b/server/src/s3.rs index 59a95ec16..103a165b1 100644 --- a/server/src/s3.rs +++ b/server/src/s3.rs @@ -20,7 +20,6 @@ use structopt::StructOpt; use tokio_stream::StreamExt; use crate::alerts::Alerts; -use crate::error::Error; use crate::metadata::Stats; use crate::option::{StorageOpt, CONFIG}; use crate::query::Query; @@ -330,7 +329,11 @@ impl ObjectStorage for S3 { Ok(()) } - async fn put_alerts(&self, stream_name: &str, alerts: Alerts) -> Result<(), ObjectStorageError> { + async fn put_alerts( + &self, + stream_name: &str, + alerts: Alerts, + ) -> Result<(), ObjectStorageError> { let body = serde_json::to_vec(&alerts)?; self._put_alerts(stream_name, body).await?; diff --git a/server/src/storage.rs b/server/src/storage.rs index f3447e1bf..01a579bc8 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -17,7 +17,6 @@ */ use crate::alerts::Alerts; -use crate::error::Error; use crate::metadata::Stats; use crate::option::CONFIG; use crate::query::Query;