Skip to content

Commit ddf25e0

Browse files
author
Devdutt Shenoi
committed
refactor: associate ObjectStore::commit_schema
1 parent 8381e72 commit ddf25e0

File tree

2 files changed

+15
-12
lines changed

2 files changed

+15
-12
lines changed

src/handlers/http/query.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ use crate::query::{CountsRequest, CountsResponse, Query as LogicalQuery};
4545
use crate::query::{TableScanVisitor, QUERY_SESSION};
4646
use crate::rbac::Users;
4747
use crate::response::QueryResponse;
48-
use crate::storage::object_storage::commit_schema_to_storage;
4948
use crate::storage::ObjectStorageError;
5049
use crate::utils::actix::extract_session_key_from_req;
5150
use crate::utils::time::{TimeParseError, TimeRange};
@@ -172,7 +171,11 @@ pub async fn update_schema_when_distributed(tables: &Vec<String>) -> Result<(),
172171
for table in tables {
173172
if let Ok(new_schema) = fetch_schema(table).await {
174173
// commit schema merges the schema internally and updates the schema in storage.
175-
commit_schema_to_storage(table, new_schema.clone()).await?;
174+
PARSEABLE
175+
.storage
176+
.get_object_store()
177+
.commit_schema(table, new_schema.clone())
178+
.await?;
176179

177180
commit_schema(table, Arc::new(new_schema))?;
178181
}

src/storage/object_storage.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -780,7 +780,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
780780
for path in stream.schema_files() {
781781
let file = File::open(&path)?;
782782
let schema: Schema = serde_json::from_reader(file)?;
783-
commit_schema_to_storage(&stream_name, schema).await?;
783+
self.commit_schema(&stream_name, schema).await?;
784784
if let Err(e) = remove_file(path) {
785785
warn!("Failed to remove staged file: {e}");
786786
}
@@ -789,16 +789,16 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
789789

790790
Ok(())
791791
}
792-
}
793792

794-
pub async fn commit_schema_to_storage(
795-
stream_name: &str,
796-
schema: Schema,
797-
) -> Result<(), ObjectStorageError> {
798-
let storage = PARSEABLE.storage().get_object_store();
799-
let stream_schema = storage.get_schema(stream_name).await?;
800-
let new_schema = Schema::try_merge(vec![schema, stream_schema]).unwrap();
801-
storage.put_schema(stream_name, &new_schema).await
793+
async fn commit_schema(
794+
&self,
795+
stream_name: &str,
796+
schema: Schema,
797+
) -> Result<(), ObjectStorageError> {
798+
let stream_schema = self.get_schema(stream_name).await?;
799+
let new_schema = Schema::try_merge(vec![schema, stream_schema]).unwrap();
800+
self.put_schema(stream_name, &new_schema).await
801+
}
802802
}
803803

804804
#[inline(always)]

0 commit comments

Comments
 (0)