Skip to content

Commit 7053832

Browse files
Merge branch 'main' into parallel-dataset
2 parents dc754d8 + 26e700d commit 7053832

File tree

3 files changed

+211
-106
lines changed

3 files changed

+211
-106
lines changed

src/correlation.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ impl Correlations {
5656
let store = PARSEABLE.storage.get_object_store();
5757
let all_correlations = store.get_all_correlations().await.unwrap_or_default();
5858

59+
let mut guard = self.write().await;
60+
5961
for correlations_bytes in all_correlations.values().flatten() {
6062
let correlation = match serde_json::from_slice::<CorrelationConfig>(correlations_bytes)
6163
{
@@ -66,9 +68,7 @@ impl Correlations {
6668
}
6769
};
6870

69-
self.write()
70-
.await
71-
.insert(correlation.id.to_owned(), correlation);
71+
guard.insert(correlation.id.to_owned(), correlation);
7272
}
7373

7474
Ok(())

src/prism/home/mod.rs

Lines changed: 199 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ use crate::{
4141
users::{dashboards::DASHBOARDS, filters::FILTERS},
4242
};
4343

44+
type StreamMetadataResponse = Result<(String, Vec<ObjectStoreFormat>, DataSetType), PrismHomeError>;
45+
4446
#[derive(Debug, Serialize, Default)]
4547
struct StreamInfo {
4648
// stream_count: u32,
@@ -89,7 +91,109 @@ pub struct HomeResponse {
8991
}
9092

9193
pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, PrismHomeError> {
92-
// get all stream titles
94+
// Execute these operations concurrently
95+
let (
96+
stream_titles_result,
97+
alert_titles_result,
98+
correlation_titles_result,
99+
dashboards_result,
100+
filters_result,
101+
alerts_info_result,
102+
) = tokio::join!(
103+
get_stream_titles(key),
104+
get_alert_titles(key),
105+
get_correlation_titles(key),
106+
get_dashboard_titles(key),
107+
get_filter_titles(key),
108+
get_alerts_info()
109+
);
110+
111+
let stream_titles = stream_titles_result?;
112+
let alert_titles = alert_titles_result?;
113+
let correlation_titles = correlation_titles_result?;
114+
let dashboard_titles = dashboards_result?;
115+
let filter_titles = filters_result?;
116+
let alerts_info = alerts_info_result?;
117+
118+
// Generate dates for date-wise stats
119+
let mut dates = (0..7)
120+
.map(|i| {
121+
Utc::now()
122+
.checked_sub_signed(chrono::Duration::days(i))
123+
.ok_or_else(|| anyhow::Error::msg("Date conversion failed"))
124+
.unwrap()
125+
})
126+
.map(|date| date.format("%Y-%m-%d").to_string())
127+
.collect_vec();
128+
dates.reverse();
129+
130+
// Process stream metadata concurrently
131+
let stream_metadata_futures = stream_titles
132+
.iter()
133+
.map(|stream| get_stream_metadata(stream.clone()));
134+
let stream_metadata_results: Vec<StreamMetadataResponse> =
135+
futures::future::join_all(stream_metadata_futures).await;
136+
137+
let mut stream_wise_stream_json = HashMap::new();
138+
let mut datasets = Vec::new();
139+
140+
for result in stream_metadata_results {
141+
match result {
142+
Ok((stream, metadata, dataset_type)) => {
143+
stream_wise_stream_json.insert(stream.clone(), metadata);
144+
datasets.push(DataSet {
145+
title: stream,
146+
dataset_type,
147+
});
148+
}
149+
Err(e) => {
150+
error!("Failed to process stream metadata: {:?}", e);
151+
// Continue with other streams instead of failing entirely
152+
}
153+
}
154+
}
155+
156+
// Process stats for all dates concurrently
157+
let stats_futures = dates
158+
.iter()
159+
.map(|date| stats_for_date(date.clone(), stream_wise_stream_json.clone()));
160+
let stats_results: Vec<Result<DatedStats, PrismHomeError>> =
161+
futures::future::join_all(stats_futures).await;
162+
163+
let mut stream_details = Vec::new();
164+
let mut summary = StreamInfo::default();
165+
166+
for result in stats_results {
167+
match result {
168+
Ok(dated_stats) => {
169+
summary.stats_summary.events += dated_stats.events;
170+
summary.stats_summary.ingestion += dated_stats.ingestion_size;
171+
summary.stats_summary.storage += dated_stats.storage_size;
172+
stream_details.push(dated_stats);
173+
}
174+
Err(e) => {
175+
error!("Failed to process stats for date: {:?}", e);
176+
// Continue with other dates instead of failing entirely
177+
}
178+
}
179+
}
180+
181+
Ok(HomeResponse {
182+
stream_info: summary,
183+
stats_details: stream_details,
184+
stream_titles,
185+
datasets,
186+
alert_titles,
187+
correlation_titles,
188+
dashboard_titles,
189+
filter_titles,
190+
alerts_info,
191+
})
192+
}
193+
194+
// Helper functions to split the work
195+
196+
async fn get_stream_titles(key: &SessionKey) -> Result<Vec<String>, PrismHomeError> {
93197
let stream_titles: Vec<String> = PARSEABLE
94198
.storage
95199
.get_object_store()
@@ -104,8 +208,10 @@ pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, Pr
104208
.sorted()
105209
.collect_vec();
106210

107-
// get all alert IDs (TODO: RBAC)
108-
// do we need to move alerts into the PARSEABLE struct?
211+
Ok(stream_titles)
212+
}
213+
214+
async fn get_alert_titles(key: &SessionKey) -> Result<Vec<TitleAndId>, PrismHomeError> {
109215
let alert_titles = ALERTS
110216
.list_alerts_for_user(key.clone())
111217
.await?
@@ -116,7 +222,10 @@ pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, Pr
116222
})
117223
.collect_vec();
118224

119-
// get correlation IDs
225+
Ok(alert_titles)
226+
}
227+
228+
async fn get_correlation_titles(key: &SessionKey) -> Result<Vec<TitleAndId>, PrismHomeError> {
120229
let correlation_titles = CORRELATIONS
121230
.list_correlations(key)
122231
.await?
@@ -127,7 +236,10 @@ pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, Pr
127236
})
128237
.collect_vec();
129238

