Commit 4e26ba5

Devdutt Shenoi committed
refactor: associate Stream::commit_schema
1 parent 426397f · commit 4e26ba5

3 files changed: 33 additions, 37 deletions

src/event/mod.rs

Lines changed: 17 additions & 33 deletions

@@ -17,22 +17,17 @@
  *
  */
 
-pub mod format;
+use std::{collections::HashMap, sync::Arc};
 
 use arrow_array::RecordBatch;
-use arrow_schema::{Field, Fields, Schema};
-use itertools::Itertools;
-use std::sync::Arc;
-
-use self::error::EventError;
-use crate::{
-    metadata::update_stats,
-    parseable::{StagingError, PARSEABLE},
-    storage::StreamType,
-    LOCK_EXPECT,
-};
+use arrow_schema::Field;
 use chrono::NaiveDateTime;
-use std::collections::HashMap;
+use error::EventError;
+use itertools::Itertools;
+
+use crate::{metadata::update_stats, parseable::PARSEABLE, storage::StreamType};
+
+pub mod format;
 
 pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
 pub const USER_AGENT_KEY: &str = "p_user_agent";

@@ -67,11 +62,12 @@ impl Event {
             }
         }
 
+        let stream = PARSEABLE.get_or_create_stream(&self.stream_name);
         if self.is_first_event {
-            commit_schema(&self.stream_name, self.rb.schema())?;
+            stream.commit_schema(self.rb.schema())?;
         }
 
-        PARSEABLE.get_or_create_stream(&self.stream_name).push(
+        stream.push(
             &key,
             &self.rb,
             self.parsed_timestamp,

@@ -117,32 +113,20 @@ pub fn get_schema_key(fields: &[Arc<Field>]) -> String {
     format!("{hash:x}")
 }
 
-pub fn commit_schema(stream_name: &str, schema: Arc<Schema>) -> Result<(), StagingError> {
-    let mut stream_metadata = PARSEABLE.streams.write().expect("lock poisoned");
-
-    let map = &mut stream_metadata
-        .get_mut(stream_name)
-        .expect("map has entry for this stream name")
-        .metadata
-        .write()
-        .expect(LOCK_EXPECT)
-        .schema;
-    let current_schema = Schema::new(map.values().cloned().collect::<Fields>());
-    let schema = Schema::try_merge(vec![current_schema, schema.as_ref().clone()])?;
-    map.clear();
-    map.extend(schema.fields.iter().map(|f| (f.name().clone(), f.clone())));
-    Ok(())
-}
-
 pub mod error {
 
-    use crate::{parseable::StagingError, storage::ObjectStorageError};
+    use crate::{
+        parseable::{StagingError, StreamNotFound},
+        storage::ObjectStorageError,
+    };
 
     #[derive(Debug, thiserror::Error)]
     pub enum EventError {
         #[error("Staging Failed: {0}")]
         Staging(#[from] StagingError),
         #[error("ObjectStorage Error: {0}")]
         ObjectStorage(#[from] ObjectStorageError),
+        #[error("{0}")]
+        NotFound(#[from] StreamNotFound),
     }
 }
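Note: both the removed free function above and its replacement in src/parseable/streams.rs (below) build the merged schema with arrow's Schema::try_merge, which unions the field lists and fails if identically named fields are incompatible. A minimal, self-contained sketch of that merge behaviour using only the arrow_schema crate (field names and types here are illustrative, not taken from Parseable's defaults):

use arrow_schema::{ArrowError, DataType, Field, Schema};

fn main() -> Result<(), ArrowError> {
    // Fields the stream already knows about.
    let current = Schema::new(vec![
        Field::new("level", DataType::Utf8, true),
        Field::new("message", DataType::Utf8, true),
    ]);
    // Schema of a newly ingested batch that carries an extra column.
    let incoming = Schema::new(vec![
        Field::new("level", DataType::Utf8, true),
        Field::new("status_code", DataType::Int64, true),
    ]);
    // try_merge unions the two field lists; duplicate names must be type-compatible.
    let merged = Schema::try_merge(vec![current, incoming])?;
    assert_eq!(merged.fields().len(), 3);
    Ok(())
}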

src/handlers/http/query.rs

Lines changed: 3 additions & 3 deletions

@@ -35,8 +35,6 @@ use tracing::error;
 
 use crate::event::error::EventError;
 use crate::handlers::http::fetch_schema;
-
-use crate::event::commit_schema;
 use crate::metrics::QUERY_EXECUTE_TIME;
 use crate::option::Mode;
 use crate::parseable::{StreamNotFound, PARSEABLE};

@@ -178,7 +176,9 @@ pub async fn update_schema_when_distributed(tables: &Vec<String>) -> Result<(),
                     .commit_schema(table, new_schema.clone())
                     .await?;
 
-                commit_schema(table, Arc::new(new_schema))?;
+                PARSEABLE
+                    .get_stream(table)?
+                    .commit_schema(Arc::new(new_schema))?;
             }
         }
     }
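The call site now goes through PARSEABLE.get_stream(table)?, which can fail, so EventError gains the NotFound(#[from] StreamNotFound) variant shown in src/event/mod.rs above. A small sketch of how thiserror's #[from] lets ? perform that conversion; the types below are simplified stand-ins for the crate's real StreamNotFound and EventError, for illustration only:

use thiserror::Error;

// Simplified stand-in for crate::parseable::StreamNotFound.
#[derive(Debug, Error)]
#[error("Stream not found: {0}")]
struct StreamNotFound(String);

// Simplified stand-in for crate::event::error::EventError.
#[derive(Debug, Error)]
enum EventError {
    #[error("{0}")]
    NotFound(#[from] StreamNotFound),
}

// Stand-in for a lookup that can fail, like PARSEABLE.get_stream().
fn get_stream(name: &str) -> Result<(), StreamNotFound> {
    Err(StreamNotFound(name.to_owned()))
}

fn update_schema(table: &str) -> Result<(), EventError> {
    // `?` auto-converts StreamNotFound into EventError::NotFound via #[from].
    get_stream(table)?;
    Ok(())
}

fn main() {
    assert!(matches!(update_schema("logs"), Err(EventError::NotFound(_))));
}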

src/parseable/streams.rs

Lines changed: 13 additions & 1 deletion

@@ -28,7 +28,7 @@ use std::{
 };
 
 use arrow_array::RecordBatch;
-use arrow_schema::{Field, Fields, Schema};
+use arrow_schema::{Field, Fields, Schema, SchemaRef};
 use chrono::{NaiveDateTime, Timelike, Utc};
 use derive_more::{Deref, DerefMut};
 use itertools::Itertools;

@@ -591,6 +591,18 @@ impl Stream {
         self.metadata.read().expect(LOCK_EXPECT).schema_version
     }
 
+    /// Stores updated schema in-memory
+    pub fn commit_schema(&self, schema: SchemaRef) -> Result<(), StagingError> {
+        let mut metadata = self.metadata.write().expect(LOCK_EXPECT);
+        let current_schema = Schema::new(metadata.schema.values().cloned().collect::<Fields>());
+        let schema = Schema::try_merge(vec![current_schema, schema.as_ref().clone()])?;
+        metadata.schema.clear();
+        metadata
+            .schema
+            .extend(schema.fields.iter().map(|f| (f.name().clone(), f.clone())));
+        Ok(())
+    }
+
     pub fn get_schema(&self) -> Arc<Schema> {
         let metadata = self.metadata.read().expect(LOCK_EXPECT);
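The new method reproduces the old free function's merge-and-replace logic, but against the Stream's own metadata lock rather than a lookup in the global stream map. A standalone sketch of that pattern against a plain HashMap, assuming only the arrow_schema crate (Stream, its metadata lock, and StagingError are internal types and are left out here):

use std::{collections::HashMap, sync::Arc};

use arrow_schema::{ArrowError, DataType, Field, Fields, Schema, SchemaRef};

// Illustrative stand-in for the schema map kept inside Stream's metadata.
fn commit_schema(
    map: &mut HashMap<String, Arc<Field>>,
    schema: SchemaRef,
) -> Result<(), ArrowError> {
    // Rebuild the current schema from the stored fields, merge in the new one,
    // then replace the map's contents with the merged field set.
    let current = Schema::new(map.values().cloned().collect::<Fields>());
    let merged = Schema::try_merge(vec![current, schema.as_ref().clone()])?;
    map.clear();
    map.extend(merged.fields.iter().map(|f| (f.name().clone(), f.clone())));
    Ok(())
}

fn main() -> Result<(), ArrowError> {
    let mut map = HashMap::new();
    map.insert(
        "level".to_owned(),
        Arc::new(Field::new("level", DataType::Utf8, true)),
    );
    let incoming: SchemaRef = Arc::new(Schema::new(vec![Field::new(
        "status_code",
        DataType::Int64,
        true,
    )]));
    commit_schema(&mut map, incoming)?;
    assert_eq!(map.len(), 2);
    Ok(())
}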
