Skip to content

Commit be76b5c

Browse files
committed
bugfix and alerts retry
1 parent ece7953 commit be76b5c

File tree

5 files changed

+52
-30
lines changed

5 files changed

+52
-30
lines changed

src/alerts/mod.rs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use std::thread;
3232
use tokio::sync::oneshot::{Receiver, Sender};
3333
use tokio::sync::{mpsc, RwLock};
3434
use tokio::task::JoinHandle;
35-
use tracing::{trace, warn};
35+
use tracing::{error, trace, warn};
3636
use ulid::Ulid;
3737

3838
pub mod alerts_utils;
@@ -52,7 +52,7 @@ pub type ScheduledTaskHandlers = (JoinHandle<()>, Receiver<()>, Sender<()>);
5252
pub const CURRENT_ALERTS_VERSION: &str = "v1";
5353

5454
pub static ALERTS: Lazy<Alerts> = Lazy::new(|| {
55-
let (tx, rx) = mpsc::channel::<AlertTask>(1);
55+
let (tx, rx) = mpsc::channel::<AlertTask>(10);
5656
let alerts = Alerts {
5757
alerts: RwLock::new(HashMap::new()),
5858
sender: tx,
@@ -659,7 +659,7 @@ impl AlertConfig {
659659

660660
// validate condition config
661661
let agg1 = &self.aggregates.aggregate_config[0];
662-
let agg2 = &self.aggregates.aggregate_config[0];
662+
let agg2 = &self.aggregates.aggregate_config[1];
663663

664664
validate_condition_config(&agg1.conditions)?;
665665
validate_condition_config(&agg2.conditions)?;
@@ -833,7 +833,20 @@ impl Alerts {
833833
let store = PARSEABLE.storage.get_object_store();
834834

835835
for alert in store.get_alerts().await.unwrap_or_default() {
836-
self.sender.send(AlertTask::Create(alert.clone())).await?;
836+
match self.sender.send(AlertTask::Create(alert.clone())).await {
837+
Ok(_) => {}
838+
Err(e) => {
839+
warn!("Failed to create alert task: {e}\nRetrying...");
840+
// Retry sending the task
841+
match self.sender.send(AlertTask::Create(alert.clone())).await {
842+
Ok(_) => {}
843+
Err(e) => {
844+
error!("Failed to create alert task: {e}");
845+
continue;
846+
}
847+
}
848+
}
849+
};
837850
map.insert(alert.id, alert);
838851
}
839852

src/alerts/target.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use http::{header::AUTHORIZATION, HeaderMap, HeaderValue};
2929
use humantime_serde::re::humantime;
3030
use reqwest::ClientBuilder;
3131
use tracing::{error, trace, warn};
32+
use url::Url;
3233

3334
use super::ALERTS;
3435

@@ -255,9 +256,9 @@ fn default_client_builder() -> ClientBuilder {
255256
ClientBuilder::new()
256257
}
257258

258-
#[derive(Default, Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
259+
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
259260
pub struct SlackWebHook {
260-
endpoint: String,
261+
endpoint: Url,
261262
}
262263

263264
#[async_trait]
@@ -279,7 +280,7 @@ impl CallableTarget for SlackWebHook {
279280
}
280281
};
281282

282-
if let Err(e) = client.post(&self.endpoint).json(&alert).send().await {
283+
if let Err(e) = client.post(self.endpoint.clone()).json(&alert).send().await {
283284
error!("Couldn't make call to webhook, error: {}", e)
284285
}
285286
}
@@ -288,7 +289,7 @@ impl CallableTarget for SlackWebHook {
288289
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
289290
#[serde(rename_all = "camelCase")]
290291
pub struct OtherWebHook {
291-
endpoint: String,
292+
endpoint: Url,
292293
#[serde(default)]
293294
headers: HashMap<String, String>,
294295
#[serde(default)]
@@ -314,7 +315,7 @@ impl CallableTarget for OtherWebHook {
314315
};
315316

316317
let request = client
317-
.post(&self.endpoint)
318+
.post(self.endpoint.clone())
318319
.headers((&self.headers).try_into().expect("valid_headers"));
319320

320321
if let Err(e) = request.body(alert).send().await {
@@ -326,7 +327,7 @@ impl CallableTarget for OtherWebHook {
326327
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
327328
#[serde(rename_all = "camelCase")]
328329
pub struct AlertManager {
329-
endpoint: String,
330+
endpoint: Url,
330331
#[serde(default)]
331332
skip_tls_check: bool,
332333
#[serde(flatten)]
@@ -404,7 +405,12 @@ impl CallableTarget for AlertManager {
404405
}
405406
};
406407

407-
if let Err(e) = client.post(&self.endpoint).json(&alerts).send().await {
408+
if let Err(e) = client
409+
.post(self.endpoint.clone())
410+
.json(&alerts)
411+
.send()
412+
.await
413+
{
408414
error!("Couldn't make call to alertmanager, error: {}", e)
409415
}
410416
}

src/handlers/http/modal/mod.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use tokio::sync::oneshot;
3434
use tracing::{error, info, warn};
3535

3636
use crate::{
37+
alerts::ALERTS,
3738
cli::Options,
3839
correlation::CORRELATIONS,
3940
oidc::Claims,
@@ -165,7 +166,7 @@ pub trait ParseableServer {
165166

166167
pub async fn load_on_init() -> anyhow::Result<()> {
167168
// Run all loading operations concurrently
168-
let (correlations_result, filters_result, dashboards_result) = future::join3(
169+
let (correlations_result, filters_result, dashboards_result, alerts_result) = future::join4(
169170
async {
170171
CORRELATIONS
171172
.load()
@@ -174,6 +175,7 @@ pub async fn load_on_init() -> anyhow::Result<()> {
174175
},
175176
async { FILTERS.load().await.context("Failed to load filters") },
176177
async { DASHBOARDS.load().await.context("Failed to load dashboards") },
178+
async { ALERTS.load().await.context("Failed to load alerts") },
177179
)
178180
.await;
179181

@@ -190,6 +192,10 @@ pub async fn load_on_init() -> anyhow::Result<()> {
190192
error!("{err}");
191193
}
192194

195+
if let Err(err) = alerts_result {
196+
error!("{err}");
197+
}
198+
193199
Ok(())
194200
}
195201

src/handlers/http/modal/query_server.rs

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
use std::thread;
2020

21-
use crate::alerts::ALERTS;
2221
use crate::handlers::airplane;
2322
use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular};
2423
use crate::handlers::http::middleware::{DisAllowRootUser, RouteExt};
@@ -34,7 +33,7 @@ use actix_web_prometheus::PrometheusMetrics;
3433
use async_trait::async_trait;
3534
use bytes::Bytes;
3635
use tokio::sync::oneshot;
37-
use tracing::{error, info};
36+
use tracing::info;
3837

3938
use crate::parseable::PARSEABLE;
4039
use crate::Server;
@@ -128,17 +127,6 @@ impl ParseableServer for QueryServer {
128127
let (cancel_tx, cancel_rx) = oneshot::channel();
129128
thread::spawn(|| sync::handler(cancel_rx));
130129

131-
// Run the alerts scheduler
132-
tokio::spawn(async {
133-
match ALERTS.load().await {
134-
Ok(_) => info!("Alerts loaded successfully"),
135-
Err(e) => {
136-
error!("Failed to load alerts: {}", e);
137-
// return Err(e);
138-
}
139-
};
140-
});
141-
142130
tokio::spawn(airplane::server());
143131

144132
let result = self

src/sync.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -246,16 +246,25 @@ pub async fn alert_runtime(mut rx: mpsc::Receiver<AlertTask>) -> Result<(), anyh
246246
let alert = alert.clone();
247247
let id = alert.id;
248248
let handle = tokio::spawn(async move {
249+
let mut retry_counter = 0;
250+
let mut sleep_duration = alert.get_eval_frequency();
249251
loop {
250252
match alerts_utils::evaluate_alert(&alert).await {
251-
Ok(_) => {}
253+
Ok(_) => {
254+
retry_counter = 0;
255+
}
252256
Err(err) => {
253-
error!("Error while evaluation- {err}");
254-
break;
257+
warn!("Error while evaluation- {}\nRetrying after sleeping for 1 minute", err);
258+
sleep_duration = 1;
259+
retry_counter += 1;
260+
261+
if retry_counter > 3 {
262+
error!("Alert with id {} failed to evaluate after 3 retries with err- {}", id, err);
263+
break;
264+
}
255265
}
256266
}
257-
tokio::time::sleep(Duration::from_secs(alert.get_eval_frequency() * 60))
258-
.await;
267+
tokio::time::sleep(Duration::from_secs(sleep_duration * 60)).await;
259268
}
260269
});
261270

0 commit comments

Comments
 (0)