Skip to content

Commit 34b149b

Browse files
update: add top 5 ingested streams in home api
1 parent c4533be commit 34b149b

File tree

1 file changed

+44
-1
lines changed

1 file changed

+44
-1
lines changed

src/prism/home/mod.rs

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,21 @@ use crate::{
3333
handlers::http::{cluster::fetch_daily_stats, logstream::error::StreamError},
3434
parseable::PARSEABLE,
3535
rbac::{map::SessionKey, role::Action, Users},
36+
stats::Stats,
3637
storage::{ObjectStorageError, ObjectStoreFormat, StreamType, STREAM_ROOT_DIRECTORY},
3738
users::{dashboards::DASHBOARDS, filters::FILTERS},
3839
};
3940

4041
type StreamMetadataResponse = Result<(String, Vec<ObjectStoreFormat>, DataSetType), PrismHomeError>;
4142

43+
#[derive(Debug, Serialize, Default)]
44+
pub struct DatasetStats {
45+
dataset_name: String,
46+
events: u64,
47+
ingestion_size: u64,
48+
storage_size: u64,
49+
}
50+
4251
#[derive(Debug, Serialize, Default)]
4352
pub struct DatedStats {
4453
date: String,
@@ -65,6 +74,7 @@ pub struct HomeResponse {
6574
pub alerts_info: AlertsInfo,
6675
pub stats_details: Vec<DatedStats>,
6776
pub datasets: Vec<DataSet>,
77+
pub top_five_ingestion: Vec<(String, Stats)>,
6878
}
6979

7080
#[derive(Debug, Serialize)]
@@ -117,7 +127,7 @@ pub async fn generate_home_response(
117127
let stream_metadata_results: Vec<StreamMetadataResponse> =
118128
futures::future::join_all(stream_metadata_futures).await;
119129

120-
let mut stream_wise_stream_json = HashMap::new();
130+
let mut stream_wise_stream_json: HashMap<String, Vec<ObjectStoreFormat>> = HashMap::new();
121131
let mut datasets = Vec::new();
122132

123133
for result in stream_metadata_results {
@@ -144,6 +154,8 @@ pub async fn generate_home_response(
144154
}
145155
}
146156

157+
let top_five_ingestion = get_top_5_streams_by_ingestion(&stream_wise_stream_json);
158+
147159
// Process stats for all dates concurrently
148160
let stats_futures = dates
149161
.iter()
@@ -169,8 +181,39 @@ pub async fn generate_home_response(
169181
stats_details: stream_details,
170182
datasets,
171183
alerts_info,
184+
top_five_ingestion,
172185
})
173186
}
187+
188+
fn get_top_5_streams_by_ingestion(
189+
stream_wise_stream_json: &HashMap<String, Vec<ObjectStoreFormat>>,
190+
) -> Vec<(String, Stats)> {
191+
let mut result: Vec<_> = stream_wise_stream_json
192+
.iter()
193+
.map(|(stream_name, formats)| {
194+
let total_stats = formats.iter().fold(
195+
Stats {
196+
events: 0,
197+
ingestion: 0,
198+
storage: 0,
199+
},
200+
|mut acc, osf| {
201+
let current = &osf.stats.current_stats;
202+
acc.events += current.events;
203+
acc.ingestion += current.ingestion;
204+
acc.storage += current.storage;
205+
acc
206+
},
207+
);
208+
(stream_name.clone(), total_stats)
209+
})
210+
.collect();
211+
212+
result.sort_by_key(|(_, stats)| std::cmp::Reverse(stats.ingestion));
213+
result.truncate(5);
214+
result
215+
}
216+
174217
async fn get_stream_metadata(
175218
stream: String,
176219
) -> Result<(String, Vec<ObjectStoreFormat>, DataSetType), PrismHomeError> {

0 commit comments

Comments
 (0)