Skip to content

Commit 7215e8e

Browse files
author
Devdutt Shenoi
committed
map schema keys to recordbatches
1 parent 19708df commit 7215e8e

File tree

3 files changed

+38
-36
lines changed

3 files changed

+38
-36
lines changed

src/event/format/json.rs

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use tracing::error;
3939

4040
use super::{EventFormat, LogSource};
4141
use crate::{
42-
event::PartitionEvent,
42+
event::{get_schema_key, PartitionEvent},
4343
kinesis::{flatten_kinesis_logs, Message},
4444
metadata::SchemaVersion,
4545
otel::{logs::flatten_otel_logs, metrics::flatten_otel_metrics, traces::flatten_otel_traces},
@@ -246,7 +246,7 @@ impl EventFormat for Event {
246246
log_source,
247247
)?;
248248

249-
let mut partitions = vec![];
249+
let mut partitions = HashMap::new();
250250
for json in data {
251251
let custom_partition_values = match custom_partitions.as_ref() {
252252
Some(custom_partitions) => {
@@ -271,11 +271,25 @@ impl EventFormat for Event {
271271
schema_version,
272272
)?;
273273

274-
partitions.push(PartitionEvent {
275-
rb,
276-
parsed_timestamp,
277-
custom_partition_values,
278-
});
274+
let schema = rb.schema();
275+
let mut key = get_schema_key(&schema.fields);
276+
if time_partition.is_some() {
277+
let parsed_timestamp_to_min = parsed_timestamp.format("%Y%m%dT%H%M").to_string();
278+
key.push_str(&parsed_timestamp_to_min);
279+
}
280+
281+
for (k, v) in custom_partition_values.iter().sorted_by_key(|v| v.0) {
282+
key.push_str(&format!("&{k}={v}"));
283+
}
284+
285+
partitions.insert(
286+
key,
287+
PartitionEvent {
288+
rb,
289+
parsed_timestamp,
290+
custom_partition_values,
291+
},
292+
);
279293
}
280294

281295
Ok(super::Event {

src/event/mod.rs

Lines changed: 4 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -42,29 +42,14 @@ pub struct Event {
4242
pub origin_size: u64,
4343
pub is_first_event: bool,
4444
pub time_partition: Option<String>,
45-
pub partitions: Vec<PartitionEvent>,
45+
pub partitions: HashMap<String, PartitionEvent>,
4646
pub stream_type: StreamType,
4747
}
4848

4949
// Events holds the schema related to each event for a single log stream
5050
impl Event {
5151
pub fn process(self, stream: &Stream) -> Result<(), EventError> {
52-
for partition in self.partitions {
53-
let mut key = get_schema_key(&partition.rb.schema().fields);
54-
if self.time_partition.is_some() {
55-
let parsed_timestamp_to_min =
56-
partition.parsed_timestamp.format("%Y%m%dT%H%M").to_string();
57-
key.push_str(&parsed_timestamp_to_min);
58-
}
59-
60-
for (k, v) in partition
61-
.custom_partition_values
62-
.iter()
63-
.sorted_by_key(|v| v.0)
64-
{
65-
key.push_str(&format!("&{k}={v}"));
66-
}
67-
52+
for (key, partition) in self.partitions {
6853
if self.is_first_event {
6954
let schema = partition.rb.schema().as_ref().clone();
7055
stream.commit_schema(schema)?;
@@ -92,11 +77,9 @@ impl Event {
9277
}
9378

9479
pub fn process_unchecked(&self, stream: &Stream) -> Result<(), EventError> {
95-
for partition in &self.partitions {
96-
let key = get_schema_key(&partition.rb.schema().fields);
97-
80+
for (key, partition) in &self.partitions {
9881
stream.push(
99-
&key,
82+
key,
10083
&partition.rb,
10184
partition.parsed_timestamp,
10285
&partition.custom_partition_values,

src/handlers/http/ingest.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use serde_json::Value;
2828

2929
use crate::event::error::EventError;
3030
use crate::event::format::LogSource;
31-
use crate::event::{self, PartitionEvent};
31+
use crate::event::{self, get_schema_key, PartitionEvent};
3232
use crate::handlers::{LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
3333
use crate::option::Mode;
3434
use crate::parseable::{Stream, StreamNotFound, PARSEABLE};
@@ -239,21 +239,26 @@ pub async fn post_event(
239239
}
240240

241241
pub async fn push_logs_unchecked(
242-
batches: RecordBatch,
242+
rb: RecordBatch,
243243
stream: &Stream,
244244
) -> Result<event::Event, PostError> {
245-
let unchecked_event = event::Event {
245+
let mut unchecked_event = event::Event {
246246
origin_format: "json",
247247
origin_size: 0,
248248
time_partition: None,
249249
is_first_event: true, // NOTE: Maybe should be false
250-
partitions: vec![PartitionEvent {
251-
rb: batches,
252-
parsed_timestamp: Utc::now().naive_utc(),
253-
custom_partition_values: HashMap::new(), // should be an empty map for unchecked push
254-
}],
250+
partitions: HashMap::new(),
255251
stream_type: StreamType::UserDefined,
256252
};
253+
unchecked_event.partitions.insert(
254+
get_schema_key(&rb.schema().fields),
255+
PartitionEvent {
256+
rb,
257+
parsed_timestamp: Utc::now().naive_utc(),
258+
custom_partition_values: HashMap::new(), // should be an empty map for unchecked push
259+
},
260+
);
261+
257262
unchecked_event.process_unchecked(stream)?;
258263

259264
Ok(unchecked_event)

0 commit comments

Comments
 (0)