Skip to content

fix: parallelise datasets API #1276

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 129 additions & 65 deletions src/prism/logstream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use chrono::Utc;
use http::StatusCode;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use tracing::{debug, error, warn};
use tracing::warn;

use crate::{
handlers::http::{
Expand Down Expand Up @@ -247,79 +247,143 @@ impl PrismDatasetRequest {
mut self,
key: SessionKey,
) -> Result<Vec<PrismDatasetResponse>, PrismLogstreamError> {
let is_empty = self.streams.is_empty();
if is_empty {
if self.streams.is_empty() {
self.streams = PARSEABLE.streams.list();
}

let mut responses = vec![];
for stream in self.streams.iter() {
if Users.authorize(key.clone(), Action::ListStream, Some(stream), None)
!= crate::rbac::Response::Authorized
{
// Don't warn if listed from Parseable
if !is_empty {
warn!("Unauthorized access requested for stream: {stream}");
// Process streams concurrently
let results = futures::future::join_all(
self.streams
.iter()
.map(|stream| self.process_stream(stream.clone(), key.clone())),
)
.await;

// Collect successful responses and handle errors
let mut responses = Vec::new();
for result in results {
match result {
Ok(Some(response)) => responses.push(response),
Ok(None) => {
warn!("Stream not found or unauthorized access");
continue;
}
Err(err) => {
warn!("error: {err}");
continue;
}
continue;
}
}

if PARSEABLE.check_or_load_stream(stream).await {
debug!("Stream not found: {stream}");
continue;
}
Ok(responses)
}

let PrismLogstreamInfo {
info,
schema,
stats,
retention,
} = get_prism_logstream_info(stream).await?;

let hottier = match HotTierManager::global() {
Some(manager) => match manager.get_hot_tier(stream).await {
Ok(stats) => Some(stats),
Err(HotTierError::HotTierValidationError(
HotTierValidationError::NotFound(_),
)) => None,
Err(err) => return Err(err.into()),
},
_ => None,
};
let records = CountsRequest {
stream: stream.clone(),
start_time: "1h".to_owned(),
end_time: "now".to_owned(),
num_bins: 10,
}
.get_bin_density()
.await?;
let counts = CountsResponse {
fields: vec!["start_time".into(), "end_time".into(), "count".into()],
records,
};

// Retrieve distinct values for source identifiers
// Returns None if fields aren't present or if query fails
let ips = self.get_distinct_entries(stream, "p_src_ip").await.ok();
let user_agents = self.get_distinct_entries(stream, "p_user_agent").await.ok();

responses.push(PrismDatasetResponse {
stream: stream.clone(),
info,
schema,
stats,
retention,
hottier,
counts,
distinct_sources: json!({
"ips": ips,
"user_agents": user_agents
}),
})
/// Builds the dataset response for a single stream.
///
/// Returns `Ok(None)` when the stream is skipped, either because
/// `is_authorized` rejects the session key or because `stream_exists`
/// reports the stream as missing. Returns `Ok(Some(_))` with the assembled
/// response otherwise.
///
/// # Errors
///
/// Propagates any error from `get_prism_logstream_info` or
/// `build_dataset_response`.
async fn process_stream(
    &self,
    stream: String,
    key: SessionKey,
) -> Result<Option<PrismDatasetResponse>, PrismLogstreamError> {
    // Skip unauthorized streams
    if !self.is_authorized(&stream, &key) {
        return Ok(None);
    }

    // Skip streams that don't exist
    if !self.stream_exists(&stream).await {
        return Ok(None);
    }

    // Fetch the stream info first; `stream` is moved into the response
    // builder afterwards, so it is only borrowed here.
    let info = get_prism_logstream_info(&stream).await?;
    self.build_dataset_response(stream, info).await.map(Some)
}

/// Checks whether the session `key` may list `stream`.
///
/// Asks `Users.authorize` for `Action::ListStream` on the stream and returns
/// `true` only when the answer is `Response::Authorized`. Any other answer
/// logs a warning naming the stream and returns `false`.
fn is_authorized(&self, stream: &str, key: &SessionKey) -> bool {
    let verdict = Users.authorize(key.clone(), Action::ListStream, Some(stream), None);
    let allowed = verdict == crate::rbac::Response::Authorized;
    if !allowed {
        warn!("Unauthorized access requested for stream: {stream}");
    }
    allowed
}

/// Reports whether `stream` is available for processing.
///
/// Returns `false` (after logging a warning) when
/// `PARSEABLE.check_or_load_stream` returns `true`, and `true` otherwise.
// NOTE(review): a `true` result from `check_or_load_stream` is treated here
// as "stream not found" — confirm against that function's definition.
async fn stream_exists(&self, stream: &str) -> bool {
    let missing = PARSEABLE.check_or_load_stream(stream).await;
    if missing {
        warn!("Stream not found: {stream}");
    }
    !missing
}

async fn build_dataset_response(
&self,
stream: String,
info: PrismLogstreamInfo,
) -> Result<PrismDatasetResponse, PrismLogstreamError> {
// Get hot tier info
let hottier = self.get_hot_tier_info(&stream).await?;

// Get counts
let counts = self.get_counts(&stream).await?;

// Get distinct entries concurrently
let (ips_result, user_agents_result) = futures::join!(
self.get_distinct_entries(&stream, "p_src_ip"),
self.get_distinct_entries(&stream, "p_user_agent")
);

let ips = ips_result.ok();
let user_agents = user_agents_result.ok();

Ok(PrismDatasetResponse {
stream,
info: info.info,
schema: info.schema,
stats: info.stats,
retention: info.retention,
hottier,
counts,
distinct_sources: json!({
"ips": ips,
"user_agents": user_agents
}),
})
}

/// Looks up the hot tier configuration of `stream`.
///
/// Returns `Ok(None)` when there is no global `HotTierManager`, or when the
/// manager answers with a `HotTierValidationError::NotFound` validation
/// error. Returns `Ok(Some(_))` with the manager's result on success.
///
/// # Errors
///
/// Any other `HotTierError` is converted into a `PrismLogstreamError`.
async fn get_hot_tier_info(
    &self,
    stream: &str,
) -> Result<Option<StreamHotTier>, PrismLogstreamError> {
    let manager = match HotTierManager::global() {
        Some(manager) => manager,
        None => return Ok(None),
    };

    match manager.get_hot_tier(stream).await {
        Ok(hot_tier) => Ok(Some(hot_tier)),
        Err(HotTierError::HotTierValidationError(HotTierValidationError::NotFound(_))) => {
            Ok(None)
        }
        Err(other) => Err(other.into()),
    }
}

/// Fetches the bin density counts for `stream`.
///
/// Issues a `CountsRequest` from `"1h"` to `"now"` split into 10 bins
/// (presumably the last hour — confirm against `CountsRequest`), and wraps
/// the records in a `CountsResponse` whose fields are `start_time`,
/// `end_time` and `count`.
///
/// # Errors
///
/// Propagates any error from `get_bin_density`.
async fn get_counts(&self, stream: &str) -> Result<CountsResponse, PrismLogstreamError> {
    let records = CountsRequest {
        stream: stream.to_owned(),
        start_time: "1h".to_owned(),
        end_time: "now".to_owned(),
        num_bins: 10,
    }
    .get_bin_density()
    .await?;

    let fields = vec!["start_time".into(), "end_time".into(), "count".into()];
    Ok(CountsResponse { fields, records })
}

/// Retrieves distinct values for a specific field in a stream.
Expand Down
Loading