Skip to content

Commit 781fdd2

Browse files
fix stream/schema creation
1 parent 0399ba1 commit 781fdd2

File tree

5 files changed

+29
-98
lines changed

5 files changed

+29
-98
lines changed

src/alerts/alerts_utils.rs

Lines changed: 8 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,11 @@ use datafusion::{
2929
logical_expr::{BinaryExpr, Literal, Operator},
3030
prelude::{col, lit, DataFrame, Expr},
3131
};
32-
use tokio::task::JoinSet;
33-
use tracing::{trace, warn};
32+
use tracing::trace;
3433

3534
use crate::{
3635
alerts::LogicalOperator,
37-
handlers::http::query::update_schema_when_distributed,
36+
handlers::http::query::{create_streams_for_distributed, update_schema_when_distributed},
3837
parseable::PARSEABLE,
3938
query::{resolve_stream_names, QUERY_SESSION},
4039
utils::time::TimeRange,
@@ -79,34 +78,12 @@ async fn prepare_query(alert: &AlertConfig) -> Result<crate::query::Query, Alert
7978
let time_range = TimeRange::parse_human_time(start_time, end_time)
8079
.map_err(|err| AlertError::CustomError(err.to_string()))?;
8180

82-
let streams = resolve_stream_names(&select_query)?;
83-
let raw_logical_plan = match session_state.create_logical_plan(&select_query).await {
84-
Ok(plan) => plan,
85-
Err(_) => {
86-
let mut join_set = JoinSet::new();
87-
for stream_name in streams {
88-
let stream_name = stream_name.clone();
89-
join_set.spawn(async move {
90-
let result = PARSEABLE
91-
.create_stream_and_schema_from_storage(&stream_name)
92-
.await;
93-
94-
if let Err(e) = &result {
95-
warn!("Failed to create stream '{}': {}", stream_name, e);
96-
}
97-
98-
(stream_name, result)
99-
});
100-
}
101-
102-
while let Some(result) = join_set.join_next().await {
103-
if let Err(join_error) = result {
104-
warn!("Task join error: {}", join_error);
105-
}
106-
}
107-
session_state.create_logical_plan(&select_query).await?
108-
}
109-
};
81+
let tables = resolve_stream_names(&select_query)?;
82+
//check or load streams in memory
83+
create_streams_for_distributed(tables.clone())
84+
.await
85+
.map_err(|err| AlertError::CustomError(format!("Failed to create streams: {err}")))?;
86+
let raw_logical_plan = session_state.create_logical_plan(&select_query).await?;
11087
Ok(crate::query::Query {
11188
raw_logical_plan,
11289
time_range,

src/handlers/airplane.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ impl FlightService for AirServiceImpl {
148148
.to_owned();
149149

150150
// map payload to query
151-
let query = into_query(&ticket, &session_state, time_range, &streams)
151+
let query = into_query(&ticket, &session_state, time_range)
152152
.await
153153
.map_err(|_| Status::internal("Failed to parse query"))?;
154154

src/handlers/http/demo_data.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ pub async fn get_demo_data(req: HttpRequest) -> Result<HttpResponse, PostError>
6363
// Forward the request to ingestor asynchronously
6464
match get_demo_data_from_ingestor(&action).await {
6565
Ok(()) => Ok(HttpResponse::Accepted().finish()),
66-
Err(e) => Err(PostError::Invalid(anyhow::anyhow!(e))),
66+
Err(e) => Err(e),
6767
}
6868
}
6969
_ => Err(PostError::Invalid(anyhow::anyhow!(

src/handlers/http/query.rs

Lines changed: 12 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use http::StatusCode;
3636
use itertools::Itertools;
3737
use serde::{Deserialize, Serialize};
3838
use serde_json::{json, Value};
39-
use std::collections::{HashMap, HashSet};
39+
use std::collections::HashMap;
4040
use std::pin::Pin;
4141
use std::sync::Arc;
4242
use std::time::Instant;
@@ -81,15 +81,14 @@ pub async fn get_records_and_fields(
8181
query_request: &Query,
8282
req: &HttpRequest,
8383
) -> Result<(Option<Vec<RecordBatch>>, Option<Vec<String>>), QueryError> {
84-
let tables = resolve_stream_names(&query_request.query)?;
8584
let session_state = QUERY_SESSION.state();
86-
8785
let time_range =
8886
TimeRange::parse_human_time(&query_request.start_time, &query_request.end_time)?;
87+
let tables = resolve_stream_names(&query_request.query)?;
88+
//check or load streams in memory
89+
create_streams_for_distributed(tables.clone()).await?;
8990

90-
let query: LogicalQuery =
91-
into_query(query_request, &session_state, time_range, &tables).await?;
92-
update_schema_when_distributed(&tables).await?;
91+
let query: LogicalQuery = into_query(query_request, &session_state, time_range).await?;
9392
let creds = extract_session_key_from_req(req)?;
9493
let permissions = Users.get_permissions(&creds);
9594

@@ -115,10 +114,10 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
115114
let time_range =
116115
TimeRange::parse_human_time(&query_request.start_time, &query_request.end_time)?;
117116
let tables = resolve_stream_names(&query_request.query)?;
117+
//check or load streams in memory
118+
create_streams_for_distributed(tables.clone()).await?;
118119

119-
let query: LogicalQuery =
120-
into_query(&query_request, &session_state, time_range, &tables).await?;
121-
update_schema_when_distributed(&tables).await?;
120+
let query: LogicalQuery = into_query(&query_request, &session_state, time_range).await?;
122121
let creds = extract_session_key_from_req(&req)?;
123122
let permissions = Users.get_permissions(&creds);
124123

@@ -408,35 +407,12 @@ pub async fn update_schema_when_distributed(tables: &Vec<String>) -> Result<(),
408407
/// Create streams for querier if they do not exist
409408
/// get list of streams from memory and storage
410409
/// create streams for memory from storage if they do not exist
411-
pub async fn create_streams_for_querier() -> Result<(), QueryError> {
412-
let store = PARSEABLE.storage.get_object_store();
413-
let querier_streams = PARSEABLE.streams.list();
414-
415-
let querier_streams_set: HashSet<_> = querier_streams.into_iter().collect();
416-
// fetch querier streams which have field list blank
417-
// now missing streams should be list of streams which are in storage but not in querier
418-
// and also have no fields in the schema
419-
// this is to ensure that we do not create streams for querier which already exist in querier
420-
421-
let missing_streams: Vec<_> = store
422-
.list_streams()
423-
.await?
424-
.into_iter()
425-
.filter(|stream_name| {
426-
!querier_streams_set.contains(stream_name)
427-
|| PARSEABLE
428-
.get_stream(stream_name)
429-
.map(|s| s.get_schema().fields().is_empty())
430-
.unwrap_or(false)
431-
})
432-
.collect();
433-
434-
if missing_streams.is_empty() {
410+
pub async fn create_streams_for_distributed(streams: Vec<String>) -> Result<(), QueryError> {
411+
if PARSEABLE.options.mode != Mode::Query && PARSEABLE.options.mode != Mode::Prism {
435412
return Ok(());
436413
}
437-
438414
let mut join_set = JoinSet::new();
439-
for stream_name in missing_streams {
415+
for stream_name in streams {
440416
join_set.spawn(async move {
441417
let result = PARSEABLE
442418
.create_stream_and_schema_from_storage(&stream_name)
@@ -494,7 +470,6 @@ pub async fn into_query(
494470
query: &Query,
495471
session_state: &SessionState,
496472
time_range: TimeRange,
497-
tables: &Vec<String>,
498473
) -> Result<LogicalQuery, QueryError> {
499474
if query.query.is_empty() {
500475
return Err(QueryError::EmptyQuery);
@@ -507,33 +482,7 @@ pub async fn into_query(
507482
if query.end_time.is_empty() {
508483
return Err(QueryError::EmptyEndTime);
509484
}
510-
let raw_logical_plan = match session_state.create_logical_plan(&query.query).await {
511-
Ok(plan) => plan,
512-
Err(_) => {
513-
let mut join_set = JoinSet::new();
514-
for stream_name in tables {
515-
let stream_name = stream_name.clone();
516-
join_set.spawn(async move {
517-
let result = PARSEABLE
518-
.create_stream_and_schema_from_storage(&stream_name)
519-
.await;
520-
521-
if let Err(e) = &result {
522-
warn!("Failed to create stream '{}': {}", stream_name, e);
523-
}
524-
525-
(stream_name, result)
526-
});
527-
}
528-
529-
while let Some(result) = join_set.join_next().await {
530-
if let Err(join_error) = result {
531-
warn!("Task join error: {}", join_error);
532-
}
533-
}
534-
session_state.create_logical_plan(&query.query).await?
535-
}
536-
};
485+
let raw_logical_plan = session_state.create_logical_plan(&query.query).await?;
537486

538487
Ok(crate::query::Query {
539488
raw_logical_plan,

src/parseable/mod.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,10 @@ use tracing::error;
4242
use crate::connectors::kafka::config::KafkaConfig;
4343
use crate::{
4444
cli::{Cli, Options, StorageOptions},
45-
event::format::{LogSource, LogSourceEntry},
45+
event::{
46+
commit_schema,
47+
format::{LogSource, LogSourceEntry},
48+
},
4649
handlers::{
4750
http::{
4851
cluster::{sync_streams_with_ingestors, INTERNAL_STREAM_NAME},
@@ -278,7 +281,6 @@ impl Parseable {
278281
if !streams.contains(stream_name) {
279282
return Ok(false);
280283
}
281-
282284
let (stream_metadata_bytes, schema_bytes) = try_join!(
283285
storage.create_stream_from_ingestor(stream_name),
284286
storage.create_schema_from_storage(stream_name)
@@ -335,6 +337,9 @@ impl Parseable {
335337
ingestor_id,
336338
);
337339

340+
//commit schema in memory
341+
commit_schema(stream_name, schema).map_err(|e| StreamError::Anyhow(e.into()))?;
342+
338343
Ok(true)
339344
}
340345

0 commit comments

Comments (0)