Skip to content

Commit d096ce0

Browse files
author
Devdutt Shenoi
committed
perf: partition at json level
1 parent 5bbd309 commit d096ce0

File tree

4 files changed

+100
-152
lines changed

4 files changed

+100
-152
lines changed

src/event/format/json.rs

Lines changed: 53 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use anyhow::anyhow;
2323
use arrow_array::RecordBatch;
2424
use arrow_json::reader::{infer_json_schema_from_iterator, ReaderBuilder};
2525
use arrow_schema::{DataType, Field, Fields, Schema};
26-
use chrono::{DateTime, NaiveDateTime, Utc};
26+
use chrono::{DateTime, NaiveDate, NaiveDateTime, Utc};
2727
use datafusion::arrow::util::bit_util::round_upto_multiple_of_64;
2828
use itertools::Itertools;
2929
use opentelemetry_proto::tonic::{
@@ -47,9 +47,17 @@ use crate::{
4747
utils::{
4848
arrow::get_field,
4949
json::{flatten_json_body, Json},
50+
time::Minute,
5051
},
52+
OBJECT_STORE_DATA_GRANULARITY,
5153
};
5254

55+
struct JsonPartition {
56+
batch: Vec<Json>,
57+
schema: Vec<Arc<Field>>,
58+
date: NaiveDate,
59+
}
60+
5361
pub struct Event {
5462
pub json: Value,
5563
pub origin_size: usize,
@@ -248,7 +256,7 @@ impl EventFormat for Event {
248256
)?;
249257

250258
let mut is_first_event = false;
251-
let mut partitions = HashMap::new();
259+
let mut json_partitions = HashMap::new();
252260
for json in data {
253261
let (schema, is_first) = Self::infer_schema(
254262
&json,
@@ -257,6 +265,7 @@ impl EventFormat for Event {
257265
static_schema_flag,
258266
schema_version,
259267
)?;
268+
260269
is_first_event = is_first_event || is_first;
261270
let custom_partition_values = match custom_partitions.as_ref() {
262271
Some(custom_partitions) => {
@@ -267,45 +276,60 @@ impl EventFormat for Event {
267276
};
268277

269278
let parsed_timestamp = match time_partition.as_ref() {
270-
Some(time_partition) => extract_and_parse_time(&json, time_partition.as_ref())?,
279+
Some(time_partition) => extract_and_parse_time(&json, time_partition)?,
271280
_ => p_timestamp.naive_utc(),
272281
};
273282

274-
let batch = Self::into_recordbatch(
275-
p_timestamp,
276-
&[json],
277-
&schema,
278-
time_partition.as_ref(),
279-
schema_version,
280-
)?;
281-
282-
let schema = batch.schema();
283-
let mut key = get_schema_key(&schema.fields);
284-
if time_partition.is_some() {
285-
let parsed_timestamp_to_min = parsed_timestamp.format("%Y%m%dT%H%M").to_string();
286-
key.push_str(&parsed_timestamp_to_min);
287-
}
288-
289-
for (k, v) in custom_partition_values.iter().sorted_by_key(|v| v.0) {
290-
key.push_str(&format!("&{k}={v}"));
291-
}
283+
let prefix = format!(
284+
"{}.{}.minute={}.{}",
285+
get_schema_key(&schema),
286+
parsed_timestamp.format("date=%Y-%m-%d.hour=%H"),
287+
Minute::from(parsed_timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY),
288+
custom_partition_values
289+
.iter()
290+
.sorted_by_key(|v| v.0)
291+
.map(|(key, value)| format!("{key}={value}."))
292+
.join("")
293+
);
292294

293-
match partitions.get_mut(&key) {
294-
Some(PartitionEvent { rbs, .. }) => rbs.push(batch),
295+
match json_partitions.get_mut(&prefix) {
296+
Some(JsonPartition { batch, .. }) => batch.push(json),
295297
_ => {
296-
partitions.insert(
297-
key,
298-
PartitionEvent {
299-
rbs: vec![batch],
298+
let date = parsed_timestamp.date();
299+
let batch = vec![json];
300+
json_partitions.insert(
301+
prefix,
302+
JsonPartition {
303+
batch,
300304
schema,
301-
parsed_timestamp,
302-
custom_partition_values,
305+
date,
303306
},
304307
);
305308
}
306309
}
307310
}
308311

312+
let mut partitions = HashMap::new();
313+
for (
314+
prefix,
315+
JsonPartition {
316+
batch,
317+
schema,
318+
date,
319+
},
320+
) in json_partitions
321+
{
322+
let batch = Self::into_recordbatch(
323+
p_timestamp,
324+
&batch,
325+
&schema,
326+
time_partition.as_ref(),
327+
schema_version,
328+
)?;
329+
330+
partitions.insert(prefix, PartitionEvent { rb: batch, date });
331+
}
332+
309333
Ok(super::Event {
310334
origin_format: "json",
311335
origin_size,

src/event/mod.rs

Lines changed: 17 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -20,23 +20,21 @@
2020
pub mod format;
2121

2222
use arrow_array::RecordBatch;
23-
use arrow_schema::{Field, Schema};
23+
use arrow_schema::Field;
2424
use itertools::Itertools;
2525
use std::sync::Arc;
2626

2727
use self::error::EventError;
2828
use crate::{metadata::update_stats, parseable::Stream, storage::StreamType};
29-
use chrono::NaiveDateTime;
29+
use chrono::NaiveDate;
3030
use std::collections::HashMap;
3131

3232
pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
3333

3434
#[derive(Debug)]
3535
pub struct PartitionEvent {
36-
pub rbs: Vec<RecordBatch>,
37-
pub schema: Arc<Schema>,
38-
pub parsed_timestamp: NaiveDateTime,
39-
pub custom_partition_values: HashMap<String, String>,
36+
pub rb: RecordBatch,
37+
pub date: NaiveDate,
4038
}
4139

4240
#[derive(Debug)]
@@ -52,44 +50,28 @@ pub struct Event {
5250
// Events holds the schema related to a each event for a single log stream
5351
impl Event {
5452
pub fn process(self, stream: &Stream) -> Result<(), EventError> {
55-
for (key, partition) in self.partitions {
53+
for (prefix, PartitionEvent { rb, date }) in self.partitions {
5654
if self.is_first_event {
57-
stream.commit_schema(partition.schema.as_ref().clone())?;
55+
stream.commit_schema(rb.schema().as_ref().clone())?;
5856
}
59-
for rb in partition.rbs {
60-
stream.push(
61-
&key,
62-
&rb,
63-
partition.parsed_timestamp,
64-
&partition.custom_partition_values,
65-
self.stream_type,
66-
)?;
57+
stream.push(&prefix, &rb, self.stream_type)?;
6758

68-
update_stats(
69-
&stream.stream_name,
70-
self.origin_format,
71-
self.origin_size,
72-
rb.num_rows(),
73-
partition.parsed_timestamp.date(),
74-
);
59+
update_stats(
60+
&stream.stream_name,
61+
self.origin_format,
62+
self.origin_size,
63+
rb.num_rows(),
64+
date,
65+
);
7566

76-
crate::livetail::LIVETAIL.process(&stream.stream_name, &rb);
77-
}
67+
crate::livetail::LIVETAIL.process(&stream.stream_name, &rb);
7868
}
7969
Ok(())
8070
}
8171

8272
pub fn process_unchecked(&self, stream: &Stream) -> Result<(), EventError> {
83-
for (key, partition) in &self.partitions {
84-
for rb in &partition.rbs {
85-
stream.push(
86-
key,
87-
rb,
88-
partition.parsed_timestamp,
89-
&partition.custom_partition_values,
90-
self.stream_type,
91-
)?;
92-
}
73+
for (prefix, partition) in &self.partitions {
74+
stream.push(prefix, &partition.rb, self.stream_type)?;
9375
}
9476

9577
Ok(())

src/handlers/http/ingest.rs

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616
*
1717
*/
1818

19-
use std::collections::HashMap;
20-
2119
use actix_web::web::Path;
2220
use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
2321
use arrow_array::RecordBatch;
@@ -236,22 +234,19 @@ pub async fn post_event(
236234
}
237235

238236
pub async fn push_logs_unchecked(
239-
batch: RecordBatch,
237+
rb: RecordBatch,
240238
stream: &Stream,
241239
) -> Result<event::Event, PostError> {
242-
let schema = batch.schema();
243240
let unchecked_event = event::Event {
244241
origin_format: "json",
245242
origin_size: 0,
246243
time_partition: None,
247244
is_first_event: true, // NOTE: Maybe should be false
248245
partitions: [(
249-
get_schema_key(&schema.fields),
246+
get_schema_key(&rb.schema().fields),
250247
PartitionEvent {
251-
rbs: vec![batch],
252-
schema,
253-
parsed_timestamp: Utc::now().naive_utc(),
254-
custom_partition_values: HashMap::new(), // should be an empty map for unchecked push
248+
rb,
249+
date: Utc::now().date_naive(),
255250
},
256251
)]
257252
.into_iter()

0 commit comments

Comments
 (0)