Skip to content

Commit 008c351

Browse files
de-shnitisht
andauthored
feat: foundational implementation for alerts (#28)
Add support for alerting based on a simple configuration Fixes #4 Co-authored-by: Nitish Tiwari <[email protected]>
1 parent 44c55e6 commit 008c351

File tree

9 files changed

+262
-125
lines changed

9 files changed

+262
-125
lines changed

server/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ clokwerk = "0.4.0-rc1"
4949
actix-web-static-files = "4.0"
5050
static-files = "0.2.1"
5151
walkdir = "2"
52+
ureq = "2.5.0"
5253

5354
[build-dependencies]
5455
static-files = "0.2.1"

server/src/alerts.rs

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/*
2+
* Parseable Server (C) 2022 Parseable, Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU Affero General Public License as
6+
* published by the Free Software Foundation, either version 3 of the
7+
* License, or (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Affero General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU Affero General Public License
15+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
*
17+
*/
18+
19+
use log::{error, info};
20+
use serde::{Deserialize, Serialize};
21+
22+
use crate::error::Error;
23+
24+
#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
25+
#[serde(rename_all = "camelCase")]
26+
pub struct Alerts {
27+
pub alerts: Vec<Alert>,
28+
}
29+
30+
#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
31+
#[serde(rename_all = "camelCase")]
32+
pub struct Alert {
33+
pub name: String,
34+
pub message: String,
35+
pub rule: Rule,
36+
pub targets: Vec<Target>,
37+
}
38+
39+
impl Alert {
40+
// TODO: spawn async tasks to call webhooks if alert rules are met
41+
// This is done to ensure that threads aren't blocked by calls to the webhook
42+
pub async fn check_alert(&mut self, event: &serde_json::Value) -> Result<(), Error> {
43+
if self.rule.resolves(event).await {
44+
info!("Alert triggered; name: {}", self.name);
45+
for target in self.targets.clone() {
46+
let msg = self.message.clone();
47+
actix_web::rt::spawn(async move {
48+
target.call(&msg);
49+
});
50+
}
51+
}
52+
53+
Ok(())
54+
}
55+
}
56+
57+
#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
58+
#[serde(rename_all = "camelCase")]
59+
pub struct Rule {
60+
pub field: String,
61+
/// Field that determines what comparison operator is to be used
62+
#[serde(default)]
63+
pub operator: Operator,
64+
pub value: serde_json::Value,
65+
pub repeats: u32,
66+
#[serde(skip)]
67+
repeated: u32,
68+
pub within: String,
69+
}
70+
71+
impl Rule {
72+
// TODO: utilise `within` to set a range for validity of rule to trigger alert
73+
pub async fn resolves(&mut self, event: &serde_json::Value) -> bool {
74+
let comparison = match self.operator {
75+
Operator::EqualTo => event.get(&self.field).unwrap() == &self.value,
76+
// TODO: currently this is a hack, ensure checks are performed in the right way
77+
Operator::GreaterThan => {
78+
event.get(&self.field).unwrap().as_f64().unwrap() > (self.value).as_f64().unwrap()
79+
}
80+
Operator::LessThan => {
81+
event.get(&self.field).unwrap().as_f64().unwrap() < (self.value).as_f64().unwrap()
82+
}
83+
};
84+
85+
// If truthy, increment count of repeated
86+
if comparison {
87+
self.repeated += 1;
88+
}
89+
90+
// If enough repetitions made, return true
91+
if self.repeated >= self.repeats {
92+
self.repeated = 0;
93+
return true;
94+
}
95+
96+
false
97+
}
98+
}
99+
100+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
101+
#[serde(rename_all = "camelCase")]
102+
pub enum Operator {
103+
EqualTo,
104+
GreaterThan,
105+
LessThan,
106+
}
107+
108+
impl Default for Operator {
109+
fn default() -> Self {
110+
Self::EqualTo
111+
}
112+
}
113+
114+
#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
115+
#[serde(rename_all = "camelCase")]
116+
pub struct Target {
117+
pub name: String,
118+
#[serde(rename = "server_url")]
119+
pub server_url: String,
120+
#[serde(rename = "api_key")]
121+
pub api_key: String,
122+
}
123+
124+
impl Target {
125+
pub fn call(&self, msg: &str) {
126+
if let Err(e) = ureq::post(&self.server_url)
127+
.set("Content-Type", "text/plain; charset=iso-8859-1")
128+
.set("X-API-Key", &self.api_key)
129+
.send_string(msg)
130+
{
131+
error!("Couldn't make call to webhook, error: {}", e)
132+
}
133+
}
134+
}

server/src/event.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use crate::response;
3535
use crate::storage::ObjectStorage;
3636
use crate::Error;
3737

38+
#[derive(Clone)]
3839
pub struct Event {
3940
pub body: String,
4041
pub stream_name: String,
@@ -91,6 +92,10 @@ impl Event {
9192
error!("Couldn't update stream stats. {:?}", e);
9293
}
9394

95+
if let Err(e) = metadata::STREAM_INFO.check_alerts(self).await {
96+
error!("Error checking for alerts. {:?}", e);
97+
}
98+
9499
let msg = if is_first_event {
95100
format!(
96101
"Intial Event recieved for log stream {}, schema uploaded successfully",

server/src/handlers/logstream.rs

Lines changed: 52 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@
1919
use actix_web::http::StatusCode;
2020
use actix_web::{web, HttpRequest, HttpResponse, Responder};
2121

22-
use crate::metadata;
22+
use crate::alerts::Alerts;
2323
use crate::response;
2424
use crate::s3::S3;
2525
use crate::storage::ObjectStorage;
26-
use crate::validator;
26+
use crate::{metadata, validator};
2727

2828
pub async fn delete(req: HttpRequest) -> HttpResponse {
2929
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
@@ -116,26 +116,23 @@ pub async fn schema(req: HttpRequest) -> HttpResponse {
116116
pub async fn get_alert(req: HttpRequest) -> HttpResponse {
117117
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
118118

119-
match metadata::STREAM_INFO.alert(&stream_name) {
120-
Ok(alert) => response::ServerResponse {
121-
msg: alert,
119+
match metadata::STREAM_INFO.alert(stream_name.clone()) {
120+
Ok(alerts) => response::ServerResponse {
121+
msg: serde_json::to_string(&alerts).unwrap(),
122122
code: StatusCode::OK,
123123
}
124124
.to_http(),
125-
Err(_) => match S3::new().get_alert(&stream_name).await {
126-
Ok(alert) if alert.is_empty() => response::ServerResponse {
125+
Err(_) => match S3::new().get_alerts(&stream_name).await {
126+
Ok(alerts) if alerts.alerts.is_empty() => response::ServerResponse {
127127
msg: "alert configuration not set for log stream {}".to_string(),
128128
code: StatusCode::BAD_REQUEST,
129129
}
130130
.to_http(),
131-
Ok(alert) => {
132-
let buf = alert.as_ref();
133-
response::ServerResponse {
134-
msg: String::from_utf8(buf.to_vec()).unwrap(),
135-
code: StatusCode::OK,
136-
}
137-
.to_http()
131+
Ok(alerts) => response::ServerResponse {
132+
msg: serde_json::to_string(&alerts).unwrap(),
133+
code: StatusCode::OK,
138134
}
135+
.to_http(),
139136
Err(_) => response::ServerResponse {
140137
msg: "alert doesn't exist".to_string(),
141138
code: StatusCode::BAD_REQUEST,
@@ -164,7 +161,7 @@ pub async fn put(req: HttpRequest) -> HttpResponse {
164161
if let Err(e) = metadata::STREAM_INFO.add_stream(
165162
stream_name.to_string(),
166163
"".to_string(),
167-
"".to_string(),
164+
Default::default(),
168165
) {
169166
return response::ServerResponse {
170167
msg: format!(
@@ -208,47 +205,56 @@ pub async fn put(req: HttpRequest) -> HttpResponse {
208205

209206
pub async fn put_alert(req: HttpRequest, body: web::Json<serde_json::Value>) -> HttpResponse {
210207
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
211-
let alert_config = body.clone();
212-
match validator::alert(serde_json::to_string(&body.as_object()).unwrap()) {
213-
Ok(_) => match S3::new()
214-
.create_alert(&stream_name, alert_config.to_string())
215-
.await
216-
{
217-
Ok(_) => {
218-
if let Err(e) =
219-
metadata::STREAM_INFO.set_alert(stream_name.clone(), alert_config.to_string())
220-
{
221-
return response::ServerResponse {
222-
msg: format!(
223-
"failed to set alert configuration for log stream {} due to err: {}",
224-
stream_name, e
225-
),
226-
code: StatusCode::INTERNAL_SERVER_ERROR,
227-
}
228-
.to_http();
229-
}
230-
response::ServerResponse {
231-
msg: format!("set alert configuration for log stream {}", stream_name),
232-
code: StatusCode::OK,
233-
}
234-
.to_http()
235-
}
236-
Err(e) => response::ServerResponse {
208+
let alerts: Alerts = match serde_json::from_value(body.into_inner()) {
209+
Ok(alerts) => alerts,
210+
Err(e) => {
211+
return response::ServerResponse {
237212
msg: format!(
238213
"failed to set alert configuration for log stream {} due to err: {}",
239214
stream_name, e
240215
),
241-
code: StatusCode::INTERNAL_SERVER_ERROR,
216+
code: StatusCode::BAD_REQUEST,
242217
}
243-
.to_http(),
244-
},
245-
Err(e) => response::ServerResponse {
218+
.to_http()
219+
}
220+
};
221+
222+
if let Err(e) = validator::alert(serde_json::to_string(&alerts).unwrap()) {
223+
return response::ServerResponse {
246224
msg: format!(
247225
"failed to set alert configuration for log stream {} due to err: {}",
248226
stream_name, e
249227
),
250228
code: StatusCode::BAD_REQUEST,
251229
}
252-
.to_http(),
230+
.to_http();
231+
}
232+
233+
if let Err(e) = S3::new().put_alerts(&stream_name, alerts.clone()).await {
234+
return response::ServerResponse {
235+
msg: format!(
236+
"failed to set alert configuration for log stream {} due to err: {}",
237+
stream_name, e
238+
),
239+
code: StatusCode::INTERNAL_SERVER_ERROR,
240+
}
241+
.to_http();
242+
}
243+
244+
if let Err(e) = metadata::STREAM_INFO.set_alert(stream_name.to_string(), alerts) {
245+
return response::ServerResponse {
246+
msg: format!(
247+
"failed to set alert configuration for log stream {} due to err: {}",
248+
stream_name, e
249+
),
250+
code: StatusCode::INTERNAL_SERVER_ERROR,
251+
}
252+
.to_http();
253+
}
254+
255+
response::ServerResponse {
256+
msg: format!("set alert configuration for log stream {}", stream_name),
257+
code: StatusCode::OK,
253258
}
259+
.to_http()
254260
}

server/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use std::time::Duration;
3434
use tokio::sync::oneshot;
3535
use tokio::sync::oneshot::error::TryRecvError;
3636

37+
mod alerts;
3738
mod banner;
3839
mod error;
3940
mod event;

0 commit comments

Comments
 (0)