@@ -29,6 +29,7 @@ use tracing::error;
29
29
use crate :: {
30
30
alerts:: { get_alerts_info, AlertError , AlertsInfo , ALERTS } ,
31
31
correlation:: { CorrelationError , CORRELATIONS } ,
32
+ event:: format:: LogSource ,
32
33
handlers:: http:: {
33
34
cluster:: fetch_daily_stats_from_ingestors,
34
35
logstream:: { error:: StreamError , get_stats_date} ,
@@ -61,6 +62,19 @@ struct TitleAndId {
61
62
id : String ,
62
63
}
63
64
65
+ #[ derive( Debug , Serialize ) ]
66
+ enum DataSetType {
67
+ Logs ,
68
+ Metrics ,
69
+ Traces ,
70
+ }
71
+
72
+ #[ derive( Debug , Serialize ) ]
73
+ struct DataSet {
74
+ title : String ,
75
+ dataset_type : DataSetType ,
76
+ }
77
+
64
78
#[ derive( Debug , Serialize ) ]
65
79
pub struct HomeResponse {
66
80
alert_titles : Vec < TitleAndId > ,
@@ -69,7 +83,7 @@ pub struct HomeResponse {
69
83
stream_info : StreamInfo ,
70
84
stats_details : Vec < DatedStats > ,
71
85
stream_titles : Vec < String > ,
72
-
86
+ datasets : Vec < DataSet > ,
73
87
dashboard_titles : Vec < TitleAndId > ,
74
88
filter_titles : Vec < TitleAndId > ,
75
89
}
@@ -161,25 +175,23 @@ pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, Pr
161
175
dates. reverse ( ) ;
162
176
163
177
let mut stream_details = Vec :: new ( ) ;
164
-
178
+ let mut datasets = Vec :: new ( ) ;
165
179
// this will hold the summary of all streams for the last 7 days
166
180
let mut summary = StreamInfo :: default ( ) ;
167
181
168
- let mut stream_wise_ingestor_stream_json = HashMap :: new ( ) ;
182
+ let mut stream_wise_stream_json = HashMap :: new ( ) ;
169
183
for stream in stream_titles. clone ( ) {
170
184
let path = RelativePathBuf :: from_iter ( [ & stream, STREAM_ROOT_DIRECTORY ] ) ;
171
185
let obs = PARSEABLE
172
186
. storage
173
187
. get_object_store ( )
174
188
. get_objects (
175
189
Some ( & path) ,
176
- Box :: new ( |file_name| {
177
- file_name. starts_with ( ".ingestor" ) && file_name. ends_with ( "stream.json" )
178
- } ) ,
190
+ Box :: new ( |file_name| file_name. ends_with ( "stream.json" ) ) ,
179
191
)
180
192
. await ?;
181
193
182
- let mut ingestor_stream_jsons = Vec :: new ( ) ;
194
+ let mut stream_jsons = Vec :: new ( ) ;
183
195
for ob in obs {
184
196
let stream_metadata: ObjectStoreFormat = match serde_json:: from_slice ( & ob) {
185
197
Ok ( d) => d,
@@ -188,13 +200,31 @@ pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, Pr
188
200
continue ;
189
201
}
190
202
} ;
191
- ingestor_stream_jsons . push ( stream_metadata) ;
203
+ stream_jsons . push ( stream_metadata) ;
192
204
}
193
- stream_wise_ingestor_stream_json. insert ( stream, ingestor_stream_jsons) ;
205
+ stream_wise_stream_json. insert ( stream. clone ( ) , stream_jsons. clone ( ) ) ;
206
+
207
+ let log_source = & stream_jsons[ 0 ] . clone ( ) . log_source ;
208
+
209
+ // if log_source_format is otel-metrics, set DataSetType to metrics
210
+ //if log_source_format is otel-traces, set DataSetType to traces
211
+ //else set DataSetType to logs
212
+
213
+ let dataset_type = match log_source[ 0 ] . log_source_format {
214
+ LogSource :: OtelMetrics => DataSetType :: Metrics ,
215
+ LogSource :: OtelTraces => DataSetType :: Traces ,
216
+ _ => DataSetType :: Logs ,
217
+ } ;
218
+
219
+ let dataset = DataSet {
220
+ title : stream. clone ( ) ,
221
+ dataset_type,
222
+ } ;
223
+ datasets. push ( dataset) ;
194
224
}
195
225
196
226
for date in dates. into_iter ( ) {
197
- let dated_stats = stats_for_date ( date, stream_wise_ingestor_stream_json . clone ( ) ) . await ?;
227
+ let dated_stats = stats_for_date ( date, stream_wise_stream_json . clone ( ) ) . await ?;
198
228
summary. stats_summary . events += dated_stats. events ;
199
229
summary. stats_summary . ingestion += dated_stats. ingestion_size ;
200
230
summary. stats_summary . storage += dated_stats. storage_size ;
@@ -205,7 +235,8 @@ pub async fn generate_home_response(key: &SessionKey) -> Result<HomeResponse, Pr
205
235
Ok ( HomeResponse {
206
236
stream_info : summary,
207
237
stats_details : stream_details,
208
- stream_titles : stream_titles. clone ( ) ,
238
+ stream_titles,
239
+ datasets,
209
240
alert_titles,
210
241
correlation_titles,
211
242
dashboard_titles,
0 commit comments