Skip to content

Commit 4c1f6d8

Browse files
author
Devdutt Shenoi
committed
fix: concat to not lose data
1 parent 975b1f6 commit 4c1f6d8

File tree

2 files changed

+10
-9
lines changed

2 files changed

+10
-9
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -9,7 +9,8 @@ build = "build.rs"
99

1010
[dependencies]
1111
# Arrow and DataFusion ecosystem
12-
arrow-array = { version = "53.0.0" }
12+
arrow = "53.0.0"
13+
arrow-array = "53.0.0"
1314
arrow-flight = { version = "53.0.0", features = ["tls"] }
1415
arrow-ipc = { version = "53.0.0", features = ["zstd"] }
1516
arrow-json = "53.0.0"

src/event/format/json.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#![allow(deprecated)]
2121

2222
use anyhow::anyhow;
23+
use arrow::compute::concat_batches;
2324
use arrow_array::RecordBatch;
2425
use arrow_json::reader::{infer_json_schema_from_iterator, ReaderBuilder};
2526
use arrow_schema::{DataType, Field, Fields, Schema};
@@ -282,14 +283,13 @@ impl EventFormat for Event {
282283
key.push_str(&format!("&{k}={v}"));
283284
}
284285

285-
partitions.insert(
286-
key,
287-
PartitionEvent {
288-
rb,
289-
parsed_timestamp,
290-
custom_partition_values,
291-
},
292-
);
286+
let entry = partitions.entry(key).or_insert(PartitionEvent {
287+
rb: RecordBatch::new_empty(schema.clone()),
288+
parsed_timestamp,
289+
custom_partition_values,
290+
});
291+
292+
entry.rb = concat_batches(&schema, [&entry.rb, &rb])?;
293293
}
294294

295295
Ok(super::Event {

0 commit comments

Comments
 (0)