
Commit 1386d3b

Author: Devdutt Shenoi (committed)
fix: flattening
1 parent 354061a commit 1386d3b

File tree

1 file changed (+14, -28 lines)


src/event/format/json.rs

Lines changed: 14 additions & 28 deletions
@@ -43,10 +43,7 @@ use crate::{
     metadata::SchemaVersion,
     otel::{logs::flatten_otel_logs, metrics::flatten_otel_metrics, traces::flatten_otel_traces},
     storage::StreamType,
-    utils::{
-        arrow::get_field,
-        json::{convert_array_to_object, flatten::convert_to_array},
-    },
+    utils::{arrow::get_field, json::convert_array_to_object},
 };
 
 pub struct Event {
@@ -70,7 +67,7 @@ pub fn flatten_logs(
     custom_partitions: Option<&String>,
     schema_version: SchemaVersion,
     log_source: &LogSource,
-) -> Result<Vec<Value>, anyhow::Error> {
+) -> anyhow::Result<Vec<Value>> {
     let data = match log_source {
         LogSource::Kinesis => {
             //custom flattening required for Amazon Kinesis
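
The return-type hunks in this commit replace `Result<T, anyhow::Error>` with `anyhow::Result<T>`. These name the same type, since `anyhow::Result<T>` is an alias for `Result<T, anyhow::Error>`; only the spelling changes. A minimal sketch of the shorter form in use (the `parse_port` function below is a made-up example, not code from this repository):

use anyhow::anyhow;

// `anyhow::Result<T>` expands to `Result<T, anyhow::Error>`, so callers and
// the `?` operator behave exactly as before; only the signature is shorter.
fn parse_port(s: &str) -> anyhow::Result<u16> {
    s.parse::<u16>()
        .map_err(|e| anyhow!("invalid port {s:?}: {e}"))
}

fn main() -> anyhow::Result<()> {
    println!("port = {}", parse_port("8080")?);
    Ok(())
}
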
@@ -97,25 +94,14 @@ pub fn flatten_logs(
 
     let mut logs = vec![];
     for json in data {
-        if time_partition.is_some() || custom_partitions.is_some() {
-            logs.append(&mut convert_array_to_object(
-                json,
-                time_partition,
-                time_partition_limit,
-                custom_partitions,
-                schema_version,
-                log_source,
-            )?)
-        } else {
-            logs.push(convert_to_array(convert_array_to_object(
-                json,
-                None,
-                None,
-                None,
-                schema_version,
-                log_source,
-            )?)?)
-        }
+        logs.append(&mut convert_array_to_object(
+            json,
+            time_partition,
+            time_partition_limit,
+            custom_partitions,
+            schema_version,
+            log_source,
+        )?)
     }
 
     Ok(logs)
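
After this hunk, every incoming JSON value takes the same path through `convert_array_to_object`, whether or not a time partition or custom partitions are configured; the old `convert_to_array` branch is gone, which is why its import was dropped above. The sketch below mirrors that unified loop with a made-up `to_objects` stand-in for `convert_array_to_object` (the real helper also handles partition fields and schema versions):

use serde_json::{json, Value};

// Made-up stand-in for `convert_array_to_object`: split a top-level JSON array
// into individual objects, and keep any other value as a single entry.
fn to_objects(json: Value) -> anyhow::Result<Vec<Value>> {
    Ok(match json {
        Value::Array(items) => items,
        other => vec![other],
    })
}

fn main() -> anyhow::Result<()> {
    let data = vec![json!([{"a": 1}, {"a": 2}]), json!({"a": 3})];

    // One code path for every value, as in the simplified loop above.
    let mut logs = vec![];
    for json in data {
        logs.append(&mut to_objects(json)?);
    }
    assert_eq!(logs.len(), 3);
    Ok(())
}
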
@@ -139,7 +125,7 @@ impl EventFormat for Event {
         custom_partitions: Option<&String>,
         schema_version: SchemaVersion,
         log_source: &LogSource,
-    ) -> Result<(Self::Data, Vec<Arc<Field>>, bool), anyhow::Error> {
+    ) -> anyhow::Result<(Self::Data, Vec<Arc<Field>>, bool)> {
         let flattened = flatten_logs(
             self.json,
             time_partition,
@@ -202,7 +188,7 @@ impl EventFormat for Event {
     }
 
     // Convert the Data type (defined above) to arrow record batch
-    fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, anyhow::Error> {
+    fn decode(data: Self::Data, schema: Arc<Schema>) -> anyhow::Result<RecordBatch> {
         let array_capacity = round_upto_multiple_of_64(data.len());
         let mut reader = ReaderBuilder::new(schema)
             .with_batch_size(array_capacity)
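
The context lines above show `decode` building an arrow JSON reader with a batch size rounded up to a multiple of 64. Below is a stand-alone sketch of that general pattern, assuming the `arrow-json`, `arrow-array`, and `arrow-schema` crates; it illustrates the decode-to-RecordBatch shape, not the project's exact implementation.

use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_json::ReaderBuilder;
use arrow_schema::{DataType, Field, Schema};
use serde_json::json;

// Serialize a slice of serde_json values into a Decoder built from the target
// schema, then flush the buffered rows as a single RecordBatch.
fn decode(rows: &[serde_json::Value], schema: Arc<Schema>) -> anyhow::Result<RecordBatch> {
    let mut decoder = ReaderBuilder::new(schema)
        .with_batch_size(rows.len().max(1))
        .build_decoder()?;
    decoder.serialize(rows)?;
    decoder
        .flush()?
        .ok_or_else(|| anyhow::anyhow!("no rows decoded"))
}

fn main() -> anyhow::Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("level", DataType::Utf8, true)]));
    let batch = decode(&[json!({"level": "info"})], schema)?;
    assert_eq!(batch.num_rows(), 1);
    Ok(())
}
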
@@ -230,7 +216,7 @@ impl EventFormat for Event {
         schema_version: SchemaVersion,
         log_source: &LogSource,
         stream_type: StreamType,
-    ) -> Result<super::Event, anyhow::Error> {
+    ) -> anyhow::Result<super::Event> {
         let custom_partition_values = match custom_partitions.as_ref() {
             Some(custom_partitions) => {
                 let custom_partitions = custom_partitions.split(',').collect_vec();
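
The context above splits the configured custom partitions on commas before per-field values are pulled out of the event. A hypothetical sketch of that extraction step (this is not Parseable's actual `extract_custom_partition_values`, whose body is not shown in this diff):

use std::collections::HashMap;

use serde_json::{json, Value};

// Look up each comma-separated partition field in the event and record its
// value as a string; fields missing from the event are simply skipped here.
fn partition_values(json: &Value, custom_partitions: &str) -> HashMap<String, String> {
    custom_partitions
        .split(',')
        .filter_map(|field| {
            let field = field.trim();
            json.get(field).map(|v| {
                let value = v.as_str().map(str::to_string).unwrap_or_else(|| v.to_string());
                (field.to_string(), value)
            })
        })
        .collect()
}

fn main() {
    let event = json!({"app": "web", "region": "us-east-1", "latency_ms": 42});
    let values = partition_values(&event, "app,region");
    assert_eq!(values.get("app").map(String::as_str), Some("web"));
}
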
@@ -295,7 +281,7 @@ pub fn extract_custom_partition_values(
 fn extract_and_parse_time(
     json: &Value,
     time_partition: &str,
-) -> Result<NaiveDateTime, anyhow::Error> {
+) -> anyhow::Result<NaiveDateTime> {
     let current_time = json
         .get(time_partition)
         .ok_or_else(|| anyhow!("Missing field for time partition in json: {time_partition}"))?;
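
`extract_and_parse_time` looks up the configured time-partition field and parses it into a `NaiveDateTime`. A self-contained sketch of that shape, with an assumed field name and timestamp format (the real parsing rules are not shown in this diff):

use anyhow::anyhow;
use chrono::NaiveDateTime;
use serde_json::{json, Value};

// Fetch the time-partition field from the event and parse it as a naive
// timestamp; the format string here is an assumption for illustration.
fn extract_time(json: &Value, time_partition: &str) -> anyhow::Result<NaiveDateTime> {
    let raw = json
        .get(time_partition)
        .and_then(Value::as_str)
        .ok_or_else(|| anyhow!("Missing field for time partition in json: {time_partition}"))?;
    Ok(NaiveDateTime::parse_from_str(raw, "%Y-%m-%dT%H:%M:%S%.fZ")?)
}

fn main() -> anyhow::Result<()> {
    let event = json!({"timestamp": "2024-01-01T12:30:00.000Z"});
    println!("{}", extract_time(&event, "timestamp")?);
    Ok(())
}
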
