
Commit 24ac1bb

refactor: Parseable::update_schema_when_distributed

Devdutt Shenoi committed
1 parent 28d2d9d

5 files changed: +34 -30 lines
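The commit itself is a mechanical refactor: update_schema_when_distributed, previously a free function in src/handlers/http/query.rs that read the PARSEABLE global, becomes a method on Parseable in src/parseable/mod.rs, and every call site is updated to invoke it through PARSEABLE. A minimal sketch of the call-site change, assuming the crate-internal PARSEABLE singleton and EventError type shown in the diffs below (the wrapper function is illustrative, not part of the commit):

    use crate::{event::error::EventError, parseable::PARSEABLE};

    // Illustrative wrapper only. Before this commit a caller would import and
    // invoke the free function:
    //     update_schema_when_distributed(&tables).await?;
    // After it, the same behavior is reached as a method on Parseable:
    async fn refresh_distributed_schemas(tables: &Vec<String>) -> Result<(), EventError> {
        PARSEABLE.update_schema_when_distributed(tables).await
    }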

src/handlers/airplane.rs

Lines changed: 3 additions & 2 deletions
@@ -34,7 +34,7 @@ use tonic::transport::{Identity, Server, ServerTlsConfig};
 use tonic_web::GrpcWebLayer;
 
 use crate::handlers::http::cluster::get_ingestor_info;
-use crate::handlers::http::query::{into_query, update_schema_when_distributed};
+use crate::handlers::http::query::into_query;
 use crate::handlers::livetail::cross_origin_config;
 use crate::metrics::QUERY_EXECUTE_TIME;
 use crate::parseable::PARSEABLE;

@@ -156,7 +156,8 @@ impl FlightService for AirServiceImpl {
             .ok_or_else(|| Status::aborted("Malformed SQL Provided, Table Name Not Found"))?
             .to_owned();
 
-        update_schema_when_distributed(&streams)
+        PARSEABLE
+            .update_schema_when_distributed(&streams)
             .await
             .map_err(|err| Status::internal(err.to_string()))?;
 
src/handlers/http/logstream.rs

Lines changed: 4 additions & 2 deletions
@@ -18,7 +18,6 @@
 
 use self::error::StreamError;
 use super::cluster::utils::{IngestionStats, QueriedStats, StorageStats};
-use super::query::update_schema_when_distributed;
 use crate::event::format::override_data_type;
 use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
 use crate::metadata::SchemaVersion;

@@ -129,7 +128,10 @@ pub async fn get_schema(stream_name: Path<String>) -> Result<impl Responder, Str
     }
 
     let stream = PARSEABLE.get_stream(&stream_name)?;
-    match update_schema_when_distributed(&vec![stream_name.clone()]).await {
+    match PARSEABLE
+        .update_schema_when_distributed(&vec![stream_name.clone()])
+        .await
+    {
         Ok(_) => {
             let schema = stream.get_schema();
             Ok((web::Json(schema), StatusCode::OK))

src/handlers/http/query.rs

Lines changed: 1 addition & 23 deletions
@@ -29,14 +29,11 @@ use serde::{Deserialize, Serialize};
 use serde_json::{json, Value};
 use std::collections::HashMap;
 use std::pin::Pin;
-use std::sync::Arc;
 use std::time::Instant;
 use tracing::error;
 
-use crate::event::commit_schema;
 use crate::event::error::EventError;
 use crate::metrics::QUERY_EXECUTE_TIME;
-use crate::option::Mode;
 use crate::parseable::PARSEABLE;
 use crate::query::error::ExecuteError;
 use crate::query::{CountsRequest, CountsResponse, Query as LogicalQuery};

@@ -86,7 +83,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
     let _ = raw_logical_plan.visit(&mut visitor);
 
     let tables = visitor.into_inner();
-    update_schema_when_distributed(&tables).await?;
+    PARSEABLE.update_schema_when_distributed(&tables).await?;
     let query: LogicalQuery = into_query(&query_request, &session_state, time_range).await?;
 
     let creds = extract_session_key_from_req(&req)?;

@@ -164,25 +161,6 @@ pub async fn get_counts(
     }))
 }
 
-pub async fn update_schema_when_distributed(tables: &Vec<String>) -> Result<(), EventError> {
-    if PARSEABLE.options.mode == Mode::Query {
-        for table in tables {
-            if let Ok(new_schema) = PARSEABLE.fetch_schema(table).await {
-                // commit schema merges the schema internally and updates the schema in storage.
-                PARSEABLE
-                    .storage
-                    .get_object_store()
-                    .commit_schema(table, new_schema.clone())
-                    .await?;
-
-                commit_schema(table, Arc::new(new_schema))?;
-            }
-        }
-    }
-
-    Ok(())
-}
-
 /// Create streams for querier if they do not exist
 /// get list of streams from memory and storage
 /// create streams for memory from storage if they do not exist

src/parseable/mod.rs

Lines changed: 22 additions & 1 deletion
@@ -37,7 +37,7 @@ use tracing::error;
 use crate::connectors::kafka::config::KafkaConfig;
 use crate::{
     cli::{Cli, Options, StorageOptions},
-    event::format::LogSource,
+    event::{commit_schema, error::EventError, format::LogSource},
     handlers::{
         http::{
             cluster::{sync_streams_with_ingestors, INTERNAL_STREAM_NAME},

@@ -794,6 +794,27 @@ impl Parseable {
         let new_schema = Schema::try_merge(res)?;
         Ok(new_schema)
     }
+
+    pub async fn update_schema_when_distributed(
+        &self,
+        tables: &Vec<String>,
+    ) -> Result<(), EventError> {
+        if self.options.mode == Mode::Query {
+            for table in tables {
+                if let Ok(new_schema) = self.fetch_schema(table).await {
+                    // commit schema merges the schema internally and updates the schema in storage.
+                    self.storage
+                        .get_object_store()
+                        .commit_schema(table, new_schema.clone())
+                        .await?;
+
+                    commit_schema(table, Arc::new(new_schema))?;
+                }
+            }
+        }
+
+        Ok(())
+    }
 }
 
 pub fn validate_static_schema(
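Because the relocated function now takes &self, the querier-only gating (self.options.mode == Mode::Query) travels with the Parseable type, and callers no longer need to import a handler-level helper. A hedged sketch of invoking it on a borrowed handle, assuming the crate-internal Parseable and EventError types; the helper name is illustrative:

    use crate::{event::error::EventError, parseable::Parseable};

    // Illustrative helper: forwards to the method added above. On nodes that are
    // not running in query mode the method returns Ok(()) without doing any work.
    async fn sync_querier_schemas(
        parseable: &Parseable,
        streams: &Vec<String>,
    ) -> Result<(), EventError> {
        parseable.update_schema_when_distributed(streams).await
    }

In practice all the call sites touched by this commit still go through the PARSEABLE static, so observable behavior is unchanged; only the location and receiver of the function move.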

src/prism/logstream/mod.rs

Lines changed: 4 additions & 2 deletions
@@ -28,7 +28,6 @@ use crate::{
     handlers::http::{
         cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats},
         logstream::error::StreamError,
-        query::update_schema_when_distributed,
     },
     parseable::{StreamNotFound, PARSEABLE},
     stats,

@@ -77,7 +76,10 @@ async fn get_stream_schema_helper(stream_name: &str) -> Result<Arc<Schema>, Stre
     }
 
     let stream = PARSEABLE.get_stream(stream_name)?;
-    match update_schema_when_distributed(&vec![stream_name.to_owned()]).await {
+    match PARSEABLE
+        .update_schema_when_distributed(&vec![stream_name.to_owned()])
+        .await
+    {
         Ok(_) => {
             let schema = stream.get_schema();
             Ok(schema)
