Skip to content

update: Prism home changes #1371

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 84 additions & 36 deletions src/alerts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1031,53 +1031,101 @@ impl Alerts {
}

#[derive(Debug, Serialize)]
pub struct AlertsInfo {
pub struct AlertsSummary {
total: u64,
silenced: u64,
resolved: u64,
triggered: u64,
low: u64,
medium: u64,
high: u64,
critical: u64,
triggered: AlertsInfoByState,
silenced: AlertsInfoByState,
resolved: AlertsInfoByState,
}

#[derive(Debug, Serialize)]
pub struct AlertsInfoByState {
total: u64,
alert_info: Vec<AlertsInfo>,
}

#[derive(Debug, Serialize)]
pub struct AlertsInfo {
title: String,
id: Ulid,
severity: Severity,
}

// TODO: add RBAC
pub async fn get_alerts_info() -> Result<AlertsInfo, AlertError> {
pub async fn get_alerts_summary() -> Result<AlertsSummary, AlertError> {
let alerts = ALERTS.alerts.read().await;
let mut total = 0;
let mut silenced = 0;
let mut resolved = 0;
let total = alerts.len() as u64;
let mut triggered = 0;
let mut low = 0;
let mut medium = 0;
let mut high = 0;
let mut critical = 0;
let mut resolved = 0;
let mut silenced = 0;
let mut triggered_alerts: Vec<AlertsInfo> = Vec::new();
let mut silenced_alerts: Vec<AlertsInfo> = Vec::new();
let mut resolved_alerts: Vec<AlertsInfo> = Vec::new();

// find total alerts for each state
// get title, id and state of each alert for that state
for (_, alert) in alerts.iter() {
total += 1;
match alert.state {
AlertState::Silenced => silenced += 1,
AlertState::Resolved => resolved += 1,
AlertState::Triggered => triggered += 1,
}

match alert.severity {
Severity::Low => low += 1,
Severity::Medium => medium += 1,
Severity::High => high += 1,
Severity::Critical => critical += 1,
AlertState::Triggered => {
triggered += 1;
triggered_alerts.push(AlertsInfo {
title: alert.title.clone(),
id: alert.id,
severity: alert.severity.clone(),
});
}
AlertState::Silenced => {
silenced += 1;
silenced_alerts.push(AlertsInfo {
title: alert.title.clone(),
id: alert.id,
severity: alert.severity.clone(),
});
}
AlertState::Resolved => {
resolved += 1;
resolved_alerts.push(AlertsInfo {
title: alert.title.clone(),
id: alert.id,
severity: alert.severity.clone(),
});
}
}
}

Ok(AlertsInfo {
// Sort and limit to top 5 for each state by severity priority
triggered_alerts.sort_by_key(|alert| get_severity_priority(&alert.severity));
triggered_alerts.truncate(5);

silenced_alerts.sort_by_key(|alert| get_severity_priority(&alert.severity));
silenced_alerts.truncate(5);

resolved_alerts.sort_by_key(|alert| get_severity_priority(&alert.severity));
resolved_alerts.truncate(5);

let alert_summary = AlertsSummary {
total,
silenced,
resolved,
triggered,
low,
medium,
high,
critical,
})
triggered: AlertsInfoByState {
total: triggered,
alert_info: triggered_alerts,
},
silenced: AlertsInfoByState {
total: silenced,
alert_info: silenced_alerts,
},
resolved: AlertsInfoByState {
total: resolved,
alert_info: resolved_alerts,
},
};
Ok(alert_summary)
}

fn get_severity_priority(severity: &Severity) -> u8 {
match severity {
Severity::Critical => 0,
Severity::High => 1,
Severity::Medium => 2,
Severity::Low => 3,
}
}
16 changes: 14 additions & 2 deletions src/handlers/http/users/dashboards.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,20 @@ use actix_web::{
use http::StatusCode;
use serde_json::Error as SerdeError;

pub async fn list_dashboards() -> Result<impl Responder, DashboardError> {
let dashboards = DASHBOARDS.list_dashboards().await;
pub async fn list_dashboards(req: HttpRequest) -> Result<impl Responder, DashboardError> {
let query_map = web::Query::<HashMap<String, String>>::from_query(req.query_string())
.map_err(|_| DashboardError::InvalidQueryParameter)?;
let mut dashboard_limit = 0;
if !query_map.is_empty() {
if let Some(limit) = query_map.get("limit") {
if let Ok(parsed_limit) = limit.parse::<usize>() {
dashboard_limit = parsed_limit;
} else {
return Err(DashboardError::Metadata("Invalid limit value"));
}
}
}
let dashboards = DASHBOARDS.list_dashboards(dashboard_limit).await;
let dashboard_summaries = dashboards
.iter()
.map(|dashboard| dashboard.to_summary())
Expand Down
59 changes: 47 additions & 12 deletions src/prism/home/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@ use serde::Serialize;
use tracing::error;

use crate::{
alerts::{get_alerts_info, AlertError, AlertsInfo, ALERTS},
alerts::{get_alerts_summary, AlertError, AlertsSummary, ALERTS},
correlation::{CorrelationError, CORRELATIONS},
event::format::LogSource,
handlers::http::{cluster::fetch_daily_stats, logstream::error::StreamError},
parseable::PARSEABLE,
rbac::{map::SessionKey, role::Action, Users},
stats::Stats,
storage::{ObjectStorageError, ObjectStoreFormat, StreamType, STREAM_ROOT_DIRECTORY},
users::{dashboards::DASHBOARDS, filters::FILTERS},
};
Expand All @@ -43,8 +44,8 @@ type StreamMetadataResponse = Result<(String, Vec<ObjectStoreFormat>, DataSetTyp
pub struct DatedStats {
date: String,
events: u64,
ingestion_size: u64,
storage_size: u64,
ingestion: u64,
storage: u64,
}

#[derive(Debug, Serialize)]
Expand All @@ -62,9 +63,10 @@ pub struct DataSet {

#[derive(Debug, Serialize)]
pub struct HomeResponse {
pub alerts_info: AlertsInfo,
pub alerts_summary: AlertsSummary,
pub stats_details: Vec<DatedStats>,
pub datasets: Vec<DataSet>,
pub top_five_ingestion: HashMap<String, Stats>,
}

#[derive(Debug, Serialize)]
Expand Down Expand Up @@ -93,11 +95,11 @@ pub async fn generate_home_response(
include_internal: bool,
) -> Result<HomeResponse, PrismHomeError> {
// Execute these operations concurrently
let (stream_titles_result, alerts_info_result) =
tokio::join!(get_stream_titles(key), get_alerts_info());
let (stream_titles_result, alerts_summary_result) =
tokio::join!(get_stream_titles(key), get_alerts_summary());

let stream_titles = stream_titles_result?;
let alerts_info = alerts_info_result?;
let alerts_summary = alerts_summary_result?;

// Generate dates for date-wise stats
let mut dates = (0..7)
Expand All @@ -117,7 +119,7 @@ pub async fn generate_home_response(
let stream_metadata_results: Vec<StreamMetadataResponse> =
futures::future::join_all(stream_metadata_futures).await;

let mut stream_wise_stream_json = HashMap::new();
let mut stream_wise_stream_json: HashMap<String, Vec<ObjectStoreFormat>> = HashMap::new();
let mut datasets = Vec::new();

for result in stream_metadata_results {
Expand All @@ -144,6 +146,8 @@ pub async fn generate_home_response(
}
}

let top_five_ingestion = get_top_5_streams_by_ingestion(&stream_wise_stream_json);

// Process stats for all dates concurrently
let stats_futures = dates
.iter()
Expand All @@ -168,9 +172,40 @@ pub async fn generate_home_response(
Ok(HomeResponse {
stats_details: stream_details,
datasets,
alerts_info,
alerts_summary,
top_five_ingestion,
})
}

fn get_top_5_streams_by_ingestion(
stream_wise_stream_json: &HashMap<String, Vec<ObjectStoreFormat>>,
) -> HashMap<String, Stats> {
let mut result: Vec<_> = stream_wise_stream_json
.iter()
.map(|(stream_name, formats)| {
let total_stats = formats.iter().fold(
Stats {
events: 0,
ingestion: 0,
storage: 0,
},
|mut acc, osf| {
let current = &osf.stats.current_stats;
acc.events += current.events;
acc.ingestion += current.ingestion;
acc.storage += current.storage;
acc
},
);
(stream_name.clone(), total_stats)
})
.collect();

result.sort_by_key(|(_, stats)| std::cmp::Reverse(stats.ingestion));
result.truncate(5);
result.into_iter().collect()
}

async fn get_stream_metadata(
stream: String,
) -> Result<(String, Vec<ObjectStoreFormat>, DataSetType), PrismHomeError> {
Expand Down Expand Up @@ -240,8 +275,8 @@ async fn stats_for_date(
match result {
Ok((events, ingestion, storage)) => {
details.events += events;
details.ingestion_size += ingestion;
details.storage_size += storage;
details.ingestion += ingestion;
details.storage += storage;
}
Err(e) => {
error!("Failed to get stats for stream: {:?}", e);
Expand Down Expand Up @@ -370,7 +405,7 @@ async fn get_correlation_titles(

async fn get_dashboard_titles(query_value: &str) -> Result<Vec<Resource>, PrismHomeError> {
let dashboard_titles = DASHBOARDS
.list_dashboards()
.list_dashboards(0)
.await
.iter()
.filter_map(|dashboard| {
Expand Down
16 changes: 14 additions & 2 deletions src/users/dashboards.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,20 @@ impl Dashboards {

/// List all dashboards
/// fetch all dashboards from memory
pub async fn list_dashboards(&self) -> Vec<Dashboard> {
self.0.read().await.clone()
pub async fn list_dashboards(&self, limit: usize) -> Vec<Dashboard> {
// limit the number of dashboards returned in order of modified date
// if limit is 0, return all dashboards
let dashboards = self.0.read().await;
let mut sorted_dashboards = dashboards
.iter()
.filter(|d| d.dashboard_id.is_some())
.cloned()
.collect::<Vec<Dashboard>>();
sorted_dashboards.sort_by_key(|d| std::cmp::Reverse(d.modified));
if limit > 0 {
sorted_dashboards.truncate(limit);
}
sorted_dashboards
}

/// List tags from all dashboards
Expand Down
Loading