diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs
index cb8e95344..1a6f664c2 100644
--- a/src/handlers/http/query.rs
+++ b/src/handlers/http/query.rs
@@ -35,11 +35,12 @@ use http::StatusCode;
 use itertools::Itertools;
 use serde::{Deserialize, Serialize};
 use serde_json::{json, Value};
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::pin::Pin;
 use std::sync::Arc;
 use std::time::Instant;
-use tracing::error;
+use tokio::task::JoinSet;
+use tracing::{error, warn};
 
 use crate::event::commit_schema;
 use crate::metrics::QUERY_EXECUTE_TIME;
@@ -126,7 +127,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpResponse, QueryError> {
         Ok(raw_logical_plan) => raw_logical_plan,
         Err(_) => {
-            create_streams_for_querier().await;
+            create_streams_for_querier().await?;
             session_state
                 .create_logical_plan(&query_request.query)
                 .await?
         }
@@ -433,17 +434,45 @@ pub async fn update_schema_when_distributed(tables: &Vec<String>) -> Result<(),
 /// Create streams for querier if they do not exist
 /// get list of streams from memory and storage
 /// create streams for memory from storage if they do not exist
-pub async fn create_streams_for_querier() {
-    let querier_streams = PARSEABLE.streams.list();
+pub async fn create_streams_for_querier() -> Result<(), QueryError> {
     let store = PARSEABLE.storage.get_object_store();
-    let storage_streams = store.list_streams().await.unwrap();
-    for stream_name in storage_streams {
-        if !querier_streams.contains(&stream_name) {
-            let _ = PARSEABLE
+    let querier_streams = PARSEABLE.streams.list();
+
+    let querier_streams_set: HashSet<_> = querier_streams.into_iter().collect();
+
+    let storage_streams = store.list_streams().await?;
+
+    let missing_streams: Vec<_> = storage_streams
+        .into_iter()
+        .filter(|stream_name| !querier_streams_set.contains(stream_name))
+        .collect();
+
+    if missing_streams.is_empty() {
+        return Ok(());
+    }
+
+    let mut join_set = JoinSet::new();
+    for stream_name in missing_streams {
+        join_set.spawn(async move {
+            let result = PARSEABLE
                 .create_stream_and_schema_from_storage(&stream_name)
                 .await;
+
+            if let Err(e) = &result {
+                warn!("Failed to create stream '{}': {}", stream_name, e);
+            }
+
+            (stream_name, result)
+        });
+    }
+
+    while let Some(result) = join_set.join_next().await {
+        if let Err(join_error) = result {
+            warn!("Task join error: {}", join_error);
         }
     }
+
+    Ok(())
 }
 
 impl FromRequest for Query {
diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs
index 4537ac3dc..439f71ee0 100644
--- a/src/parseable/mod.rs
+++ b/src/parseable/mod.rs
@@ -35,6 +35,7 @@ use once_cell::sync::Lazy;
 pub use staging::StagingError;
 use streams::StreamRef;
 pub use streams::{Stream, StreamNotFound, Streams};
+use tokio::try_join;
 use tracing::error;
 
 #[cfg(feature = "kafka")]
@@ -270,17 +271,22 @@ impl Parseable {
             return Ok(false);
         }
 
-        let mut stream_metadata = ObjectStoreFormat::default();
-        let stream_metadata_bytes = storage.create_stream_from_ingestor(stream_name).await?;
-        if !stream_metadata_bytes.is_empty() {
-            stream_metadata = serde_json::from_slice::<ObjectStoreFormat>(&stream_metadata_bytes)?;
-        }
+        let (stream_metadata_bytes, schema_bytes) = try_join!(
+            storage.create_stream_from_ingestor(stream_name),
+            storage.create_schema_from_ingestor(stream_name)
+        )?;
 
-        let mut schema = Arc::new(Schema::empty());
-        let schema_bytes = storage.create_schema_from_ingestor(stream_name).await?;
-        if !schema_bytes.is_empty() {
-            schema = serde_json::from_slice::<Arc<Schema>>(&schema_bytes)?;
-        }
+        let stream_metadata = if stream_metadata_bytes.is_empty() {
+            ObjectStoreFormat::default()
+        } else {
+            serde_json::from_slice::<ObjectStoreFormat>(&stream_metadata_bytes)?
+        };
+
+        let schema = if schema_bytes.is_empty() {
+            Arc::new(Schema::empty())
+        } else {
+            serde_json::from_slice::<Arc<Schema>>(&schema_bytes)?
+        };
 
         let static_schema: HashMap<String, Arc<Field>> = schema
             .fields
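
For context, the patch combines two tokio concurrency patterns: try_join! to run the two fallible metadata/schema fetches concurrently and fail fast, and JoinSet to fan out one task per missing stream while logging (rather than propagating) per-task failures. Below is a minimal, self-contained sketch of both, assuming tokio with the "full" feature set; fetch is a hypothetical stand-in for storage calls like create_stream_and_schema_from_storage, not a Parseable API.

// Sketch only; `fetch` is a hypothetical stand-in, not a Parseable API.
use tokio::task::JoinSet;

// Hypothetical fallible async call, standing in for a storage fetch.
async fn fetch(id: u32) -> Result<String, String> {
    if id % 2 == 0 {
        Ok(format!("stream-{id}"))
    } else {
        Err(format!("fetch failed for {id}"))
    }
}

#[tokio::main]
async fn main() {
    // try_join! polls both futures concurrently and short-circuits on the
    // first Err, mirroring the metadata/schema fetch in parseable/mod.rs.
    match tokio::try_join!(fetch(0), fetch(2)) {
        Ok((meta, schema)) => println!("fetched {meta} and {schema}"),
        Err(e) => eprintln!("one fetch failed: {e}"),
    }

    // JoinSet spawns one task per item; join_next() yields tasks in finish
    // order. Per-task errors are logged, not propagated, as in
    // create_streams_for_querier.
    let mut set = JoinSet::new();
    for id in 0..4u32 {
        set.spawn(async move { (id, fetch(id).await) });
    }
    while let Some(joined) = set.join_next().await {
        match joined {
            Ok((id, Ok(name))) => println!("{id}: created {name}"),
            Ok((id, Err(e))) => eprintln!("{id}: skipped: {e}"),
            Err(join_error) => eprintln!("task panicked or was cancelled: {join_error}"),
        }
    }
}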