Commit 10ec3ba

refactor: json events

Author: Devdutt Shenoi
Parent: 245ec54

4 files changed, +62 -79 lines

src/connectors/kafka/processor.rs

Lines changed: 15 additions & 21 deletions

@@ -61,12 +61,20 @@ impl ParseableSinkProcessor {
         let static_schema_flag = stream.get_static_schema_flag();
         let schema_version = stream.get_schema_version();
 
-        let (json_vec, total_payload_size) = Self::json_vec(records);
-        let batch_json_event = json::Event {
-            data: Value::Array(json_vec),
-        };
+        let mut json_vec = Vec::with_capacity(records.len());
+        let mut total_payload_size = 0u64;
+
+        for record in records.iter().filter_map(|r| r.payload.as_ref()) {
+            total_payload_size += record.len() as u64;
+            if let Ok(value) = serde_json::from_slice::<Value>(record) {
+                json_vec.push(value);
+            }
+        }
 
-        let (rb, is_first) = batch_json_event.into_recordbatch(
+        let (rb, is_first) = json::Event {
+            json: Value::Array(json_vec),
+        }
+        .into_recordbatch(
             &schema,
             static_schema_flag,
             time_partition.as_ref(),
@@ -87,31 +95,17 @@ impl ParseableSinkProcessor {
 
         Ok(p_event)
     }
-
-    fn json_vec(records: &[ConsumerRecord]) -> (Vec<Value>, u64) {
-        let mut json_vec = Vec::with_capacity(records.len());
-        let mut total_payload_size = 0u64;
-
-        for record in records.iter().filter_map(|r| r.payload.as_ref()) {
-            total_payload_size += record.len() as u64;
-            if let Ok(value) = serde_json::from_slice::<Value>(record) {
-                json_vec.push(value);
-            }
-        }
-
-        (json_vec, total_payload_size)
-    }
 }
 
 #[async_trait]
 impl Processor<Vec<ConsumerRecord>, ()> for ParseableSinkProcessor {
     async fn process(&self, records: Vec<ConsumerRecord>) -> anyhow::Result<()> {
         let len = records.len();
-        debug!("Processing {} records", len);
+        debug!("Processing {len} records");
 
         self.build_event_from_chunk(&records).await?.process()?;
 
-        debug!("Processed {} records", len);
+        debug!("Processed {len} records");
         Ok(())
     }
 }
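
The inlined loop replaces the one-off json_vec helper, keeping the byte counting and JSON parsing at their single call site. A minimal standalone sketch of the same pattern, using a hypothetical Record type in place of the crate's ConsumerRecord:

use serde_json::Value;

// Hypothetical stand-in for the crate's ConsumerRecord.
struct Record {
    payload: Option<Vec<u8>>,
}

fn batch_payloads(records: &[Record]) -> (Value, u64) {
    let mut json_vec = Vec::with_capacity(records.len());
    let mut total_payload_size = 0u64;

    // Skip records without a payload, count every payload's bytes, and
    // silently drop payloads that fail to parse, mirroring the diff.
    for payload in records.iter().filter_map(|r| r.payload.as_ref()) {
        total_payload_size += payload.len() as u64;
        if let Ok(value) = serde_json::from_slice::<Value>(payload) {
            json_vec.push(value);
        }
    }

    (Value::Array(json_vec), total_payload_size)
}

fn main() {
    let records = vec![
        Record { payload: Some(br#"{"a": 1}"#.to_vec()) },
        Record { payload: None },
        Record { payload: Some(b"not json".to_vec()) },
    ];
    let (array, size) = batch_payloads(&records);
    assert_eq!(array.as_array().unwrap().len(), 1); // only the valid payload parses
    assert_eq!(size, 16); // 8 + 8 bytes; unparseable bytes still count toward size
}

Note that unparseable payloads still contribute to total_payload_size, since the byte count is accumulated before the parse attempt, exactly as in the committed loop.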

src/event/format/json.rs

Lines changed: 2 additions & 2 deletions

@@ -33,7 +33,7 @@ use super::EventFormat
 use crate::{metadata::SchemaVersion, utils::arrow::get_field};
 
 pub struct Event {
-    pub data: Value,
+    pub json: Value,
 }
 
 impl EventFormat for Event {
@@ -52,7 +52,7 @@ impl EventFormat for Event {
         // incoming event may be a single json or a json array
         // but Data (type defined above) is a vector of json values
         // hence we need to convert the incoming event to a vector of json values
-        let value_arr = match self.data {
+        let value_arr = match self.json {
             Value::Array(arr) => arr,
             value @ Value::Object(_) => vec![value],
             _ => unreachable!("flatten would have failed beforehand"),
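
Renaming the field from data to json lets call sites use field-init shorthand (json::Event { json }) wherever the local binding already carries that name. The match it feeds is unchanged; a self-contained sketch of that normalization step, outside the crate's EventFormat trait:

use serde_json::{json, Value};

// A single JSON object and a JSON array are both accepted; both become a
// Vec<Value>, matching the comment in the diff above.
fn to_value_arr(json: Value) -> Vec<Value> {
    match json {
        Value::Array(arr) => arr,
        value @ Value::Object(_) => vec![value],
        _ => unreachable!("flatten would have failed beforehand"),
    }
}

fn main() {
    assert_eq!(to_value_arr(json!({"a": 1})).len(), 1);
    assert_eq!(to_value_arr(json!([{"a": 1}, {"a": 2}])).len(), 2);
}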

src/handlers/http/ingest.rs

Lines changed: 38 additions & 30 deletions

@@ -81,7 +81,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
     let size: usize = body.len();
     let parsed_timestamp = Utc::now().naive_utc();
     let (rb, is_first) = {
-        let body_val: Value = serde_json::from_slice(&body)?;
+        let json: Value = serde_json::from_slice(&body)?;
         let hash_map = PARSEABLE.streams.read().unwrap();
         let schema = hash_map
             .get(&stream_name)
@@ -91,7 +91,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
             .expect(LOCK_EXPECT)
             .schema
             .clone();
-        let event = format::json::Event { data: body_val };
+        let event = format::json::Event { json };
         // For internal streams, use old schema
         event.into_recordbatch(&schema, false, None, SchemaVersion::V0)?
     };
@@ -355,7 +355,7 @@ mod tests {
     use std::{collections::HashMap, sync::Arc};
 
     use crate::{
-        handlers::http::modal::utils::ingest_utils::into_event_batch,
+        event::format::{json, EventFormat},
         metadata::SchemaVersion,
         utils::json::{convert_array_to_object, flatten::convert_to_array},
     };
@@ -392,8 +392,9 @@
             "b": "hello",
         });
 
-        let (rb, _) =
-            into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = json::Event { json }
+            .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
+            .unwrap();
 
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 4);
@@ -419,8 +420,9 @@
             "c": null
         });
 
-        let (rb, _) =
-            into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = json::Event { json }
+            .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
+            .unwrap();
 
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 3);
@@ -450,7 +452,9 @@
             .into_iter(),
         );
 
-        let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = json::Event { json }
+            .into_recordbatch(&schema, false, None, SchemaVersion::V0)
+            .unwrap();
 
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 3);
@@ -480,7 +484,9 @@
             .into_iter(),
        );
 
-        assert!(into_event_batch(json, schema, false, None, SchemaVersion::V0,).is_err());
+        assert!(json::Event { json }
+            .into_recordbatch(&schema, false, None, SchemaVersion::V0,)
+            .is_err());
     }
 
     #[test]
@@ -496,7 +502,9 @@
             .into_iter(),
         );
 
-        let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = json::Event { json }
+            .into_recordbatch(&schema, false, None, SchemaVersion::V0)
+            .unwrap();
 
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 1);
@@ -535,8 +543,9 @@
             },
         ]);
 
-        let (rb, _) =
-            into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = json::Event { json }
+            .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
+            .unwrap();
 
         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 4);
@@ -582,8 +591,9 @@
             },
         ]);
 
-        let (rb, _) =
-            into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = json::Event { json }
+            .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
+            .unwrap();
 
         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 4);
@@ -630,7 +640,9 @@
             .into_iter(),
         );
 
-        let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = json::Event { json }
+            .into_recordbatch(&schema, false, None, SchemaVersion::V0)
+            .unwrap();
 
         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 4);
@@ -677,7 +689,9 @@
             .into_iter(),
        );
 
-        assert!(into_event_batch(json, schema, false, None, SchemaVersion::V0,).is_err());
+        assert!(json::Event { json }
+            .into_recordbatch(&schema, false, None, SchemaVersion::V0,)
+            .is_err());
     }
 
     #[test]
@@ -715,13 +729,10 @@
         )
         .unwrap();
 
-        let (rb, _) = into_event_batch(
-            flattened_json,
-            HashMap::default(),
-            false,
-            None,
-            SchemaVersion::V0,
-        )
+        let (rb, _) = json::Event {
+            json: flattened_json,
+        }
+        .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
         .unwrap();
         assert_eq!(rb.num_rows(), 4);
         assert_eq!(rb.num_columns(), 5);
@@ -803,13 +814,10 @@
         )
         .unwrap();
 
-        let (rb, _) = into_event_batch(
-            flattened_json,
-            HashMap::default(),
-            false,
-            None,
-            SchemaVersion::V1,
-        )
+        let (rb, _) = json::Event {
+            json: flattened_json,
+        }
+        .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V1)
        .unwrap();
 
         assert_eq!(rb.num_rows(), 4);
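
Every test now builds json::Event { json } and calls into_recordbatch on it directly, instead of going through the into_event_batch wrapper that this commit deletes. A toy, self-contained model of that call-shape change; the trait and method body below are stand-ins, not Parseable's real implementation:

use std::collections::HashMap;

use serde_json::Value;

struct Event {
    json: Value,
}

trait EventFormat {
    // Stand-in signature: returns a row count where the real trait
    // returns an Arrow RecordBatch plus an is_first flag.
    fn into_recordbatch(self, schema: &HashMap<String, String>) -> Result<usize, String>;
}

impl EventFormat for Event {
    fn into_recordbatch(self, _schema: &HashMap<String, String>) -> Result<usize, String> {
        match self.json {
            Value::Array(arr) => Ok(arr.len()),
            Value::Object(_) => Ok(1),
            _ => Err("expected an object or an array".into()),
        }
    }
}

fn main() {
    let json = serde_json::json!({"a": 1, "b": "hello"});
    // Old shape: into_event_batch(json, HashMap::default(), ...).unwrap()
    // New shape: construct the event, then call the trait method.
    let rows = Event { json }
        .into_recordbatch(&HashMap::default())
        .unwrap();
    assert_eq!(rows, 1);
}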

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 7 additions & 26 deletions

@@ -16,14 +16,13 @@
  *
  */
 
-use arrow_schema::Field;
 use chrono::{DateTime, NaiveDateTime, Utc};
 use itertools::Itertools;
 use opentelemetry_proto::tonic::{
     logs::v1::LogsData, metrics::v1::MetricsData, trace::v1::TracesData,
 };
 use serde_json::Value;
-use std::{collections::HashMap, sync::Arc};
+use std::collections::HashMap;
 
 use crate::{
     event::{
@@ -34,7 +33,6 @@ use crate::{
         ingest::PostError,
         kinesis::{flatten_kinesis_logs, Message},
     },
-    metadata::SchemaVersion,
     otel::{logs::flatten_otel_logs, metrics::flatten_otel_metrics, traces::flatten_otel_traces},
     parseable::{StreamNotFound, PARSEABLE},
     storage::StreamType,
@@ -117,16 +115,16 @@ async fn push_logs(
         )?)?]
     };
 
-    for value in data {
-        let origin_size = serde_json::to_vec(&value).unwrap().len() as u64; // string length need not be the same as byte length
+    for json in data {
+        let origin_size = serde_json::to_vec(&json).unwrap().len() as u64; // string length need not be the same as byte length
         let parsed_timestamp = match time_partition.as_ref() {
-            Some(time_partition) => get_parsed_timestamp(&value, time_partition)?,
+            Some(time_partition) => get_parsed_timestamp(&json, time_partition)?,
             _ => Utc::now().naive_utc(),
        };
         let custom_partition_values = match custom_partition.as_ref() {
             Some(custom_partition) => {
                 let custom_partitions = custom_partition.split(',').collect_vec();
-                get_custom_partition_values(&value, &custom_partitions)
+                get_custom_partition_values(&json, &custom_partitions)
             }
             None => HashMap::new(),
         };
@@ -141,9 +139,8 @@ async fn push_logs(
             .expect(LOCK_EXPECT)
             .schema
             .clone();
-        let (rb, is_first_event) = into_event_batch(
-            value,
-            schema,
+        let (rb, is_first_event) = json::Event { json }.into_recordbatch(
+            &schema,
             static_schema_flag,
             time_partition.as_ref(),
             schema_version,
@@ -165,22 +162,6 @@ async fn push_logs(
     Ok(())
 }
 
-pub fn into_event_batch(
-    data: Value,
-    schema: HashMap<String, Arc<Field>>,
-    static_schema_flag: bool,
-    time_partition: Option<&String>,
-    schema_version: SchemaVersion,
-) -> Result<(arrow_array::RecordBatch, bool), PostError> {
-    let (rb, is_first) = json::Event { data }.into_recordbatch(
-        &schema,
-        static_schema_flag,
-        time_partition,
-        schema_version,
-    )?;
-    Ok((rb, is_first))
-}
-
 pub fn get_custom_partition_values(
     json: &Value,
     custom_partition_list: &[&str],
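
With the wrapper deleted here, push_logs constructs json::Event { json } inline, which in turn makes the arrow_schema::Field, Arc, and SchemaVersion imports unnecessary. One retained detail worth illustrating: origin_size is measured with serde_json::to_vec because, as the inline comment says, string length need not be the same as byte length. A standalone illustration of the divergence:

use serde_json::json;

fn main() {
    let value = json!({"msg": "héllo"});
    let bytes = serde_json::to_vec(&value).unwrap();
    let text = serde_json::to_string(&value).unwrap();

    // "é" is one char but two bytes in UTF-8, so the serialized byte
    // length exceeds the character count by one for this value.
    assert_eq!(text.chars().count(), 15);
    assert_eq!(bytes.len(), 16);
}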
