diff --git a/src/alerts/alerts_utils.rs b/src/alerts/alerts_utils.rs index a3b9d5a45..1234b4d7c 100644 --- a/src/alerts/alerts_utils.rs +++ b/src/alerts/alerts_utils.rs @@ -29,12 +29,11 @@ use datafusion::{ logical_expr::{BinaryExpr, Literal, Operator}, prelude::{col, lit, DataFrame, Expr}, }; -use tokio::task::JoinSet; -use tracing::{trace, warn}; +use tracing::trace; use crate::{ alerts::LogicalOperator, - handlers::http::query::update_schema_when_distributed, + handlers::http::query::{create_streams_for_distributed, update_schema_when_distributed}, parseable::PARSEABLE, query::{resolve_stream_names, QUERY_SESSION}, utils::time::TimeRange, @@ -79,34 +78,12 @@ async fn prepare_query(alert: &AlertConfig) -> Result plan, - Err(_) => { - let mut join_set = JoinSet::new(); - for stream_name in streams { - let stream_name = stream_name.clone(); - join_set.spawn(async move { - let result = PARSEABLE - .create_stream_and_schema_from_storage(&stream_name) - .await; - - if let Err(e) = &result { - warn!("Failed to create stream '{}': {}", stream_name, e); - } - - (stream_name, result) - }); - } - - while let Some(result) = join_set.join_next().await { - if let Err(join_error) = result { - warn!("Task join error: {}", join_error); - } - } - session_state.create_logical_plan(&select_query).await? - } - }; + let tables = resolve_stream_names(&select_query)?; + //check or load streams in memory + create_streams_for_distributed(tables.clone()) + .await + .map_err(|err| AlertError::CustomError(format!("Failed to create streams: {err}")))?; + let raw_logical_plan = session_state.create_logical_plan(&select_query).await?; Ok(crate::query::Query { raw_logical_plan, time_range, diff --git a/src/handlers/airplane.rs b/src/handlers/airplane.rs index 4ed2f3950..0bb1f3457 100644 --- a/src/handlers/airplane.rs +++ b/src/handlers/airplane.rs @@ -148,7 +148,7 @@ impl FlightService for AirServiceImpl { .to_owned(); // map payload to query - let query = into_query(&ticket, &session_state, time_range, &streams) + let query = into_query(&ticket, &session_state, time_range) .await .map_err(|_| Status::internal("Failed to parse query"))?; diff --git a/src/handlers/http/demo_data.rs b/src/handlers/http/demo_data.rs index db74d5d70..989f475c6 100644 --- a/src/handlers/http/demo_data.rs +++ b/src/handlers/http/demo_data.rs @@ -63,7 +63,7 @@ pub async fn get_demo_data(req: HttpRequest) -> Result // Forward the request to ingestor asynchronously match get_demo_data_from_ingestor(&action).await { Ok(()) => Ok(HttpResponse::Accepted().finish()), - Err(e) => Err(PostError::Invalid(anyhow::anyhow!(e))), + Err(e) => Err(e), } } _ => Err(PostError::Invalid(anyhow::anyhow!( diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index 48ac411ec..8ff2312da 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -36,7 +36,7 @@ use http::StatusCode; use itertools::Itertools; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; use std::time::Instant; @@ -81,14 +81,14 @@ pub async fn get_records_and_fields( query_request: &Query, req: &HttpRequest, ) -> Result<(Option>, Option>), QueryError> { - let tables = resolve_stream_names(&query_request.query)?; let session_state = QUERY_SESSION.state(); - let time_range = TimeRange::parse_human_time(&query_request.start_time, &query_request.end_time)?; + let tables = resolve_stream_names(&query_request.query)?; + //check or load streams in memory + create_streams_for_distributed(tables.clone()).await?; - let query: LogicalQuery = - into_query(query_request, &session_state, time_range, &tables).await?; + let query: LogicalQuery = into_query(query_request, &session_state, time_range).await?; let creds = extract_session_key_from_req(req)?; let permissions = Users.get_permissions(&creds); @@ -96,7 +96,7 @@ pub async fn get_records_and_fields( .first() .ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?; user_auth_for_datasets(&permissions, &tables).await?; - update_schema_when_distributed(&tables).await?; + let (records, fields) = execute(query, table_name, false).await?; let records = match records { @@ -114,9 +114,10 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result) -> Result<(), /// Create streams for querier if they do not exist /// get list of streams from memory and storage /// create streams for memory from storage if they do not exist -pub async fn create_streams_for_querier() -> Result<(), QueryError> { - let store = PARSEABLE.storage.get_object_store(); - let querier_streams = PARSEABLE.streams.list(); - - let querier_streams_set: HashSet<_> = querier_streams.into_iter().collect(); - // fetch querier streams which have field list blank - // now missing streams should be list of streams which are in storage but not in querier - // and also have no fields in the schema - // this is to ensure that we do not create streams for querier which already exist in querier - - let missing_streams: Vec<_> = store - .list_streams() - .await? - .into_iter() - .filter(|stream_name| { - !querier_streams_set.contains(stream_name) - || PARSEABLE - .get_stream(stream_name) - .map(|s| s.get_schema().fields().is_empty()) - .unwrap_or(false) - }) - .collect(); - - if missing_streams.is_empty() { +pub async fn create_streams_for_distributed(streams: Vec) -> Result<(), QueryError> { + if PARSEABLE.options.mode != Mode::Query && PARSEABLE.options.mode != Mode::Prism { return Ok(()); } - let mut join_set = JoinSet::new(); - for stream_name in missing_streams { + for stream_name in streams { join_set.spawn(async move { let result = PARSEABLE .create_stream_and_schema_from_storage(&stream_name) @@ -492,7 +470,6 @@ pub async fn into_query( query: &Query, session_state: &SessionState, time_range: TimeRange, - tables: &Vec, ) -> Result { if query.query.is_empty() { return Err(QueryError::EmptyQuery); @@ -505,33 +482,7 @@ pub async fn into_query( if query.end_time.is_empty() { return Err(QueryError::EmptyEndTime); } - let raw_logical_plan = match session_state.create_logical_plan(&query.query).await { - Ok(plan) => plan, - Err(_) => { - let mut join_set = JoinSet::new(); - for stream_name in tables { - let stream_name = stream_name.clone(); - join_set.spawn(async move { - let result = PARSEABLE - .create_stream_and_schema_from_storage(&stream_name) - .await; - - if let Err(e) = &result { - warn!("Failed to create stream '{}': {}", stream_name, e); - } - - (stream_name, result) - }); - } - - while let Some(result) = join_set.join_next().await { - if let Err(join_error) = result { - warn!("Task join error: {}", join_error); - } - } - session_state.create_logical_plan(&query.query).await? - } - }; + let raw_logical_plan = session_state.create_logical_plan(&query.query).await?; Ok(crate::query::Query { raw_logical_plan, diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index e1f09ee23..a69bc9881 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -42,7 +42,10 @@ use tracing::error; use crate::connectors::kafka::config::KafkaConfig; use crate::{ cli::{Cli, Options, StorageOptions}, - event::format::{LogSource, LogSourceEntry}, + event::{ + commit_schema, + format::{LogSource, LogSourceEntry}, + }, handlers::{ http::{ cluster::{sync_streams_with_ingestors, INTERNAL_STREAM_NAME}, @@ -278,7 +281,6 @@ impl Parseable { if !streams.contains(stream_name) { return Ok(false); } - let (stream_metadata_bytes, schema_bytes) = try_join!( storage.create_stream_from_ingestor(stream_name), storage.create_schema_from_storage(stream_name) @@ -335,6 +337,9 @@ impl Parseable { ingestor_id, ); + //commit schema in memory + commit_schema(stream_name, schema).map_err(|e| StreamError::Anyhow(e.into()))?; + Ok(true) }