Skip to content

updates for prism home api #1275

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/correlation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ impl Correlations {
let store = PARSEABLE.storage.get_object_store();
let all_correlations = store.get_all_correlations().await.unwrap_or_default();

let mut guard = self.write().await;

for correlations_bytes in all_correlations.values().flatten() {
let correlation = match serde_json::from_slice::<CorrelationConfig>(correlations_bytes)
{
Expand All @@ -66,9 +68,7 @@ impl Correlations {
}
};

self.write()
.await
.insert(correlation.id.to_owned(), correlation);
guard.insert(correlation.id.to_owned(), correlation);
}

Ok(())
Expand Down
294 changes: 199 additions & 95 deletions src/prism/home/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ use crate::{
users::{dashboards::DASHBOARDS, filters::FILTERS},
};

type StreamMetadataResponse = Result<(String, Vec<ObjectStoreFormat>, DataSetType), PrismHomeError>;

#[derive(Debug, Serialize, Default)]
struct StreamInfo {
// stream_count: u32,
Expand Down Expand Up @@ -89,7 +91,109 @@ pub struct HomeResponse {
}

/// Builds the Prism home payload for the given session: titles of every
/// stream/alert/correlation/dashboard/filter visible to the user, alert
/// distribution info, per-dataset type metadata, and day-wise
/// ingestion/storage stats for the trailing 7 days.
///
/// Individual stream-metadata or per-date stat failures are logged and
/// skipped so one bad stream/date does not fail the whole home page.
pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, PrismHomeError> {
    // Fetch all independent resource listings concurrently.
    let (
        stream_titles_result,
        alert_titles_result,
        correlation_titles_result,
        dashboards_result,
        filters_result,
        alerts_info_result,
    ) = tokio::join!(
        get_stream_titles(key),
        get_alert_titles(key),
        get_correlation_titles(key),
        get_dashboard_titles(key),
        get_filter_titles(key),
        get_alerts_info()
    );

    let stream_titles = stream_titles_result?;
    let alert_titles = alert_titles_result?;
    let correlation_titles = correlation_titles_result?;
    let dashboard_titles = dashboards_result?;
    let filter_titles = filters_result?;
    let alerts_info = alerts_info_result?;

    // Last 7 calendar days formatted as YYYY-MM-DD, oldest first.
    // NOTE: subtracting at most 6 days from `Utc::now()` stays well inside
    // chrono's representable range, so this cannot fail in practice.
    let mut dates = (0..7)
        .map(|i| {
            Utc::now()
                .checked_sub_signed(chrono::Duration::days(i))
                .expect("subtracting up to 6 days from now is within chrono's range")
        })
        .map(|date| date.format("%Y-%m-%d").to_string())
        .collect_vec();
    dates.reverse();

    // Load every stream's manifests concurrently.
    let stream_metadata_futures = stream_titles
        .iter()
        .map(|stream| get_stream_metadata(stream.clone()));
    let stream_metadata_results: Vec<StreamMetadataResponse> =
        futures::future::join_all(stream_metadata_futures).await;

    let mut stream_wise_stream_json = HashMap::new();
    let mut datasets = Vec::new();

    for result in stream_metadata_results {
        match result {
            Ok((stream, metadata, dataset_type)) => {
                stream_wise_stream_json.insert(stream.clone(), metadata);
                datasets.push(DataSet {
                    title: stream,
                    dataset_type,
                });
            }
            Err(e) => {
                error!("Failed to process stream metadata: {:?}", e);
                // Continue with other streams instead of failing entirely
            }
        }
    }

    // Compute stats for all 7 dates concurrently. Each date gets its own
    // copy of the metadata map because `stats_for_date` takes it by value.
    let stats_futures = dates
        .iter()
        .map(|date| stats_for_date(date.clone(), stream_wise_stream_json.clone()));
    let stats_results: Vec<Result<DatedStats, PrismHomeError>> =
        futures::future::join_all(stats_futures).await;

    let mut stream_details = Vec::new();
    // Running totals across all 7 days.
    let mut summary = StreamInfo::default();

    for result in stats_results {
        match result {
            Ok(dated_stats) => {
                summary.stats_summary.events += dated_stats.events;
                summary.stats_summary.ingestion += dated_stats.ingestion_size;
                summary.stats_summary.storage += dated_stats.storage_size;
                stream_details.push(dated_stats);
            }
            Err(e) => {
                error!("Failed to process stats for date: {:?}", e);
                // Continue with other dates instead of failing entirely
            }
        }
    }

    Ok(HomeResponse {
        stream_info: summary,
        stats_details: stream_details,
        stream_titles,
        datasets,
        alert_titles,
        correlation_titles,
        dashboard_titles,
        filter_titles,
        alerts_info,
    })
}

// Helper functions to split the work

async fn get_stream_titles(key: &SessionKey) -> Result<Vec<String>, PrismHomeError> {
let stream_titles: Vec<String> = PARSEABLE
.storage
.get_object_store()
Expand All @@ -104,8 +208,10 @@ pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, Pr
.sorted()
.collect_vec();

// get all alert IDs (TODO: RBAC)
// do we need to move alerts into the PARSEABLE struct?
Ok(stream_titles)
}

async fn get_alert_titles(key: &SessionKey) -> Result<Vec<TitleAndId>, PrismHomeError> {
let alert_titles = ALERTS
.list_alerts_for_user(key.clone())
.await?
Expand All @@ -116,7 +222,10 @@ pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, Pr
})
.collect_vec();

// get correlation IDs
Ok(alert_titles)
}

