Skip to content

Commit 5d87407

Browse files
author
Devdutt Shenoi
committed
fix: rb per object
1 parent 1cd8789 commit 5d87407

File tree

3 files changed

+81
-88
lines changed

3 files changed

+81
-88
lines changed

src/event/format/json.rs

Lines changed: 47 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ impl Event {
134134
}
135135

136136
impl EventFormat for Event {
137-
type Data = Vec<Json>;
137+
type Data = Json;
138138

139139
// convert the incoming json to a vector of json values
140140
// also extract the arrow schema, tags and metadata from the incoming json
@@ -144,7 +144,7 @@ impl EventFormat for Event {
144144
time_partition_limit: Option<NonZeroU32>,
145145
custom_partitions: Option<&String>,
146146
schema_version: SchemaVersion,
147-
) -> anyhow::Result<Self::Data> {
147+
) -> anyhow::Result<Vec<Self::Data>> {
148148
self.flatten_logs(
149149
time_partition,
150150
time_partition_limit,
@@ -161,17 +161,18 @@ impl EventFormat for Event {
161161
schema_version: SchemaVersion,
162162
) -> anyhow::Result<(super::EventSchema, bool)> {
163163
// collect all the keys from all the json objects in the request body
164-
let fields = collect_keys(data.iter());
164+
let fields = collect_keys(data);
165165

166166
let mut is_first = false;
167167
let schema = if let Some(schema) = derive_arrow_schema(stored_schema, fields) {
168168
schema
169169
} else {
170170
// TODO:
171-
let mut infer_schema = infer_json_schema_from_iterator(
172-
data.iter().map(|obj| Ok(Value::Object(obj.clone()))),
173-
)
174-
.map_err(|err| anyhow!("Could not infer schema for this event due to err {:?}", err))?;
171+
let mut infer_schema =
172+
infer_json_schema_from_iterator([Ok(Value::Object(data.clone()))].into_iter())
173+
.map_err(|err| {
174+
anyhow!("Could not infer schema for this event due to err {:?}", err)
175+
})?;
175176
let new_infer_schema = super::update_field_type_in_schema(
176177
Arc::new(infer_schema),
177178
Some(stored_schema),
@@ -200,10 +201,7 @@ impl EventFormat for Event {
200201
.collect()
201202
};
202203

203-
if data
204-
.iter()
205-
.any(|value| fields_mismatch(&schema, value, schema_version))
206-
{
204+
if fields_mismatch(&schema, data, schema_version) {
207205
return Err(anyhow!(
208206
"Could not process this event due to mismatch in datatype"
209207
));
@@ -215,14 +213,14 @@ impl EventFormat for Event {
215213
}
216214

217215
// Convert the Data type (defined above) to arrow record batch
218-
fn decode(data: Self::Data, schema: Arc<Schema>) -> anyhow::Result<RecordBatch> {
216+
fn decode(data: &[Self::Data], schema: Arc<Schema>) -> anyhow::Result<RecordBatch> {
219217
let array_capacity = round_upto_multiple_of_64(data.len());
220218
let mut reader = ReaderBuilder::new(schema)
221219
.with_batch_size(array_capacity)
222220
.with_coerce_primitive(false)
223221
.build_decoder()?;
224222

225-
reader.serialize(&data)?;
223+
reader.serialize(data)?;
226224
match reader.flush() {
227225
Ok(Some(recordbatch)) => Ok(recordbatch),
228226
Err(err) => Err(anyhow!("Failed to create recordbatch due to {:?}", err)),
@@ -248,16 +246,18 @@ impl EventFormat for Event {
248246
custom_partitions.as_ref(),
249247
schema_version,
250248
)?;
251-
let (schema, is_first_event) = Self::infer_schema(
252-
&data,
253-
&stored_schema,
254-
time_partition.as_ref(),
255-
static_schema_flag,
256-
schema_version,
257-
)?;
258249

250+
let mut is_first_event = false;
259251
let mut partitions = HashMap::new();
260252
for json in data {
253+
let (schema, is_first) = Self::infer_schema(
254+
&json,
255+
&stored_schema,
256+
time_partition.as_ref(),
257+
static_schema_flag,
258+
schema_version,
259+
)?;
260+
is_first_event = is_first_event || is_first;
261261
let custom_partition_values = match custom_partitions.as_ref() {
262262
Some(custom_partitions) => {
263263
let custom_partitions = custom_partitions.split(',').collect_vec();
@@ -273,7 +273,7 @@ impl EventFormat for Event {
273273

274274
let batch = Self::into_recordbatch(
275275
p_timestamp,
276-
vec![json],
276+
&[json],
277277
&schema,
278278
time_partition.as_ref(),
279279
schema_version,
@@ -368,15 +368,8 @@ fn derive_arrow_schema(
368368

369369
// Returns a list of keys that are present in the given iterable of JSON objects
370370
// Returns None if even one of the value is not an Object
371-
fn collect_keys<'a>(objects: impl Iterator<Item = &'a Json>) -> HashSet<&'a str> {
372-
let mut keys = HashSet::new();
373-
for object in objects {
374-
for key in object.keys() {
375-
keys.insert(key.as_str());
376-
}
377-
}
378-
379-
keys
371+
fn collect_keys(object: &Json) -> HashSet<&str> {
372+
object.keys().map(|k| k.as_str()).collect()
380373
}
381374

382375
// Returns true when the field doesn't exist in schema or has an invalid type
@@ -515,9 +508,9 @@ mod tests {
515508
.to_data(None, None, None, SchemaVersion::V0)
516509
.unwrap();
517510
let (schema, _) =
518-
Event::infer_schema(&data, &store_schema, None, false, SchemaVersion::V0).unwrap();
511+
Event::infer_schema(&data[0], &store_schema, None, false, SchemaVersion::V0).unwrap();
519512
let rb =
520-
Event::into_recordbatch(Utc::now(), data, &schema, None, SchemaVersion::V0).unwrap();
513+
Event::into_recordbatch(Utc::now(), &data, &schema, None, SchemaVersion::V0).unwrap();
521514

522515
assert_eq!(rb.num_rows(), 1);
523516
assert_eq!(rb.num_columns(), 4);
@@ -548,9 +541,9 @@ mod tests {
548541
.to_data(None, None, None, SchemaVersion::V0)
549542
.unwrap();
550543
let (schema, _) =
551-
Event::infer_schema(&data, &store_schema, None, false, SchemaVersion::V0).unwrap();
544+
Event::infer_schema(&data[0], &store_schema, None, false, SchemaVersion::V0).unwrap();
552545
let rb =
553-
Event::into_recordbatch(Utc::now(), data, &schema, None, SchemaVersion::V0).unwrap();
546+
Event::into_recordbatch(Utc::now(), &data, &schema, None, SchemaVersion::V0).unwrap();
554547

555548
assert_eq!(rb.num_rows(), 1);
556549
assert_eq!(rb.num_columns(), 3);
@@ -583,9 +576,9 @@ mod tests {
583576
.to_data(None, None, None, SchemaVersion::V0)
584577
.unwrap();
585578
let (schema, _) =
586-
Event::infer_schema(&data, &store_schema, None, false, SchemaVersion::V0).unwrap();
579+
Event::infer_schema(&data[0], &store_schema, None, false, SchemaVersion::V0).unwrap();
587580
let rb =
588-
Event::into_recordbatch(Utc::now(), data, &schema, None, SchemaVersion::V0).unwrap();
581+
Event::into_recordbatch(Utc::now(), &data, &schema, None, SchemaVersion::V0).unwrap();
589582

590583
assert_eq!(rb.num_rows(), 1);
591584
assert_eq!(rb.num_columns(), 3);
@@ -619,7 +612,9 @@ mod tests {
619612
.to_data(None, None, None, SchemaVersion::V0)
620613
.unwrap();
621614

622-
assert!(Event::infer_schema(&data, &store_schema, None, false, SchemaVersion::V0).is_err());
615+
assert!(
616+
Event::infer_schema(&data[0], &store_schema, None, false, SchemaVersion::V0).is_err()
617+
);
623618
}
624619

625620
#[test]
@@ -639,9 +634,9 @@ mod tests {
639634
.to_data(None, None, None, SchemaVersion::V0)
640635
.unwrap();
641636
let (schema, _) =
642-
Event::infer_schema(&data, &store_schema, None, false, SchemaVersion::V0).unwrap();
637+
Event::infer_schema(&data[0], &store_schema, None, false, SchemaVersion::V0).unwrap();
643638
let rb =
644-
Event::into_recordbatch(Utc::now(), data, &schema, None, SchemaVersion::V0).unwrap();
639+
Event::into_recordbatch(Utc::now(), &data, &schema, None, SchemaVersion::V0).unwrap();
645640

646641
assert_eq!(rb.num_rows(), 1);
647642
assert_eq!(rb.num_columns(), 1);
@@ -670,9 +665,9 @@ mod tests {
670665
.to_data(None, None, None, SchemaVersion::V0)
671666
.unwrap();
672667
let (schema, _) =
673-
Event::infer_schema(&data, &store_schema, None, false, SchemaVersion::V0).unwrap();
668+
Event::infer_schema(&data[0], &store_schema, None, false, SchemaVersion::V0).unwrap();
674669
let rb =
675-
Event::into_recordbatch(Utc::now(), data, &schema, None, SchemaVersion::V0).unwrap();
670+
Event::into_recordbatch(Utc::now(), &data, &schema, None, SchemaVersion::V0).unwrap();
676671

677672
assert_eq!(rb.num_rows(), 3);
678673
assert_eq!(rb.num_columns(), 4);
@@ -723,9 +718,9 @@ mod tests {
723718
.to_data(None, None, None, SchemaVersion::V0)
724719
.unwrap();
725720
let (schema, _) =
726-
Event::infer_schema(&data, &store_schema, None, false, SchemaVersion::V0).unwrap();
721+
Event::infer_schema(&data[0], &store_schema, None, false, SchemaVersion::V0).unwrap();
727722
let rb =
728-
Event::into_recordbatch(Utc::now(), data, &schema, None, SchemaVersion::V0).unwrap();
723+
Event::into_recordbatch(Utc::now(), &data, &schema, None, SchemaVersion::V0).unwrap();
729724

730725
assert_eq!(rb.num_rows(), 3);
731726
assert_eq!(rb.num_columns(), 4);
@@ -775,9 +770,9 @@ mod tests {
775770
.to_data(None, None, None, SchemaVersion::V0)
776771
.unwrap();
777772
let (schema, _) =
778-
Event::infer_schema(&data, &store_schema, None, false, SchemaVersion::V0).unwrap();
773+
Event::infer_schema(&data[0], &store_schema, None, false, SchemaVersion::V0).unwrap();
779774
let rb =
780-
Event::into_recordbatch(Utc::now(), data, &schema, None, SchemaVersion::V0).unwrap();
775+
Event::into_recordbatch(Utc::now(), &data, &schema, None, SchemaVersion::V0).unwrap();
781776

782777
assert_eq!(rb.num_rows(), 3);
783778
assert_eq!(rb.num_columns(), 4);
@@ -828,7 +823,9 @@ mod tests {
828823
.to_data(None, None, None, SchemaVersion::V0)
829824
.unwrap();
830825

831-
assert!(Event::infer_schema(&data, &store_schema, None, false, SchemaVersion::V0).is_err());
826+
assert!(
827+
Event::infer_schema(&data[0], &store_schema, None, false, SchemaVersion::V0).is_err()
828+
);
832829
}
833830

834831
#[test]
@@ -860,9 +857,9 @@ mod tests {
860857
.to_data(None, None, None, SchemaVersion::V0)
861858
.unwrap();
862859
let (schema, _) =
863-
Event::infer_schema(&data, &store_schema, None, false, SchemaVersion::V0).unwrap();
860+
Event::infer_schema(&data[0], &store_schema, None, false, SchemaVersion::V0).unwrap();
864861
let rb =
865-
Event::into_recordbatch(Utc::now(), data, &schema, None, SchemaVersion::V0).unwrap();
862+
Event::into_recordbatch(Utc::now(), &data, &schema, None, SchemaVersion::V0).unwrap();
866863

867864
assert_eq!(rb.num_rows(), 4);
868865
assert_eq!(rb.num_columns(), 5);
@@ -938,9 +935,9 @@ mod tests {
938935
.to_data(None, None, None, SchemaVersion::V1)
939936
.unwrap();
940937
let (schema, _) =
941-
Event::infer_schema(&data, &store_schema, None, false, SchemaVersion::V1).unwrap();
938+
Event::infer_schema(&data[0], &store_schema, None, false, SchemaVersion::V1).unwrap();
942939
let rb =
943-
Event::into_recordbatch(Utc::now(), data, &schema, None, SchemaVersion::V1).unwrap();
940+
Event::into_recordbatch(Utc::now(), &data, &schema, None, SchemaVersion::V1).unwrap();
944941

945942
assert_eq!(rb.num_rows(), 4);
946943
assert_eq!(rb.num_columns(), 5);

src/event/format/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ pub trait EventFormat: Sized {
109109
time_partition_limit: Option<NonZeroU32>,
110110
custom_partitions: Option<&String>,
111111
schema_version: SchemaVersion,
112-
) -> anyhow::Result<Self::Data>;
112+
) -> anyhow::Result<Vec<Self::Data>>;
113113

114114
fn infer_schema(
115115
data: &Self::Data,
@@ -119,7 +119,7 @@ pub trait EventFormat: Sized {
119119
schema_version: SchemaVersion,
120120
) -> anyhow::Result<(EventSchema, IsFirstEvent)>;
121121

122-
fn decode(data: Self::Data, schema: Arc<Schema>) -> anyhow::Result<RecordBatch>;
122+
fn decode(data: &[Self::Data], schema: Arc<Schema>) -> anyhow::Result<RecordBatch>;
123123

124124
/// Updates inferred schema with `p_timestamp` field and ensures it adheres to expectations
125125
fn prepare_and_validate_schema(
@@ -156,7 +156,7 @@ pub trait EventFormat: Sized {
156156

157157
fn into_recordbatch(
158158
p_timestamp: DateTime<Utc>,
159-
data: Self::Data,
159+
data: &[Self::Data],
160160
schema: &EventSchema,
161161
time_partition: Option<&String>,
162162
schema_version: SchemaVersion,

src/event/mod.rs

Lines changed: 31 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -19,30 +19,27 @@
1919

2020
pub mod format;
2121

22-
use arrow::compute::concat_batches;
2322
use arrow_array::RecordBatch;
2423
use arrow_schema::{Field, Schema};
2524
use itertools::Itertools;
2625
use std::sync::Arc;
2726

2827
use self::error::EventError;
29-
use crate::{
30-
metadata::update_stats,
31-
parseable::{StagingError, Stream},
32-
storage::StreamType,
33-
};
28+
use crate::{metadata::update_stats, parseable::Stream, storage::StreamType};
3429
use chrono::NaiveDateTime;
3530
use std::collections::HashMap;
3631

3732
pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
3833

34+
#[derive(Debug)]
3935
pub struct PartitionEvent {
4036
pub rbs: Vec<RecordBatch>,
4137
pub schema: Arc<Schema>,
4238
pub parsed_timestamp: NaiveDateTime,
4339
pub custom_partition_values: HashMap<String, String>,
4440
}
4541

42+
#[derive(Debug)]
4643
pub struct Event {
4744
pub origin_format: &'static str,
4845
pub origin_size: usize,
@@ -56,44 +53,43 @@ pub struct Event {
5653
impl Event {
5754
pub fn process(self, stream: &Stream) -> Result<(), EventError> {
5855
for (key, partition) in self.partitions {
59-
let rb =
60-
concat_batches(&partition.schema, &partition.rbs).map_err(StagingError::Arrow)?;
6156
if self.is_first_event {
6257
stream.commit_schema(partition.schema.as_ref().clone())?;
6358
}
64-
65-
stream.push(
66-
&key,
67-
&rb,
68-
partition.parsed_timestamp,
69-
&partition.custom_partition_values,
70-
self.stream_type,
71-
)?;
72-
73-
update_stats(
74-
&stream.stream_name,
75-
self.origin_format,
76-
self.origin_size,
77-
rb.num_rows(),
78-
partition.parsed_timestamp.date(),
79-
);
80-
81-
crate::livetail::LIVETAIL.process(&stream.stream_name, &rb);
59+
for rb in partition.rbs {
60+
stream.push(
61+
&key,
62+
&rb,
63+
partition.parsed_timestamp,
64+
&partition.custom_partition_values,
65+
self.stream_type,
66+
)?;
67+
68+
update_stats(
69+
&stream.stream_name,
70+
self.origin_format,
71+
self.origin_size,
72+
rb.num_rows(),
73+
partition.parsed_timestamp.date(),
74+
);
75+
76+
crate::livetail::LIVETAIL.process(&stream.stream_name, &rb);
77+
}
8278
}
8379
Ok(())
8480
}
8581

8682
pub fn process_unchecked(&self, stream: &Stream) -> Result<(), EventError> {
8783
for (key, partition) in &self.partitions {
88-
let rb =
89-
concat_batches(&partition.schema, &partition.rbs).map_err(StagingError::Arrow)?;
90-
stream.push(
91-
key,
92-
&rb,
93-
partition.parsed_timestamp,
94-
&partition.custom_partition_values,
95-
self.stream_type,
96-
)?;
84+
for rb in &partition.rbs {
85+
stream.push(
86+
key,
87+
rb,
88+
partition.parsed_timestamp,
89+
&partition.custom_partition_values,
90+
self.stream_type,
91+
)?;
92+
}
9793
}
9894

9995
Ok(())

0 commit comments

Comments
 (0)