Skip to content

Commit df01442

Browse files
authored
Merge branch 'main' into feat/gcs-support-for-storage
2 parents b93ae87 + 2bd8f2f commit df01442

File tree

4 files changed

+159
-52
lines changed

4 files changed

+159
-52
lines changed

src/alerts/mod.rs

Lines changed: 84 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1031,53 +1031,101 @@ impl Alerts {
10311031
}
10321032

10331033
#[derive(Debug, Serialize)]
1034-
pub struct AlertsInfo {
1034+
pub struct AlertsSummary {
10351035
total: u64,
1036-
silenced: u64,
1037-
resolved: u64,
1038-
triggered: u64,
1039-
low: u64,
1040-
medium: u64,
1041-
high: u64,
1042-
critical: u64,
1036+
triggered: AlertsInfoByState,
1037+
silenced: AlertsInfoByState,
1038+
resolved: AlertsInfoByState,
1039+
}
1040+
1041+
#[derive(Debug, Serialize)]
1042+
pub struct AlertsInfoByState {
1043+
total: u64,
1044+
alert_info: Vec<AlertsInfo>,
1045+
}
1046+
1047+
#[derive(Debug, Serialize)]
1048+
pub struct AlertsInfo {
1049+
title: String,
1050+
id: Ulid,
1051+
severity: Severity,
10431052
}
10441053

10451054
// TODO: add RBAC
1046-
pub async fn get_alerts_info() -> Result<AlertsInfo, AlertError> {
1055+
pub async fn get_alerts_summary() -> Result<AlertsSummary, AlertError> {
10471056
let alerts = ALERTS.alerts.read().await;
1048-
let mut total = 0;
1049-
let mut silenced = 0;
1050-
let mut resolved = 0;
1057+
let total = alerts.len() as u64;
10511058
let mut triggered = 0;
1052-
let mut low = 0;
1053-
let mut medium = 0;
1054-
let mut high = 0;
1055-
let mut critical = 0;
1059+
let mut resolved = 0;
1060+
let mut silenced = 0;
1061+
let mut triggered_alerts: Vec<AlertsInfo> = Vec::new();
1062+
let mut silenced_alerts: Vec<AlertsInfo> = Vec::new();
1063+
let mut resolved_alerts: Vec<AlertsInfo> = Vec::new();
10561064

1065+
// find total alerts for each state
1066+
// get title, id and state of each alert for that state
10571067
for (_, alert) in alerts.iter() {
1058-
total += 1;
10591068
match alert.state {
1060-
AlertState::Silenced => silenced += 1,
1061-
AlertState::Resolved => resolved += 1,
1062-
AlertState::Triggered => triggered += 1,
1063-
}
1064-
1065-
match alert.severity {
1066-
Severity::Low => low += 1,
1067-
Severity::Medium => medium += 1,
1068-
Severity::High => high += 1,
1069-
Severity::Critical => critical += 1,
1069+
AlertState::Triggered => {
1070+
triggered += 1;
1071+
triggered_alerts.push(AlertsInfo {
1072+
title: alert.title.clone(),
1073+
id: alert.id,
1074+
severity: alert.severity.clone(),
1075+
});
1076+
}
1077+
AlertState::Silenced => {
1078+
silenced += 1;
1079+
silenced_alerts.push(AlertsInfo {
1080+
title: alert.title.clone(),
1081+
id: alert.id,
1082+
severity: alert.severity.clone(),
1083+
});
1084+
}
1085+
AlertState::Resolved => {
1086+
resolved += 1;
1087+
resolved_alerts.push(AlertsInfo {
1088+
title: alert.title.clone(),
1089+
id: alert.id,
1090+
severity: alert.severity.clone(),
1091+
});
1092+
}
10701093
}
10711094
}
10721095

1073-
Ok(AlertsInfo {
1096+
// Sort and limit to top 5 for each state by severity priority
1097+
triggered_alerts.sort_by_key(|alert| get_severity_priority(&alert.severity));
1098+
triggered_alerts.truncate(5);
1099+
1100+
silenced_alerts.sort_by_key(|alert| get_severity_priority(&alert.severity));
1101+
silenced_alerts.truncate(5);
1102+
1103+
resolved_alerts.sort_by_key(|alert| get_severity_priority(&alert.severity));
1104+
resolved_alerts.truncate(5);
1105+
1106+
let alert_summary = AlertsSummary {
10741107
total,
1075-
silenced,
1076-
resolved,
1077-
triggered,
1078-
low,
1079-
medium,
1080-
high,
1081-
critical,
1082-
})
1108+
triggered: AlertsInfoByState {
1109+
total: triggered,
1110+
alert_info: triggered_alerts,
1111+
},
1112+
silenced: AlertsInfoByState {
1113+
total: silenced,
1114+
alert_info: silenced_alerts,
1115+
},
1116+
resolved: AlertsInfoByState {
1117+
total: resolved,
1118+
alert_info: resolved_alerts,
1119+
},
1120+
};
1121+
Ok(alert_summary)
1122+
}
1123+
1124+
fn get_severity_priority(severity: &Severity) -> u8 {
1125+
match severity {
1126+
Severity::Critical => 0,
1127+
Severity::High => 1,
1128+
Severity::Medium => 2,
1129+
Severity::Low => 3,
1130+
}
10831131
}

src/handlers/http/users/dashboards.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,20 @@ use actix_web::{
3232
use http::StatusCode;
3333
use serde_json::Error as SerdeError;
3434

35-
pub async fn list_dashboards() -> Result<impl Responder, DashboardError> {
36-
let dashboards = DASHBOARDS.list_dashboards().await;
35+
pub async fn list_dashboards(req: HttpRequest) -> Result<impl Responder, DashboardError> {
36+
let query_map = web::Query::<HashMap<String, String>>::from_query(req.query_string())
37+
.map_err(|_| DashboardError::InvalidQueryParameter)?;
38+
let mut dashboard_limit = 0;
39+
if !query_map.is_empty() {
40+
if let Some(limit) = query_map.get("limit") {
41+
if let Ok(parsed_limit) = limit.parse::<usize>() {
42+
dashboard_limit = parsed_limit;
43+
} else {
44+
return Err(DashboardError::Metadata("Invalid limit value"));
45+
}
46+
}
47+
}
48+
let dashboards = DASHBOARDS.list_dashboards(dashboard_limit).await;
3749
let dashboard_summaries = dashboards
3850
.iter()
3951
.map(|dashboard| dashboard.to_summary())

src/prism/home/mod.rs

Lines changed: 47 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,13 @@ use serde::Serialize;
2727
use tracing::error;
2828

2929
use crate::{
30-
alerts::{get_alerts_info, AlertError, AlertsInfo, ALERTS},
30+
alerts::{get_alerts_summary, AlertError, AlertsSummary, ALERTS},
3131
correlation::{CorrelationError, CORRELATIONS},
3232
event::format::LogSource,
3333
handlers::http::{cluster::fetch_daily_stats, logstream::error::StreamError},
3434
parseable::PARSEABLE,
3535
rbac::{map::SessionKey, role::Action, Users},
36+
stats::Stats,
3637
storage::{ObjectStorageError, ObjectStoreFormat, StreamType, STREAM_ROOT_DIRECTORY},
3738
users::{dashboards::DASHBOARDS, filters::FILTERS},
3839
};
@@ -43,8 +44,8 @@ type StreamMetadataResponse = Result<(String, Vec<ObjectStoreFormat>, DataSetTyp
4344
pub struct DatedStats {
4445
date: String,
4546
events: u64,
46-
ingestion_size: u64,
47-
storage_size: u64,
47+
ingestion: u64,
48+
storage: u64,
4849
}
4950

5051
#[derive(Debug, Serialize)]
@@ -62,9 +63,10 @@ pub struct DataSet {
6263

6364
#[derive(Debug, Serialize)]
6465
pub struct HomeResponse {
65-
pub alerts_info: AlertsInfo,
66+
pub alerts_summary: AlertsSummary,
6667
pub stats_details: Vec<DatedStats>,
6768
pub datasets: Vec<DataSet>,
69+
pub top_five_ingestion: HashMap<String, Stats>,
6870
}
6971

7072
#[derive(Debug, Serialize)]
@@ -93,11 +95,11 @@ pub async fn generate_home_response(
9395
include_internal: bool,
9496
) -> Result<HomeResponse, PrismHomeError> {
9597
// Execute these operations concurrently
96-
let (stream_titles_result, alerts_info_result) =
97-
tokio::join!(get_stream_titles(key), get_alerts_info());
98+
let (stream_titles_result, alerts_summary_result) =
99+
tokio::join!(get_stream_titles(key), get_alerts_summary());
98100

99101
let stream_titles = stream_titles_result?;
100-
let alerts_info = alerts_info_result?;
102+
let alerts_summary = alerts_summary_result?;
101103

102104
// Generate dates for date-wise stats
103105
let mut dates = (0..7)
@@ -117,7 +119,7 @@ pub async fn generate_home_response(
117119
let stream_metadata_results: Vec<StreamMetadataResponse> =
118120
futures::future::join_all(stream_metadata_futures).await;
119121

120-
let mut stream_wise_stream_json = HashMap::new();
122+
let mut stream_wise_stream_json: HashMap<String, Vec<ObjectStoreFormat>> = HashMap::new();
121123
let mut datasets = Vec::new();
122124

123125
for result in stream_metadata_results {
@@ -144,6 +146,8 @@ pub async fn generate_home_response(
144146
}
145147
}
146148

149+
let top_five_ingestion = get_top_5_streams_by_ingestion(&stream_wise_stream_json);
150+
147151
// Process stats for all dates concurrently
148152
let stats_futures = dates
149153
.iter()
@@ -168,9 +172,40 @@ pub async fn generate_home_response(
168172
Ok(HomeResponse {
169173
stats_details: stream_details,
170174
datasets,
171-
alerts_info,
175+
alerts_summary,
176+
top_five_ingestion,
172177
})
173178
}
179+
180+
fn get_top_5_streams_by_ingestion(
181+
stream_wise_stream_json: &HashMap<String, Vec<ObjectStoreFormat>>,
182+
) -> HashMap<String, Stats> {
183+
let mut result: Vec<_> = stream_wise_stream_json
184+
.iter()
185+
.map(|(stream_name, formats)| {
186+
let total_stats = formats.iter().fold(
187+
Stats {
188+
events: 0,
189+
ingestion: 0,
190+
storage: 0,
191+
},
192+
|mut acc, osf| {
193+
let current = &osf.stats.current_stats;
194+
acc.events += current.events;
195+
acc.ingestion += current.ingestion;
196+
acc.storage += current.storage;
197+
acc
198+
},
199+
);
200+
(stream_name.clone(), total_stats)
201+
})
202+
.collect();
203+
204+
result.sort_by_key(|(_, stats)| std::cmp::Reverse(stats.ingestion));
205+
result.truncate(5);
206+
result.into_iter().collect()
207+
}
208+
174209
async fn get_stream_metadata(
175210
stream: String,
176211
) -> Result<(String, Vec<ObjectStoreFormat>, DataSetType), PrismHomeError> {
@@ -240,8 +275,8 @@ async fn stats_for_date(
240275
match result {
241276
Ok((events, ingestion, storage)) => {
242277
details.events += events;
243-
details.ingestion_size += ingestion;
244-
details.storage_size += storage;
278+
details.ingestion += ingestion;
279+
details.storage += storage;
245280
}
246281
Err(e) => {
247282
error!("Failed to get stats for stream: {:?}", e);
@@ -370,7 +405,7 @@ async fn get_correlation_titles(
370405

371406
async fn get_dashboard_titles(query_value: &str) -> Result<Vec<Resource>, PrismHomeError> {
372407
let dashboard_titles = DASHBOARDS
373-
.list_dashboards()
408+
.list_dashboards(0)
374409
.await
375410
.iter()
376411
.filter_map(|dashboard| {

src/users/dashboards.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -344,8 +344,20 @@ impl Dashboards {
344344

345345
/// List all dashboards
346346
/// fetch all dashboards from memory
347-
pub async fn list_dashboards(&self) -> Vec<Dashboard> {
348-
self.0.read().await.clone()
347+
pub async fn list_dashboards(&self, limit: usize) -> Vec<Dashboard> {
348+
// limit the number of dashboards returned in order of modified date
349+
// if limit is 0, return all dashboards
350+
let dashboards = self.0.read().await;
351+
let mut sorted_dashboards = dashboards
352+
.iter()
353+
.filter(|d| d.dashboard_id.is_some())
354+
.cloned()
355+
.collect::<Vec<Dashboard>>();
356+
sorted_dashboards.sort_by_key(|d| std::cmp::Reverse(d.modified));
357+
if limit > 0 {
358+
sorted_dashboards.truncate(limit);
359+
}
360+
sorted_dashboards
349361
}
350362

351363
/// List tags from all dashboards

0 commit comments

Comments
 (0)