130-
// get dashboard IDs
239+
Ok(correlation_titles)
240+
}
241+
242+
async fn get_dashboard_titles(key: &SessionKey) -> Result<Vec<TitleAndId>, PrismHomeError> {
131243
let dashboard_titles = DASHBOARDS
132244
.list_dashboards(key)
133245
.await
@@ -143,7 +255,10 @@ pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, Pr
143255
})
144256
.collect_vec();
145257

146-
// get filter IDs
258+
Ok(dashboard_titles)
259+
}
260+
261+
async fn get_filter_titles(key: &SessionKey) -> Result<Vec<TitleAndId>, PrismHomeError> {
147262
let filter_titles = FILTERS
148263
.list_filters(key)
149264
.await
@@ -159,117 +274,106 @@ pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, Pr
159274
})
160275
.collect_vec();
161276

162-
// get alerts info (distribution of alerts based on severity and state)
163-
let alerts_info = get_alerts_info().await?;
164-
165-
// generate dates for date-wise stats
166-
let mut dates = (0..7)
167-
.map(|i| {
168-
Utc::now()
169-
.checked_sub_signed(chrono::Duration::days(i))
170-
.ok_or_else(|| anyhow::Error::msg("Date conversion faield"))
171-
.unwrap()
172-
})
173-
.map(|date| date.format("%Y-%m-%d").to_string())
174-
.collect_vec();
175-
dates.reverse();
176-
177-
let mut stream_details = Vec::new();
178-
let mut datasets = Vec::new();
179-
// this will hold the summary of all streams for the last 7 days
180-
let mut summary = StreamInfo::default();
181-
182-
let mut stream_wise_stream_json = HashMap::new();
183-
for stream in stream_titles.clone() {
184-
let path = RelativePathBuf::from_iter([&stream, STREAM_ROOT_DIRECTORY]);
185-
let obs = PARSEABLE
186-
.storage
187-
.get_object_store()
188-
.get_objects(
189-
Some(&path),
190-
Box::new(|file_name| file_name.ends_with("stream.json")),
191-
)
192-
.await?;
193-
194-
let mut stream_jsons = Vec::new();
195-
for ob in obs {
196-
let stream_metadata: ObjectStoreFormat = match serde_json::from_slice(&ob) {
197-
Ok(d) => d,
198-
Err(e) => {
199-
error!("Failed to parse stream metadata: {:?}", e);
200-
continue;
201-
}
202-
};
203-
stream_jsons.push(stream_metadata);
204-
}
205-
stream_wise_stream_json.insert(stream.clone(), stream_jsons.clone());
206-
207-
let log_source = PARSEABLE
208-
.get_stream(&stream)
209-
.map_err(|e| PrismHomeError::Anyhow(e.into()))?
210-
.get_log_source();
211-
212-
// if log_source_format is otel-metrics, set DataSetType to metrics
213-
//if log_source_format is otel-traces, set DataSetType to traces
214-
//else set DataSetType to logs
215-
216-
let dataset_type = match log_source[0].log_source_format {
217-
LogSource::OtelMetrics => DataSetType::Metrics,
218-
LogSource::OtelTraces => DataSetType::Traces,
219-
_ => DataSetType::Logs,
220-
};
277+
Ok(filter_titles)
278+
}
221279

