Skip to content

Commit 2c43745

Browse files
author
Devdutt Shenoi
committed
refactor: associate ObjectStore::commit_schema
1 parent d9da876 commit 2c43745

File tree

2 files changed

+15
-12
lines changed

2 files changed

+15
-12
lines changed

src/handlers/http/query.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ use crate::query::{execute, CountsRequest, CountsResponse, Query as LogicalQuery
4545
use crate::query::{TableScanVisitor, QUERY_SESSION};
4646
use crate::rbac::Users;
4747
use crate::response::QueryResponse;
48-
use crate::storage::object_storage::commit_schema_to_storage;
4948
use crate::storage::ObjectStorageError;
5049
use crate::utils::actix::extract_session_key_from_req;
5150
use crate::utils::time::{TimeParseError, TimeRange};
@@ -173,7 +172,11 @@ pub async fn update_schema_when_distributed(tables: &Vec<String>) -> Result<(),
173172
for table in tables {
174173
if let Ok(new_schema) = fetch_schema(table).await {
175174
// commit schema merges the schema internally and updates the schema in storage.
176-
commit_schema_to_storage(table, new_schema.clone()).await?;
175+
PARSEABLE
176+
.storage
177+
.get_object_store()
178+
.commit_schema(table, new_schema.clone())
179+
.await?;
177180

178181
commit_schema(table, Arc::new(new_schema))?;
179182
}

src/storage/object_storage.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -869,7 +869,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
869869
for path in stream.schema_files() {
870870
let file = File::open(&path)?;
871871
let schema: Schema = serde_json::from_reader(file)?;
872-
commit_schema_to_storage(&stream_name, schema).await?;
872+
self.commit_schema(&stream_name, schema).await?;
873873
if let Err(e) = remove_file(path) {
874874
warn!("Failed to remove staged file: {e}");
875875
}
@@ -878,16 +878,16 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
878878

879879
Ok(())
880880
}
881-
}
882881

883-
pub async fn commit_schema_to_storage(
884-
stream_name: &str,
885-
schema: Schema,
886-
) -> Result<(), ObjectStorageError> {
887-
let storage = PARSEABLE.storage().get_object_store();
888-
let stream_schema = storage.get_schema(stream_name).await?;
889-
let new_schema = Schema::try_merge(vec![schema, stream_schema]).unwrap();
890-
storage.put_schema(stream_name, &new_schema).await
882+
async fn commit_schema(
883+
&self,
884+
stream_name: &str,
885+
schema: Schema,
886+
) -> Result<(), ObjectStorageError> {
887+
let stream_schema = self.get_schema(stream_name).await?;
888+
let new_schema = Schema::try_merge(vec![schema, stream_schema]).unwrap();
889+
self.put_schema(stream_name, &new_schema).await
890+
}
891891
}
892892

893893
#[inline(always)]

0 commit comments

Comments
 (0)