From 8ad7eb4f12afc1bf32b5f7e92aac01929abe8d13 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Sat, 5 Jul 2025 22:54:22 -0700 Subject: [PATCH 1/3] chore: simplify schema creation from storage remove functions that creates schema from ingestors and queriers separately reused function `fetch_schema` that fetches all schema files and merges the schemas into one this ensures the schema is always the latest --- src/migration/mod.rs | 25 +------------------ src/parseable/mod.rs | 2 +- src/storage/object_storage.rs | 45 ++++++++--------------------------- 3 files changed, 12 insertions(+), 60 deletions(-) diff --git a/src/migration/mod.rs b/src/migration/mod.rs index 4216f1076..e41b11117 100644 --- a/src/migration/mod.rs +++ b/src/migration/mod.rs @@ -206,7 +206,7 @@ async fn migration_stream( ) -> anyhow::Result> { let mut arrow_schema: Schema = Schema::empty(); - let schema = fetch_or_create_schema(stream, storage).await?; + let schema = storage.create_schema_from_storage(stream).await?; let stream_metadata = fetch_or_create_stream_metadata(stream, storage).await?; let mut stream_meta_found = true; @@ -234,29 +234,6 @@ async fn migration_stream( Ok(Some(metadata)) } -async fn fetch_or_create_schema( - stream: &str, - storage: &dyn ObjectStorage, -) -> anyhow::Result { - let schema_path = schema_path(stream); - if let Ok(schema) = storage.get_object(&schema_path).await { - Ok(schema) - } else { - let querier_schema = storage - .create_schema_from_querier(stream) - .await - .unwrap_or_default(); - if !querier_schema.is_empty() { - Ok(querier_schema) - } else { - Ok(storage - .create_schema_from_ingestor(stream) - .await - .unwrap_or_default()) - } - } -} - async fn fetch_or_create_stream_metadata( stream: &str, storage: &dyn ObjectStorage, diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index 87496cfc1..e1f09ee23 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -281,7 +281,7 @@ impl Parseable { let (stream_metadata_bytes, schema_bytes) = try_join!( storage.create_stream_from_ingestor(stream_name), - storage.create_schema_from_ingestor(stream_name) + storage.create_schema_from_storage(stream_name) )?; let stream_metadata = if stream_metadata_bytes.is_empty() { diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 0289995c3..ee385b7c1 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -49,6 +49,7 @@ use crate::catalog::{self, manifest::Manifest, snapshot::Snapshot}; use crate::correlation::{CorrelationConfig, CorrelationError}; use crate::event::format::LogSource; use crate::event::format::LogSourceEntry; +use crate::handlers::http::fetch_schema; use crate::handlers::http::modal::ingest_server::INGESTOR_EXPECT; use crate::handlers::http::modal::ingest_server::INGESTOR_META; use crate::handlers::http::users::CORRELATION_DIR; @@ -652,44 +653,18 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { Ok(Bytes::new()) } - ///create schema from querier schema from storage - async fn create_schema_from_querier( + ///create schema from storage + async fn create_schema_from_storage( &self, stream_name: &str, ) -> Result { - let path = - RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME]); - if let Ok(querier_schema_bytes) = self.get_object(&path).await { - self.put_object(&schema_path(stream_name), querier_schema_bytes.clone()) - .await?; - return Ok(querier_schema_bytes); - } - Ok(Bytes::new()) - } - - ///create schema from ingestor schema from storage - async fn create_schema_from_ingestor( - &self, - stream_name: &str, - ) -> Result { - let path = RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY]); - if let Some(schema_obs) = self - .get_objects( - Some(&path), - Box::new(|file_name| { - file_name.starts_with(".ingestor") && file_name.ends_with("schema") - }), - ) - .await - .into_iter() - .next() - { - let schema_ob = &schema_obs[0]; - self.put_object(&schema_path(stream_name), schema_ob.clone()) - .await?; - return Ok(schema_ob.clone()); - } - Ok(Bytes::new()) + let schema = fetch_schema(stream_name).await?; + // convert to bytes + let schema = serde_json::to_vec(&schema)?; + let schema_bytes = Bytes::from(schema); + self.put_object(&schema_path(stream_name), schema_bytes.clone()) + .await?; + Ok(schema_bytes) } async fn get_stream_meta_from_storage( From a4c8f788520bb7b9ec6e8cca2a16b4bace8ab27c Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 16 Jul 2025 07:46:10 -0700 Subject: [PATCH 2/3] create stream for empty fields stream in querier --- src/handlers/http/query.rs | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index 1a6f664c2..688fb5c68 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -439,12 +439,22 @@ pub async fn create_streams_for_querier() -> Result<(), QueryError> { let querier_streams = PARSEABLE.streams.list(); let querier_streams_set: HashSet<_> = querier_streams.into_iter().collect(); - - let storage_streams = store.list_streams().await?; - - let missing_streams: Vec<_> = storage_streams + // fetch querier streams which have field list blank + // now missing streams should be list of streams which are in storage but not in querier + // and also have no fields in the schema + // this is to ensure that we do not create streams for querier which already exist in querier + + let missing_streams: Vec<_> = store + .list_streams() + .await? .into_iter() - .filter(|stream_name| !querier_streams_set.contains(stream_name)) + .filter(|stream_name| { + !querier_streams_set.contains(stream_name) + && PARSEABLE + .get_stream(stream_name) + .map(|s| s.get_schema().fields().is_empty()) + .unwrap_or(true) + }) .collect(); if missing_streams.is_empty() { From 42cebeccbc931ec8f57243b64a657e9e07273cae Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Wed, 16 Jul 2025 08:13:25 -0700 Subject: [PATCH 3/3] correct logic --- src/handlers/http/query.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index 688fb5c68..85634d031 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -450,10 +450,10 @@ pub async fn create_streams_for_querier() -> Result<(), QueryError> { .into_iter() .filter(|stream_name| { !querier_streams_set.contains(stream_name) - && PARSEABLE + || PARSEABLE .get_stream(stream_name) .map(|s| s.get_schema().fields().is_empty()) - .unwrap_or(true) + .unwrap_or(false) }) .collect();