222-
let dataset = DataSet {
223-
title: stream.clone(),
224-
dataset_type,
280+
async fn get_stream_metadata(
281+
stream: String,
282+
) -> Result<(String, Vec<ObjectStoreFormat>, DataSetType), PrismHomeError> {
283+
let path = RelativePathBuf::from_iter([&stream, STREAM_ROOT_DIRECTORY]);
284+
let obs = PARSEABLE
285+
.storage
286+
.get_object_store()
287+
.get_objects(
288+
Some(&path),
289+
Box::new(|file_name| file_name.ends_with("stream.json")),
290+
)
291+
.await?;
292+
293+
let mut stream_jsons = Vec::new();
294+
for ob in obs {
295+
let stream_metadata: ObjectStoreFormat = match serde_json::from_slice(&ob) {
296+
Ok(d) => d,
297+
Err(e) => {
298+
error!("Failed to parse stream metadata: {:?}", e);
299+
continue;
300+
}
225301
};
226-
datasets.push(dataset);
302+
stream_jsons.push(stream_metadata);
227303
}
228304

229-
for date in dates.into_iter() {
230-
let dated_stats = stats_for_date(date, stream_wise_stream_json.clone()).await?;
231-
summary.stats_summary.events += dated_stats.events;
232-
summary.stats_summary.ingestion += dated_stats.ingestion_size;
233-
summary.stats_summary.storage += dated_stats.storage_size;
234-
235-
stream_details.push(dated_stats);
305+
if stream_jsons.is_empty() {
306+
return Err(PrismHomeError::Anyhow(anyhow::Error::msg(
307+
"No stream metadata found",
308+
)));
236309
}
237310

238-
Ok(HomeResponse {
239-
stream_info: summary,
240-
stats_details: stream_details,
241-
stream_titles,
242-
datasets,
243-
alert_titles,
244-
correlation_titles,
245-
dashboard_titles,
246-
filter_titles,
247-
alerts_info,
248-
})
311+
// let log_source = &stream_jsons[0].clone().log_source;
312+
let log_source_format = stream_jsons
313+
.iter()
314+
.find(|sj| !sj.log_source.is_empty())
315+
.map(|sj| sj.log_source[0].log_source_format.clone())
316+
.unwrap_or_default();
317+
318+
let dataset_type = match log_source_format {
319+
LogSource::OtelMetrics => DataSetType::Metrics,
320+
LogSource::OtelTraces => DataSetType::Traces,
321+
_ => DataSetType::Logs,
322+
};
323+
324+
Ok((stream, stream_jsons, dataset_type))
249325
}
250326

251327
async fn stats_for_date(
252328
date: String,
253329
stream_wise_meta: HashMap<String, Vec<ObjectStoreFormat>>,
254330
) -> Result<DatedStats, PrismHomeError> {
255-
// collect stats for all the streams for the given date
331+
// Initialize result structure
256332
let mut details = DatedStats {
257333
date: date.clone(),
258334
..Default::default()
259335
};
260336

261-
for (stream, meta) in stream_wise_meta {
262-
let querier_stats = get_stats_date(&stream, &date).await?;
263-
let ingestor_stats = fetch_daily_stats_from_ingestors(&date, &meta)?;
264-
// collect date-wise stats for all streams
265-
details.events += querier_stats.events + ingestor_stats.events;
266-
details.ingestion_size += querier_stats.ingestion + ingestor_stats.ingestion;
267-
details.storage_size += querier_stats.storage + ingestor_stats.storage;
337+
// Process each stream concurrently
338+
let stream_stats_futures = stream_wise_meta.iter().map(|(stream, meta)| {
339+
get_stream_stats_for_date(stream.clone(), date.clone(), meta.clone())
340+
});
341+
342+
let stream_stats_results = futures::future::join_all(stream_stats_futures).await;
343+
344+
// Aggregate results
345+
for result in stream_stats_results {
346+
match result {
347+
Ok((events, ingestion, storage)) => {
348+
details.events += events;
349+
details.ingestion_size += ingestion;
350+
details.storage_size += storage;
351+
}
352+
Err(e) => {
353+
error!("Failed to get stats for stream: {:?}", e);
354+
// Continue with other streams
355+
}
356+
}
268357
}
269358

270359
Ok(details)
271360
}
272361

362+
async fn get_stream_stats_for_date(
363+
stream: String,
364+
date: String,
365+
meta: Vec<ObjectStoreFormat>,
366+
) -> Result<(u64, u64, u64), PrismHomeError> {
367+
let querier_stats = get_stats_date(&stream, &date).await?;
368+
let ingestor_stats = fetch_daily_stats_from_ingestors(&date, &meta)?;
369+
370+
Ok((
371+
querier_stats.events + ingestor_stats.events,
372+
querier_stats.ingestion + ingestor_stats.ingestion,
373+
querier_stats.storage + ingestor_stats.storage,
374+
))
375+
}
376+
273377
#[derive(Debug, thiserror::Error)]
274378
pub enum PrismHomeError {
275379
#[error("Error: {0}")]

0 commit comments

Comments
 (0)