Skip to content

Commit 28d2d9d

Browse files
author
Devdutt Shenoi
committed
refactor: Parseable::fetch_schema
1 parent ddf25e0 commit 28d2d9d

File tree

3 files changed

+33
-36
lines changed

3 files changed

+33
-36
lines changed

src/handlers/http/mod.rs

Lines changed: 1 addition & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,10 @@
1818

1919
use actix_cors::Cors;
2020
use actix_web::Responder;
21-
use arrow_schema::Schema;
2221
use http::StatusCode;
23-
use itertools::Itertools;
2422
use serde_json::Value;
2523

26-
use crate::{parseable::PARSEABLE, storage::STREAM_ROOT_DIRECTORY, HTTP_CLIENT};
24+
use crate::{parseable::PARSEABLE, HTTP_CLIENT};
2725

2826
use self::{cluster::get_ingestor_info, query::Query};
2927

@@ -75,34 +73,6 @@ pub fn base_path_without_preceding_slash() -> String {
7573
format!("{API_BASE_PATH}/{API_VERSION}")
7674
}
7775

78-
/// Fetches the schema for the specified stream.
79-
///
80-
/// # Arguments
81-
///
82-
/// * `stream_name` - The name of the stream to fetch the schema for.
83-
///
84-
/// # Returns
85-
///
86-
/// An `anyhow::Result` containing the `arrow_schema::Schema` for the specified stream.
87-
pub async fn fetch_schema(stream_name: &str) -> anyhow::Result<arrow_schema::Schema> {
88-
let path_prefix =
89-
relative_path::RelativePathBuf::from(format!("{}/{}", stream_name, STREAM_ROOT_DIRECTORY));
90-
let store = PARSEABLE.storage.get_object_store();
91-
let res: Vec<Schema> = store
92-
.get_objects(
93-
Some(&path_prefix),
94-
Box::new(|file_name: String| file_name.contains(".schema")),
95-
)
96-
.await?
97-
.iter()
98-
// we should be able to unwrap as we know the data is valid schema
99-
.map(|byte_obj| serde_json::from_slice(byte_obj).expect("data is valid json"))
100-
.collect_vec();
101-
102-
let new_schema = Schema::try_merge(res)?;
103-
Ok(new_schema)
104-
}
105-
10676
/// unused for now, might need it later
10777
#[allow(unused)]
10878
pub async fn send_query_request_to_ingestor(query: &Query) -> anyhow::Result<Vec<Value>> {

src/handlers/http/query.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,8 @@ use std::sync::Arc;
3333
use std::time::Instant;
3434
use tracing::error;
3535

36-
use crate::event::error::EventError;
37-
use crate::handlers::http::fetch_schema;
38-
3936
use crate::event::commit_schema;
37+
use crate::event::error::EventError;
4038
use crate::metrics::QUERY_EXECUTE_TIME;
4139
use crate::option::Mode;
4240
use crate::parseable::PARSEABLE;
@@ -169,7 +167,7 @@ pub async fn get_counts(
169167
pub async fn update_schema_when_distributed(tables: &Vec<String>) -> Result<(), EventError> {
170168
if PARSEABLE.options.mode == Mode::Query {
171169
for table in tables {
172-
if let Ok(new_schema) = fetch_schema(table).await {
170+
if let Ok(new_schema) = PARSEABLE.fetch_schema(table).await {
173171
// commit schema merges the schema internally and updates the schema in storage.
174172
PARSEABLE
175173
.storage

src/parseable/mod.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ use bytes::Bytes;
2525
use chrono::Utc;
2626
use clap::{error::ErrorKind, Parser};
2727
use http::{header::CONTENT_TYPE, HeaderName, HeaderValue, StatusCode};
28+
use itertools::Itertools;
2829
use once_cell::sync::Lazy;
30+
use relative_path::RelativePathBuf;
2931
pub use staging::StagingError;
3032
use streams::StreamRef;
3133
pub use streams::{StreamNotFound, Streams};
@@ -50,7 +52,7 @@ use crate::{
5052
static_schema::{convert_static_schema_to_arrow_schema, StaticSchema},
5153
storage::{
5254
object_storage::parseable_json_path, ObjectStorageError, ObjectStorageProvider,
53-
ObjectStoreFormat, Owner, Permisssion, StreamType,
55+
ObjectStoreFormat, Owner, Permisssion, StreamType, STREAM_ROOT_DIRECTORY,
5456
},
5557
validator,
5658
};
@@ -765,6 +767,33 @@ impl Parseable {
765767

766768
Some(first_event_at.to_string())
767769
}
770+
771+
/// Fetches the schema for the specified stream.
772+
///
773+
/// # Arguments
774+
///
775+
/// * `stream_name` - The name of the stream to fetch the schema for.
776+
///
777+
/// # Returns
778+
///
779+
/// An `anyhow::Result` containing the `arrow_schema::Schema` for the specified stream.
780+
pub async fn fetch_schema(&self, stream_name: &str) -> anyhow::Result<Schema> {
781+
let path_prefix = RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY]);
782+
let store = self.storage.get_object_store();
783+
let res: Vec<Schema> = store
784+
.get_objects(
785+
Some(&path_prefix),
786+
Box::new(|file_name: String| file_name.contains(".schema")),
787+
)
788+
.await?
789+
.iter()
790+
// we should be able to unwrap as we know the data is valid schema
791+
.map(|byte_obj| serde_json::from_slice(byte_obj).expect("data is valid json"))
792+
.collect_vec();
793+
794+
let new_schema = Schema::try_merge(res)?;
795+
Ok(new_schema)
796+
}
768797
}
769798

770799
pub fn validate_static_schema(

0 commit comments

Comments
 (0)