Commit dc34a85

Author: Devdutt Shenoi

refactor: share Stream state when processing

1 parent: a7b2db3

Across seven files, this commit threads a &Stream handle through the ingestion path: EventFormat::into_event and Event::process now receive the stream handle directly, instead of a stream name plus a long list of per-stream settings unpacked by each caller.

7 files changed (+37, -66 lines)


src/event/format/json.rs
Lines changed: 17 additions & 17 deletions

@@ -43,7 +43,7 @@ use crate::{
     kinesis::{flatten_kinesis_logs, Message},
     metadata::SchemaVersion,
     otel::{logs::flatten_otel_logs, metrics::flatten_otel_metrics, traces::flatten_otel_traces},
-    storage::StreamType,
+    parseable::Stream,
     utils::{
         arrow::get_field,
         json::{flatten_json_body, Json},
@@ -224,23 +224,24 @@ impl EventFormat for Event {
     /// Converts a JSON event into a Parseable Event
     fn into_event(
         self,
-        stream_name: String,
         origin_size: u64,
-        storage_schema: &HashMap<String, Arc<Field>>,
-        static_schema_flag: bool,
-        custom_partitions: Option<&String>,
-        time_partition: Option<&String>,
-        time_partition_limit: Option<NonZeroU32>,
-        schema_version: SchemaVersion,
+        stream: &Stream,
         log_source: &LogSource,
-        stream_type: StreamType,
     ) -> anyhow::Result<super::Event> {
+        let time_partition = stream.get_time_partition();
+        let time_partition_limit = stream.get_time_partition_limit();
+        let static_schema_flag = stream.get_static_schema_flag();
+        let custom_partitions = stream.get_custom_partition();
+        let schema_version = stream.get_schema_version();
+        let storage_schema = stream.get_schema_raw();
+        let stream_type = stream.get_stream_type();
+
         let p_timestamp = self.p_timestamp;
         let (data, schema, is_first_event) = self.to_data(
-            storage_schema,
-            time_partition,
+            &storage_schema,
+            time_partition.as_ref(),
             time_partition_limit,
-            custom_partitions,
+            custom_partitions.as_ref(),
             schema_version,
             log_source,
         )?;
@@ -255,18 +256,18 @@
             None => HashMap::new(),
         };

-        let parsed_timestamp = match time_partition {
-            Some(time_partition) => extract_and_parse_time(&json, time_partition)?,
+        let parsed_timestamp = match time_partition.as_ref() {
+            Some(time_partition) => extract_and_parse_time(&json, time_partition.as_ref())?,
             _ => p_timestamp.naive_utc(),
         };

         let rb = Self::into_recordbatch(
             p_timestamp,
             vec![json],
             schema.clone(),
-            storage_schema,
+            &storage_schema,
             static_schema_flag,
-            time_partition,
+            time_partition.as_ref(),
             schema_version,
         )?;
@@ -278,7 +279,6 @@
         }

         Ok(super::Event {
-            stream_name,
             origin_format: "json",
             origin_size,
             is_first_event,
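
A detail worth noting in this hunk: the Stream accessors evidently return owned values, hence the new .as_ref() calls where to_data and into_recordbatch previously took Option<&String> directly. A minimal, self-contained sketch of that pattern, with hypothetical stand-in types rather than the real Parseable definitions:

// Stand-in for a stream handle whose accessors return owned
// copies of per-stream configuration (as the diff above suggests).
struct Stream {
    time_partition: Option<String>,
}

impl Stream {
    // Hypothetical accessor mirroring get_time_partition() above:
    // it clones, so the caller receives an owned Option<String>.
    fn get_time_partition(&self) -> Option<String> {
        self.time_partition.clone()
    }
}

// A consumer that wants a borrow, like to_data's Option<&String>.
fn describe(time_partition: Option<&String>) -> String {
    match time_partition {
        Some(field) => format!("time-partitioned on {field}"),
        None => "no time partition".to_string(),
    }
}

fn main() {
    let stream = Stream {
        time_partition: Some("timestamp".to_string()),
    };
    // Bind the owned value first, then borrow with .as_ref(),
    // matching the call sites in the diff.
    let time_partition = stream.get_time_partition();
    println!("{}", describe(time_partition.as_ref()));
}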

src/event/format/mod.rs
Lines changed: 2 additions & 10 deletions

@@ -33,7 +33,7 @@ use serde_json::Value;

 use crate::{
     metadata::SchemaVersion,
-    storage::StreamType,
+    parseable::Stream,
     utils::{
         arrow::{get_field, get_timestamp_array, replace_columns},
         json::Json,
@@ -165,19 +165,11 @@ pub trait EventFormat: Sized {
         Ok(rb)
     }

-    #[allow(clippy::too_many_arguments)]
     fn into_event(
         self,
-        stream_name: String,
         origin_size: u64,
-        storage_schema: &HashMap<String, Arc<Field>>,
-        static_schema_flag: bool,
-        custom_partitions: Option<&String>,
-        time_partition: Option<&String>,
-        time_partition_limit: Option<NonZeroU32>,
-        schema_version: SchemaVersion,
+        stream: &Stream,
         log_source: &LogSource,
-        stream_type: StreamType,
     ) -> Result<Event, AnyError>;
 }
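
Collapsing eight stream-specific parameters into a single &Stream is also what lets the #[allow(clippy::too_many_arguments)] escape hatch go away. A hedged sketch of the before-and-after trait boundary, again with simplified stand-ins for the crate's types:

// Stand-in types, just to make the trait's new shape concrete.
struct Stream;
struct LogSource;
struct Event;

trait EventFormat: Sized {
    // Before: ~ten parameters (stream_name, storage_schema,
    // static_schema_flag, custom_partitions, time_partition,
    // time_partition_limit, schema_version, stream_type, ...)
    // behind an #[allow(clippy::too_many_arguments)].
    // After: per-stream state travels behind one borrow.
    fn into_event(
        self,
        origin_size: u64,
        stream: &Stream,
        log_source: &LogSource,
    ) -> Result<Event, String>;
}

fn main() {} // nothing to run; the trait shape is the point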

src/event/mod.rs
Lines changed: 8 additions & 9 deletions

@@ -27,7 +27,7 @@ use std::sync::Arc;
 use self::error::EventError;
 use crate::{
     metadata::update_stats,
-    parseable::{StagingError, PARSEABLE},
+    parseable::{StagingError, Stream, PARSEABLE},
     storage::StreamType,
     LOCK_EXPECT,
 };
@@ -43,7 +43,6 @@ pub struct PartitionEvent {
 }

 pub struct Event {
-    pub stream_name: String,
     pub origin_format: &'static str,
     pub origin_size: u64,
     pub is_first_event: bool,
@@ -54,7 +53,7 @@

 // Events holds the schema related to a each event for a single log stream
 impl Event {
-    pub fn process(self) -> Result<(), EventError> {
+    pub fn process(self, stream: &Stream) -> Result<(), EventError> {
         for partition in self.partitions {
             let mut key = get_schema_key(&partition.rb.schema().fields);
             if self.time_partition.is_some() {
@@ -72,10 +71,10 @@
             }

             if self.is_first_event {
-                commit_schema(&self.stream_name, partition.rb.schema())?;
+                commit_schema(&stream.stream_name, partition.rb.schema())?;
             }

-            PARSEABLE.get_or_create_stream(&self.stream_name).push(
+            stream.push(
                 &key,
                 &partition.rb,
                 partition.parsed_timestamp,
@@ -84,23 +83,23 @@
             )?;

             update_stats(
-                &self.stream_name,
+                &stream.stream_name,
                 self.origin_format,
                 self.origin_size,
                 partition.rb.num_rows(),
                 partition.parsed_timestamp.date(),
             );

-            crate::livetail::LIVETAIL.process(&self.stream_name, &partition.rb);
+            crate::livetail::LIVETAIL.process(&stream.stream_name, &partition.rb);
         }
         Ok(())
     }

-    pub fn process_unchecked(&self) -> Result<(), EventError> {
+    pub fn process_unchecked(&self, stream: &Stream) -> Result<(), EventError> {
         for partition in &self.partitions {
             let key = get_schema_key(&partition.rb.schema().fields);

-            PARSEABLE.get_or_create_stream(&self.stream_name).push(
+            stream.push(
                 &key,
                 &partition.rb,
                 partition.parsed_timestamp,
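
Note what stream.push(...) replaces: a PARSEABLE.get_or_create_stream(&self.stream_name) call inside the per-partition loop. With the handle passed in, the registry lookup happens once at the caller rather than once per partition. A small self-contained sketch of that difference, assuming the registry hands out cheaply clonable shared handles (an Arc in this sketch; the crate's actual StreamRef may differ):

use std::collections::HashMap;
use std::sync::Arc;

struct Stream {
    stream_name: String,
}

impl Stream {
    fn push(&self, batch: &str) {
        println!("{}: pushed {batch}", self.stream_name);
    }
}

// Hypothetical registry: name -> shared stream handle.
struct Streams(HashMap<String, Arc<Stream>>);

impl Streams {
    fn get_or_create_stream(&self, name: &str) -> Arc<Stream> {
        Arc::clone(&self.0[name])
    }
}

fn main() {
    let mut map = HashMap::new();
    map.insert(
        "demo".to_string(),
        Arc::new(Stream { stream_name: "demo".to_string() }),
    );
    let registry = Streams(map);
    let partitions = ["a", "b", "c"];

    // Before (shape of the old code): one registry lookup per partition.
    for p in &partitions {
        registry.get_or_create_stream("demo").push(p);
    }

    // After: resolve the handle once, reuse it for every partition.
    let stream = registry.get_or_create_stream("demo");
    for p in &partitions {
        stream.push(p);
    }
}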

src/handlers/http/ingest.rs
Lines changed: 3 additions & 4 deletions

@@ -31,7 +31,7 @@ use crate::event::format::LogSource;
 use crate::event::{self, PartitionEvent};
 use crate::handlers::{LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
 use crate::option::Mode;
-use crate::parseable::{StreamNotFound, PARSEABLE};
+use crate::parseable::{Stream, StreamNotFound, PARSEABLE};
 use crate::storage::{ObjectStorageError, StreamType};
 use crate::utils::header_parsing::ParseHeaderError;
 use crate::utils::json::flatten::JsonFlattenError;
@@ -240,10 +240,9 @@

 pub async fn push_logs_unchecked(
     batches: RecordBatch,
-    stream_name: &str,
+    stream: &Stream,
 ) -> Result<event::Event, PostError> {
     let unchecked_event = event::Event {
-        stream_name: stream_name.to_string(),
         origin_format: "json",
         origin_size: 0,
         time_partition: None,
@@ -255,7 +254,7 @@
         }],
         stream_type: StreamType::UserDefined,
     };
-    unchecked_event.process_unchecked()?;
+    unchecked_event.process_unchecked(stream)?;

     Ok(unchecked_event)
 }
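
With stream_name gone from the Event struct, an event no longer names its own destination; the caller supplies one at processing time. A hypothetical sketch of that decoupling (stand-in types and simplified signatures, not the crate's real definitions):

struct Stream {
    stream_name: String,
}

struct Event {
    origin_format: &'static str,
    origin_size: u64,
}

impl Event {
    // The destination arrives as an argument instead of a struct field.
    fn process_unchecked(&self, stream: &Stream) -> Result<(), String> {
        println!(
            "writing {} bytes of {} into {}",
            self.origin_size, self.origin_format, stream.stream_name
        );
        Ok(())
    }
}

fn main() -> Result<(), String> {
    let stream = Stream { stream_name: "flight-temp".to_string() };
    let event = Event { origin_format: "json", origin_size: 0 };
    // The same event value could now target any resolved stream,
    // which a baked-in stream_name field previously fixed at creation.
    event.process_unchecked(&stream)
}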

src/parseable/mod.rs
Lines changed: 1 addition & 1 deletion

@@ -28,7 +28,7 @@ use http::{header::CONTENT_TYPE, HeaderName, HeaderValue, StatusCode};
 use once_cell::sync::Lazy;
 pub use staging::StagingError;
 use streams::StreamRef;
-pub use streams::{StreamNotFound, Streams};
+pub use streams::{Stream, StreamNotFound, Streams};
 use tracing::error;

 #[cfg(feature = "kafka")]

src/parseable/streams.rs
Lines changed: 2 additions & 21 deletions

@@ -114,30 +114,11 @@ impl Stream {
     }

     pub async fn push_logs(&self, json: Value, log_source: &LogSource) -> anyhow::Result<()> {
-        let time_partition = self.get_time_partition();
-        let time_partition_limit = self.get_time_partition_limit();
-        let static_schema_flag = self.get_static_schema_flag();
-        let custom_partition = self.get_custom_partition();
-        let schema_version = self.get_schema_version();
-        let schema = self.get_schema_raw();
-        let stream_type = self.get_stream_type();
-
         let origin_size = serde_json::to_vec(&json).unwrap().len() as u64; // string length need not be the same as byte length

         json::Event::new(json)
-            .into_event(
-                self.stream_name.to_owned(),
-                origin_size,
-                &schema,
-                static_schema_flag,
-                custom_partition.as_ref(),
-                time_partition.as_ref(),
-                time_partition_limit,
-                schema_version,
-                log_source,
-                stream_type,
-            )?
-            .process()?;
+            .into_event(origin_size, self, log_source)?
+            .process(self)?;

         Ok(())
     }
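
This is the payoff hunk: push_logs is already a method on Stream, so the settings it used to unpack and forward one by one can instead be read inside into_event, and self doubles as the processing target. A compressed sketch of the resulting chain, using hypothetical types that mirror the shape above:

struct Stream {
    stream_name: String,
}

struct JsonEvent(String);

struct Event {
    origin_size: u64,
}

impl JsonEvent {
    fn new(json: String) -> Self {
        JsonEvent(json)
    }

    // Reads whatever per-stream configuration it needs from `stream`.
    fn into_event(self, origin_size: u64, stream: &Stream) -> Result<Event, String> {
        println!("converting {} for {}", self.0, stream.stream_name);
        Ok(Event { origin_size })
    }
}

impl Event {
    fn process(self, stream: &Stream) -> Result<(), String> {
        println!("{} ingested {} bytes", stream.stream_name, self.origin_size);
        Ok(())
    }
}

impl Stream {
    // Mirrors push_logs: `self` is both the config source and the sink.
    fn push_logs(&self, json: String) -> Result<(), String> {
        let origin_size = json.len() as u64;
        JsonEvent::new(json)
            .into_event(origin_size, self)?
            .process(self)
    }
}

fn main() -> Result<(), String> {
    let stream = Stream { stream_name: "demo".to_string() };
    stream.push_logs(r#"{"level":"info"}"#.to_string())
}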

src/utils/arrow/flight.rs
Lines changed: 4 additions & 4 deletions

@@ -95,14 +95,14 @@ pub async fn append_temporary_events(
     Event,
     Status,
 > {
-    let schema = PARSEABLE
+    let stream = PARSEABLE
         .get_stream(stream_name)
-        .map_err(|err| Status::failed_precondition(format!("Metadata Error: {}", err)))?
-        .get_schema();
+        .map_err(|err| Status::failed_precondition(format!("Metadata Error: {}", err)))?;
+    let schema = stream.get_schema();
     let rb = concat_batches(&schema, minute_result)
         .map_err(|err| Status::failed_precondition(format!("ArrowError: {}", err)))?;

-    let event = push_logs_unchecked(rb, stream_name)
+    let event = push_logs_unchecked(rb, &stream)
         .await
         .map_err(|err| Status::internal(err.to_string()))?;
     Ok(event)
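
The caller-side adjustment in miniature: bind the resolved handle instead of consuming it inline, so a single lookup serves both the schema read and the later push. A hedged sketch of the same restructuring with placeholder types:

// Hypothetical registry and handle, shaped like the diff above.
struct Stream;

impl Stream {
    fn get_schema(&self) -> String {
        "schema".to_string()
    }
    fn push(&self, _schema: &str) {}
}

fn get_stream(_name: &str) -> Result<Stream, String> {
    Ok(Stream)
}

fn main() -> Result<(), String> {
    // Before: the handle was a temporary, alive only long enough to
    // read the schema; pushing later required a second lookup by name.
    let schema = get_stream("demo")?.get_schema();
    let _ = schema;

    // After: keep the handle, reuse it for every subsequent step.
    let stream = get_stream("demo")?;
    let schema = stream.get_schema();
    stream.push(&schema);
    Ok(())
}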