async fn get_correlation_titles(key: &SessionKey) -> Result<Vec<TitleAndId>, PrismHomeError> {
let correlation_titles = CORRELATIONS
.list_correlations(key)
.await?
Expand All @@ -127,7 +236,10 @@ pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, Pr
})
.collect_vec();

// get dashboard IDs
Ok(correlation_titles)
}

async fn get_dashboard_titles(key: &SessionKey) -> Result<Vec<TitleAndId>, PrismHomeError> {
let dashboard_titles = DASHBOARDS
.list_dashboards(key)
.await
Expand All @@ -143,7 +255,10 @@ pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, Pr
})
.collect_vec();

// get filter IDs
Ok(dashboard_titles)
}

async fn get_filter_titles(key: &SessionKey) -> Result<Vec<TitleAndId>, PrismHomeError> {
let filter_titles = FILTERS
.list_filters(key)
.await
Expand All @@ -159,117 +274,106 @@ pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, Pr
})
.collect_vec();

// get alerts info (distribution of alerts based on severity and state)
let alerts_info = get_alerts_info().await?;

// generate dates for date-wise stats
let mut dates = (0..7)
.map(|i| {
Utc::now()
.checked_sub_signed(chrono::Duration::days(i))
.ok_or_else(|| anyhow::Error::msg("Date conversion faield"))
.unwrap()
})
.map(|date| date.format("%Y-%m-%d").to_string())
.collect_vec();
dates.reverse();

let mut stream_details = Vec::new();
let mut datasets = Vec::new();
// this will hold the summary of all streams for the last 7 days
let mut summary = StreamInfo::default();

let mut stream_wise_stream_json = HashMap::new();
for stream in stream_titles.clone() {
let path = RelativePathBuf::from_iter([&stream, STREAM_ROOT_DIRECTORY]);
let obs = PARSEABLE
.storage
.get_object_store()
.get_objects(
Some(&path),
Box::new(|file_name| file_name.ends_with("stream.json")),
)
.await?;

let mut stream_jsons = Vec::new();
for ob in obs {
let stream_metadata: ObjectStoreFormat = match serde_json::from_slice(&ob) {
Ok(d) => d,
Err(e) => {
error!("Failed to parse stream metadata: {:?}", e);
continue;
}
};
stream_jsons.push(stream_metadata);
}
stream_wise_stream_json.insert(stream.clone(), stream_jsons.clone());

let log_source = PARSEABLE
.get_stream(&stream)
.map_err(|e| PrismHomeError::Anyhow(e.into()))?
.get_log_source();

// if log_source_format is otel-metrics, set DataSetType to metrics
//if log_source_format is otel-traces, set DataSetType to traces
//else set DataSetType to logs

let dataset_type = match log_source[0].log_source_format {
LogSource::OtelMetrics => DataSetType::Metrics,
LogSource::OtelTraces => DataSetType::Traces,
_ => DataSetType::Logs,
};
Ok(filter_titles)
}

