Skip to content

Commit 3891e7e

Browse files
update schema for distributed
1 parent aaffa5d commit 3891e7e

File tree

3 files changed

+10
-6
lines changed

3 files changed

+10
-6
lines changed

src/event/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ pub fn commit_schema(stream_name: &str, schema: Arc<Schema>) -> Result<(), Stagi
123123

124124
let map = &mut stream_metadata
125125
.get_mut(stream_name)
126-
.expect("map has entry for this stream name")
126+
.ok_or_else(|| StagingError::NotFound(stream_name.to_string()))?
127127
.metadata
128128
.write()
129129
.expect(LOCK_EXPECT)

src/handlers/http/query.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
use crate::event::error::EventError;
2020
use crate::handlers::http::fetch_schema;
21+
use crate::option::Mode;
2122
use crate::utils::arrow::record_batches_to_json;
2223
use actix_web::http::header::ContentType;
2324
use actix_web::web::{self, Json};
@@ -113,6 +114,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
113114
let time_range =
114115
TimeRange::parse_human_time(&query_request.start_time, &query_request.end_time)?;
115116
let tables = resolve_stream_names(&query_request.query)?;
117+
update_schema_when_distributed(&tables).await?;
116118
let query: LogicalQuery =
117119
into_query(&query_request, &session_state, time_range, &tables).await?;
118120
let creds = extract_session_key_from_req(&req)?;
@@ -122,7 +124,6 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
122124
.first()
123125
.ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?;
124126
user_auth_for_datasets(&permissions, &tables).await?;
125-
update_schema_when_distributed(&tables).await?;
126127
let time = Instant::now();
127128

128129
// if the query is `select count(*) from <dataset>`
@@ -392,12 +393,13 @@ pub async fn update_schema_when_distributed(tables: &Vec<String>) -> Result<(),
392393
// if the mode is query or prism, we need to update the schema in memory
393394
// no need to commit schema to storage
394395
// as the schema is read from memory everytime
395-
for table in tables {
396-
if let Ok(new_schema) = fetch_schema(table).await {
397-
commit_schema(table, Arc::new(new_schema))?;
396+
if PARSEABLE.options.mode == Mode::Query || PARSEABLE.options.mode == Mode::Prism {
397+
for table in tables {
398+
if let Ok(new_schema) = fetch_schema(table).await {
399+
commit_schema(table, Arc::new(new_schema))?;
400+
}
398401
}
399402
}
400-
401403
Ok(())
402404
}
403405

src/parseable/staging/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ pub enum StagingError {
3030
ObjectStorage(#[from] std::io::Error),
3131
#[error("Could not generate parquet file")]
3232
Create,
33+
#[error("Could not find stream {0}")]
34+
NotFound(String),
3335
// #[error("Metadata Error: {0}")]
3436
// Metadata(#[from] MetadataError),
3537
}

0 commit comments

Comments
 (0)