Skip to content

Commit 1d4e858

Browse files
fix: create stream before merge schema (#1381)
Issue: schema merge and in-memory commit were happening before the stream was created from storage. Fix: first create the stream from storage if it is not already present, then merge the schema and commit it to memory.
1 parent a883589 commit 1d4e858

File tree

5 files changed

+31
-98
lines changed

5 files changed

+31
-98
lines changed

src/alerts/alerts_utils.rs

Lines changed: 8 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,11 @@ use datafusion::{
2929
logical_expr::{BinaryExpr, Literal, Operator},
3030
prelude::{col, lit, DataFrame, Expr},
3131
};
32-
use tokio::task::JoinSet;
33-
use tracing::{trace, warn};
32+
use tracing::trace;
3433

3534
use crate::{
3635
alerts::LogicalOperator,
37-
handlers::http::query::update_schema_when_distributed,
36+
handlers::http::query::{create_streams_for_distributed, update_schema_when_distributed},
3837
parseable::PARSEABLE,
3938
query::{resolve_stream_names, QUERY_SESSION},
4039
utils::time::TimeRange,
@@ -79,34 +78,12 @@ async fn prepare_query(alert: &AlertConfig) -> Result<crate::query::Query, Alert
7978
let time_range = TimeRange::parse_human_time(start_time, end_time)
8079
.map_err(|err| AlertError::CustomError(err.to_string()))?;
8180

82-
let streams = resolve_stream_names(&select_query)?;
83-
let raw_logical_plan = match session_state.create_logical_plan(&select_query).await {
84-
Ok(plan) => plan,
85-
Err(_) => {
86-
let mut join_set = JoinSet::new();
87-
for stream_name in streams {
88-
let stream_name = stream_name.clone();
89-
join_set.spawn(async move {
90-
let result = PARSEABLE
91-
.create_stream_and_schema_from_storage(&stream_name)
92-
.await;
93-
94-
if let Err(e) = &result {
95-
warn!("Failed to create stream '{}': {}", stream_name, e);
96-
}
97-
98-
(stream_name, result)
99-
});
100-
}
101-
102-
while let Some(result) = join_set.join_next().await {
103-
if let Err(join_error) = result {
104-
warn!("Task join error: {}", join_error);
105-
}
106-
}
107-
session_state.create_logical_plan(&select_query).await?
108-
}
109-
};
81+
let tables = resolve_stream_names(&select_query)?;
82+
//check or load streams in memory
83+
create_streams_for_distributed(tables.clone())
84+
.await
85+
.map_err(|err| AlertError::CustomError(format!("Failed to create streams: {err}")))?;
86+
let raw_logical_plan = session_state.create_logical_plan(&select_query).await?;
11087
Ok(crate::query::Query {
11188
raw_logical_plan,
11289
time_range,

src/handlers/airplane.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ impl FlightService for AirServiceImpl {
148148
.to_owned();
149149

150150
// map payload to query
151-
let query = into_query(&ticket, &session_state, time_range, &streams)
151+
let query = into_query(&ticket, &session_state, time_range)
152152
.await
153153
.map_err(|_| Status::internal("Failed to parse query"))?;
154154

src/handlers/http/demo_data.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ pub async fn get_demo_data(req: HttpRequest) -> Result<HttpResponse, PostError>
6363
// Forward the request to ingestor asynchronously
6464
match get_demo_data_from_ingestor(&action).await {
6565
Ok(()) => Ok(HttpResponse::Accepted().finish()),
66-
Err(e) => Err(PostError::Invalid(anyhow::anyhow!(e))),
66+
Err(e) => Err(e),
6767
}
6868
}
6969
_ => Err(PostError::Invalid(anyhow::anyhow!(

src/handlers/http/query.rs

Lines changed: 14 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use http::StatusCode;
3636
use itertools::Itertools;
3737
use serde::{Deserialize, Serialize};
3838
use serde_json::{json, Value};
39-
use std::collections::{HashMap, HashSet};
39+
use std::collections::HashMap;
4040
use std::pin::Pin;
4141
use std::sync::Arc;
4242
use std::time::Instant;
@@ -81,22 +81,22 @@ pub async fn get_records_and_fields(
8181
query_request: &Query,
8282
req: &HttpRequest,
8383
) -> Result<(Option<Vec<RecordBatch>>, Option<Vec<String>>), QueryError> {
84-
let tables = resolve_stream_names(&query_request.query)?;
8584
let session_state = QUERY_SESSION.state();
86-
8785
let time_range =
8886
TimeRange::parse_human_time(&query_request.start_time, &query_request.end_time)?;
87+
let tables = resolve_stream_names(&query_request.query)?;
88+
//check or load streams in memory
89+
create_streams_for_distributed(tables.clone()).await?;
8990

90-
let query: LogicalQuery =
91-
into_query(query_request, &session_state, time_range, &tables).await?;
91+
let query: LogicalQuery = into_query(query_request, &session_state, time_range).await?;
9292
let creds = extract_session_key_from_req(req)?;
9393
let permissions = Users.get_permissions(&creds);
9494

9595
let table_name = tables
9696
.first()
9797
.ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?;
9898
user_auth_for_datasets(&permissions, &tables).await?;
99-
update_schema_when_distributed(&tables).await?;
99+
100100
let (records, fields) = execute(query, table_name, false).await?;
101101

102102
let records = match records {
@@ -114,9 +114,10 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
114114
let time_range =
115115
TimeRange::parse_human_time(&query_request.start_time, &query_request.end_time)?;
116116
let tables = resolve_stream_names(&query_request.query)?;
117-
update_schema_when_distributed(&tables).await?;
118-
let query: LogicalQuery =
119-
into_query(&query_request, &session_state, time_range, &tables).await?;
117+
//check or load streams in memory
118+
create_streams_for_distributed(tables.clone()).await?;
119+
120+
let query: LogicalQuery = into_query(&query_request, &session_state, time_range).await?;
120121
let creds = extract_session_key_from_req(&req)?;
121122
let permissions = Users.get_permissions(&creds);
122123

@@ -406,35 +407,12 @@ pub async fn update_schema_when_distributed(tables: &Vec<String>) -> Result<(),
406407
/// Create streams for querier if they do not exist
407408
/// get list of streams from memory and storage
408409
/// create streams for memory from storage if they do not exist
409-
pub async fn create_streams_for_querier() -> Result<(), QueryError> {
410-
let store = PARSEABLE.storage.get_object_store();
411-
let querier_streams = PARSEABLE.streams.list();
412-
413-
let querier_streams_set: HashSet<_> = querier_streams.into_iter().collect();
414-
// fetch querier streams which have field list blank
415-
// now missing streams should be list of streams which are in storage but not in querier
416-
// and also have no fields in the schema
417-
// this is to ensure that we do not create streams for querier which already exist in querier
418-
419-
let missing_streams: Vec<_> = store
420-
.list_streams()
421-
.await?
422-
.into_iter()
423-
.filter(|stream_name| {
424-
!querier_streams_set.contains(stream_name)
425-
|| PARSEABLE
426-
.get_stream(stream_name)
427-
.map(|s| s.get_schema().fields().is_empty())
428-
.unwrap_or(false)
429-
})
430-
.collect();
431-
432-
if missing_streams.is_empty() {
410+
pub async fn create_streams_for_distributed(streams: Vec<String>) -> Result<(), QueryError> {
411+
if PARSEABLE.options.mode != Mode::Query && PARSEABLE.options.mode != Mode::Prism {
433412
return Ok(());
434413
}
435-
436414
let mut join_set = JoinSet::new();
437-
for stream_name in missing_streams {
415+
for stream_name in streams {
438416
join_set.spawn(async move {
439417
let result = PARSEABLE
440418
.create_stream_and_schema_from_storage(&stream_name)
@@ -492,7 +470,6 @@ pub async fn into_query(
492470
query: &Query,
493471
session_state: &SessionState,
494472
time_range: TimeRange,
495-
tables: &Vec<String>,
496473
) -> Result<LogicalQuery, QueryError> {
497474
if query.query.is_empty() {
498475
return Err(QueryError::EmptyQuery);
@@ -505,33 +482,7 @@ pub async fn into_query(
505482
if query.end_time.is_empty() {
506483
return Err(QueryError::EmptyEndTime);
507484
}
508-
let raw_logical_plan = match session_state.create_logical_plan(&query.query).await {
509-
Ok(plan) => plan,
510-
Err(_) => {
511-
let mut join_set = JoinSet::new();
512-
for stream_name in tables {
513-
let stream_name = stream_name.clone();
514-
join_set.spawn(async move {
515-
let result = PARSEABLE
516-
.create_stream_and_schema_from_storage(&stream_name)
517-
.await;
518-
519-
if let Err(e) = &result {
520-
warn!("Failed to create stream '{}': {}", stream_name, e);
521-
}
522-
523-
(stream_name, result)
524-
});
525-
}
526-
527-
while let Some(result) = join_set.join_next().await {
528-
if let Err(join_error) = result {
529-
warn!("Task join error: {}", join_error);
530-
}
531-
}
532-
session_state.create_logical_plan(&query.query).await?
533-
}
534-
};
485+
let raw_logical_plan = session_state.create_logical_plan(&query.query).await?;
535486

536487
Ok(crate::query::Query {
537488
raw_logical_plan,

src/parseable/mod.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,10 @@ use tracing::error;
4242
use crate::connectors::kafka::config::KafkaConfig;
4343
use crate::{
4444
cli::{Cli, Options, StorageOptions},
45-
event::format::{LogSource, LogSourceEntry},
45+
event::{
46+
commit_schema,
47+
format::{LogSource, LogSourceEntry},
48+
},
4649
handlers::{
4750
http::{
4851
cluster::{sync_streams_with_ingestors, INTERNAL_STREAM_NAME},
@@ -278,7 +281,6 @@ impl Parseable {
278281
if !streams.contains(stream_name) {
279282
return Ok(false);
280283
}
281-
282284
let (stream_metadata_bytes, schema_bytes) = try_join!(
283285
storage.create_stream_from_ingestor(stream_name),
284286
storage.create_schema_from_storage(stream_name)
@@ -335,6 +337,9 @@ impl Parseable {
335337
ingestor_id,
336338
);
337339

340+
//commit schema in memory
341+
commit_schema(stream_name, schema).map_err(|e| StreamError::Anyhow(e.into()))?;
342+
338343
Ok(true)
339344
}
340345

0 commit comments

Comments (0)