let dataset = DataSet {
title: stream.clone(),
dataset_type,
async fn get_stream_metadata(
stream: String,
) -> Result<(String, Vec<ObjectStoreFormat>, DataSetType), PrismHomeError> {
let path = RelativePathBuf::from_iter([&stream, STREAM_ROOT_DIRECTORY]);
let obs = PARSEABLE
.storage
.get_object_store()
.get_objects(
Some(&path),
Box::new(|file_name| file_name.ends_with("stream.json")),
)
.await?;

let mut stream_jsons = Vec::new();
for ob in obs {
let stream_metadata: ObjectStoreFormat = match serde_json::from_slice(&ob) {
Ok(d) => d,
Err(e) => {
error!("Failed to parse stream metadata: {:?}", e);
continue;
}
};
datasets.push(dataset);
stream_jsons.push(stream_metadata);
}

for date in dates.into_iter() {
let dated_stats = stats_for_date(date, stream_wise_stream_json.clone()).await?;
summary.stats_summary.events += dated_stats.events;
summary.stats_summary.ingestion += dated_stats.ingestion_size;
summary.stats_summary.storage += dated_stats.storage_size;

stream_details.push(dated_stats);
if stream_jsons.is_empty() {
return Err(PrismHomeError::Anyhow(anyhow::Error::msg(
"No stream metadata found",
)));
}

Ok(HomeResponse {
stream_info: summary,
stats_details: stream_details,
stream_titles,
datasets,
alert_titles,
correlation_titles,
dashboard_titles,
filter_titles,
alerts_info,
})
// let log_source = &stream_jsons[0].clone().log_source;
let log_source_format = stream_jsons
.iter()
.find(|sj| !sj.log_source.is_empty())
.map(|sj| sj.log_source[0].log_source_format.clone())
.unwrap_or_default();

let dataset_type = match log_source_format {
LogSource::OtelMetrics => DataSetType::Metrics,
LogSource::OtelTraces => DataSetType::Traces,
_ => DataSetType::Logs,
};

Ok((stream, stream_jsons, dataset_type))
}

async fn stats_for_date(
date: String,
stream_wise_meta: HashMap<String, Vec<ObjectStoreFormat>>,
) -> Result<DatedStats, PrismHomeError> {
// collect stats for all the streams for the given date
// Initialize result structure
let mut details = DatedStats {
date: date.clone(),
..Default::default()
};

for (stream, meta) in stream_wise_meta {
let querier_stats = get_stats_date(&stream, &date).await?;
let ingestor_stats = fetch_daily_stats_from_ingestors(&date, &meta)?;
// collect date-wise stats for all streams
details.events += querier_stats.events + ingestor_stats.events;
details.ingestion_size += querier_stats.ingestion + ingestor_stats.ingestion;
details.storage_size += querier_stats.storage + ingestor_stats.storage;
// Process each stream concurrently
let stream_stats_futures = stream_wise_meta.iter().map(|(stream, meta)| {
get_stream_stats_for_date(stream.clone(), date.clone(), meta.clone())
});

let stream_stats_results = futures::future::join_all(stream_stats_futures).await;

// Aggregate results
for result in stream_stats_results {
match result {
Ok((events, ingestion, storage)) => {
details.events += events;
details.ingestion_size += ingestion;
details.storage_size += storage;
}
Err(e) => {
error!("Failed to get stats for stream: {:?}", e);
// Continue with other streams
}
}
}

Ok(details)
}

/// Combines querier-side and ingestor-side daily stats for one stream on
/// one date, returning `(events, ingestion_bytes, storage_bytes)` totals.
async fn get_stream_stats_for_date(
    stream: String,
    date: String,
    meta: Vec<ObjectStoreFormat>,
) -> Result<(u64, u64, u64), PrismHomeError> {
    let from_querier = get_stats_date(&stream, &date).await?;
    let from_ingestors = fetch_daily_stats_from_ingestors(&date, &meta)?;

    let events = from_querier.events + from_ingestors.events;
    let ingestion = from_querier.ingestion + from_ingestors.ingestion;
    let storage = from_querier.storage + from_ingestors.storage;

    Ok((events, ingestion, storage))
}

#[derive(Debug, thiserror::Error)]
pub enum PrismHomeError {
#[error("Error: {0}")]
Expand Down
Loading
Loading