Skip to content

Commit 5855693

Browse files
fix: parallelise datasets API
process each stream in parallel process distinct counts in parallel
1 parent 9cd3809 commit 5855693

File tree

1 file changed

+123
-65
lines changed

1 file changed

+123
-65
lines changed

src/prism/logstream/mod.rs

Lines changed: 123 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -246,79 +246,137 @@ impl PrismDatasetRequest {
246246
mut self,
247247
key: SessionKey,
248248
) -> Result<Vec<PrismDatasetResponse>, PrismLogstreamError> {
249-
let is_empty = self.streams.is_empty();
250-
if is_empty {
249+
if self.streams.is_empty() {
251250
self.streams = PARSEABLE.streams.list();
252251
}
253252

254-
let mut responses = vec![];
255-
for stream in self.streams.iter() {
256-
if Users.authorize(key.clone(), Action::ListStream, Some(stream), None)
257-
!= crate::rbac::Response::Authorized
258-
{
259-
// Don't warn if listed from Parseable
260-
if !is_empty {
261-
warn!("Unauthorized access requested for stream: {stream}");
262-
}
263-
continue;
253+
// Process streams concurrently
254+
let results = futures::future::join_all(
255+
self.streams
256+
.iter()
257+
.map(|stream| self.process_stream(stream.clone(), key.clone())),
258+
)
259+
.await;
260+
261+
// Collect successful responses and handle errors
262+
let mut responses = Vec::new();
263+
for result in results {
264+
match result {
265+
Some(Ok(response)) => responses.push(response),
266+
Some(Err(err)) => return Err(err),
267+
None => {} // Skip unauthorized or not found streams
264268
}
269+
}
265270

266-
if PARSEABLE.check_or_load_stream(stream).await {
267-
debug!("Stream not found: {stream}");
268-
continue;
269-
}
271+
Ok(responses)
272+
}
270273

271-
let PrismLogstreamInfo {
272-
info,
273-
schema,
274-
stats,
275-
retention,
276-
} = get_prism_logstream_info(stream).await?;
277-
278-
let hottier = match HotTierManager::global() {
279-
Some(manager) => match manager.get_hot_tier(stream).await {
280-
Ok(stats) => Some(stats),
281-
Err(HotTierError::HotTierValidationError(
282-
HotTierValidationError::NotFound(_),
283-
)) => None,
284-
Err(err) => return Err(err.into()),
285-
},
286-
_ => None,
287-
};
288-
let records = CountsRequest {
289-
stream: stream.clone(),
290-
start_time: "1h".to_owned(),
291-
end_time: "now".to_owned(),
292-
num_bins: 10,
293-
}
294-
.get_bin_density()
295-
.await?;
296-
let counts = CountsResponse {
297-
fields: vec!["start_time".into(), "end_time".into(), "count".into()],
298-
records,
299-
};
300-
301-
// Retrieve distinct values for source identifiers
302-
// Returns None if fields aren't present or if query fails
303-
let ips = self.get_distinct_entries(stream, "p_src_ip").await.ok();
304-
let user_agents = self.get_distinct_entries(stream, "p_user_agent").await.ok();
305-
306-
responses.push(PrismDatasetResponse {
307-
stream: stream.clone(),
308-
info,
309-
schema,
310-
stats,
311-
retention,
312-
hottier,
313-
counts,
314-
distinct_sources: json!({
315-
"ips": ips,
316-
"user_agents": user_agents
317-
}),
318-
})
274+
async fn process_stream(
275+
&self,
276+
stream: String,
277+
key: SessionKey,
278+
) -> Option<Result<PrismDatasetResponse, PrismLogstreamError>> {
279+
// Skip unauthorized streams
280+
if !self.is_authorized(&stream, &key) {
281+
return None;
319282
}
320283

321-
Ok(responses)
284+
// Skip streams that don't exist
285+
if !self.stream_exists(&stream).await {
286+
return None;
287+
}
288+
289+
// Process stream data
290+
match get_prism_logstream_info(&stream).await {
291+
Ok(info) => Some(self.build_dataset_response(stream, info).await),
292+
Err(err) => Some(Err(err)),
293+
}
294+
}
295+
296+
fn is_authorized(&self, stream: &str, key: &SessionKey) -> bool {
297+
if Users.authorize(key.clone(), Action::ListStream, Some(stream), None)
298+
!= crate::rbac::Response::Authorized
299+
{
300+
warn!("Unauthorized access requested for stream: {stream}");
301+
false
302+
} else {
303+
true
304+
}
305+
}
306+
307+
async fn stream_exists(&self, stream: &str) -> bool {
308+
if PARSEABLE.check_or_load_stream(stream).await {
309+
debug!("Stream not found: {stream}");
310+
false
311+
} else {
312+
true
313+
}
314+
}
315+
316+
async fn build_dataset_response(
317+
&self,
318+
stream: String,
319+
info: PrismLogstreamInfo,
320+
) -> Result<PrismDatasetResponse, PrismLogstreamError> {
321+
// Get hot tier info
322+
let hottier = self.get_hot_tier_info(&stream).await?;
323+
324+
// Get counts
325+
let counts = self.get_counts(&stream).await?;
326+
327+
// Get distinct entries concurrently
328+
let (ips_result, user_agents_result) = futures::join!(
329+
self.get_distinct_entries(&stream, "p_src_ip"),
330+
self.get_distinct_entries(&stream, "p_user_agent")
331+
);
332+
333+
let ips = ips_result.ok();
334+
let user_agents = user_agents_result.ok();
335+
336+
Ok(PrismDatasetResponse {
337+
stream,
338+
info: info.info,
339+
schema: info.schema,
340+
stats: info.stats,
341+
retention: info.retention,
342+
hottier,
343+
counts,
344+
distinct_sources: json!({
345+
"ips": ips,
346+
"user_agents": user_agents
347+
}),
348+
})
349+
}
350+
351+
async fn get_hot_tier_info(
352+
&self,
353+
stream: &str,
354+
) -> Result<Option<StreamHotTier>, PrismLogstreamError> {
355+
match HotTierManager::global() {
356+
Some(manager) => match manager.get_hot_tier(stream).await {
357+
Ok(stats) => Ok(Some(stats)),
358+
Err(HotTierError::HotTierValidationError(HotTierValidationError::NotFound(_))) => {
359+
Ok(None)
360+
}
361+
Err(err) => Err(err.into()),
362+
},
363+
None => Ok(None),
364+
}
365+
}
366+
367+
async fn get_counts(&self, stream: &str) -> Result<CountsResponse, PrismLogstreamError> {
368+
let count_request = CountsRequest {
369+
stream: stream.to_owned(),
370+
start_time: "1h".to_owned(),
371+
end_time: "now".to_owned(),
372+
num_bins: 10,
373+
};
374+
375+
let records = count_request.get_bin_density().await?;
376+
Ok(CountsResponse {
377+
fields: vec!["start_time".into(), "end_time".into(), "count".into()],
378+
records,
379+
})
322380
}
323381

324382
/// Retrieves distinct values for a specific field in a stream.

0 commit comments

Comments
 (0)