
Commit e9d4ae1

update logic
1 parent: 6dc39ab

File tree: 2 files changed, +39 −30 lines

src/event/format/mod.rs

Lines changed: 2 additions & 2 deletions
@@ -148,7 +148,7 @@ pub trait EventFormat: Sized {
         p_custom_fields: &HashMap<String, String>,
     ) -> Result<(RecordBatch, bool), AnyError> {
         let p_timestamp = self.get_p_timestamp();
-        let (data, mut schema, is_first) = self.to_data(
+        let (data, schema, is_first) = self.to_data(
             storage_schema,
             time_partition,
             schema_version,
@@ -172,7 +172,7 @@ pub trait EventFormat: Sized {
 
         let rb = Self::decode(data, new_schema.clone())?;
 
-        let rb = add_parseable_fields(rb, p_timestamp, p_custom_fields);
+        let rb = add_parseable_fields(rb, p_timestamp, p_custom_fields)?;
 
         Ok((rb, is_first))
     }
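
The trailing `?` on the new `add_parseable_fields` call compiles because `ArrowError` converts into the method's `AnyError` automatically. A minimal sketch of that conversion, assuming `AnyError` is an alias for `anyhow::Error` (the stand-in function names here are hypothetical):

    use anyhow::Result;
    use arrow_schema::ArrowError;

    // Hypothetical stand-in for the now-fallible add_parseable_fields.
    fn fallible_step() -> Result<(), ArrowError> {
        Err(ArrowError::InvalidArgumentError("mismatched columns".into()))
    }

    fn caller() -> Result<()> {
        // `?` lifts ArrowError into anyhow::Error through the blanket
        // From impl, replacing the old panic path inside the helper.
        fallible_step()?;
        Ok(())
    }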

src/utils/arrow/mod.rs

Lines changed: 37 additions & 28 deletions
@@ -45,7 +45,7 @@ use std::{collections::HashMap, sync::Arc};
 use arrow_array::{
     Array, ArrayRef, RecordBatch, StringArray, TimestampMillisecondArray, UInt64Array,
 };
-use arrow_schema::{DataType, Field, Schema, TimeUnit};
+use arrow_schema::{ArrowError, DataType, Field, Schema, TimeUnit};
 use arrow_select::take::take;
 use chrono::{DateTime, Utc};
 use itertools::Itertools;
@@ -147,41 +147,50 @@ pub fn add_parseable_fields(
     rb: RecordBatch,
     p_timestamp: DateTime<Utc>,
     p_custom_fields: &HashMap<String, String>,
-) -> RecordBatch {
+) -> Result<RecordBatch, ArrowError> {
+    // Return Result for proper error handling
+
+    // Add custom fields in sorted order
+    let mut sorted_keys: Vec<&String> = p_custom_fields.keys().collect();
+    sorted_keys.sort();
+
     let schema = rb.schema();
     let row_count = rb.num_rows();
 
-    let mut fields: Vec<Field> = schema.fields().iter().map(|f| f.as_ref().clone()).collect();
-    let mut columns: Vec<ArrayRef> = rb.columns().to_vec();
-
-    // Create and insert the p_timestamp field and array at index 0
-    let p_timestamp_array = Arc::new(TimestampMillisecondArray::from_iter_values(
-        std::iter::repeat(p_timestamp.timestamp_millis()).take(row_count),
-    ));
-    let p_timestamp_field = Field::new(
-        DEFAULT_TIMESTAMP_KEY.to_string(),
-        DataType::Timestamp(TimeUnit::Millisecond, None),
-        false,
+    let mut fields = schema
+        .fields()
+        .iter()
+        .map(|f| f.as_ref().clone())
+        .collect_vec();
+    fields.insert(
+        0,
+        Field::new(
+            DEFAULT_TIMESTAMP_KEY,
+            DataType::Timestamp(TimeUnit::Millisecond, None),
+            true,
+        ),
+    );
+    fields.extend(
+        sorted_keys
+            .iter()
+            .map(|k| Field::new(*k, DataType::Utf8, true)),
     );
-    fields.insert(0, p_timestamp_field);
-    columns.insert(0, p_timestamp_array);
 
-    // Sort p_custom_fields by key and insert custom fields at the beginning, after the p_timestamp field
-    let mut sorted_keys: Vec<&String> = p_custom_fields.keys().collect();
-    sorted_keys.sort();
-    for key in sorted_keys.iter().rev() {
-        let value = p_custom_fields.get(*key).unwrap();
-        let string_array: ArrayRef = Arc::new(StringArray::from_iter_values(
+    let mut columns = rb.columns().iter().map(Arc::clone).collect_vec();
+    columns.insert(
+        0,
+        Arc::new(get_timestamp_array(p_timestamp, row_count)) as ArrayRef,
+    );
+    columns.extend(sorted_keys.iter().map(|k| {
+        let value = p_custom_fields.get(*k).unwrap();
+        Arc::new(StringArray::from_iter_values(
             std::iter::repeat(value).take(row_count),
-        ));
-        columns.insert(1, string_array);
-
-        let new_field = Field::new((*key).clone(), DataType::Utf8, true);
-        fields.insert(1, new_field);
-    }
+        )) as ArrayRef
+    }));
 
+    // Create the new schema and batch
     let new_schema = Arc::new(Schema::new(fields));
-    RecordBatch::try_new(new_schema, columns).unwrap()
+    RecordBatch::try_new(new_schema, columns)
 }
 
 pub fn reverse(rb: &RecordBatch) -> RecordBatch {
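
The new body delegates the timestamp column to `get_timestamp_array`, which this diff calls but does not define. A plausible implementation, inferred from the inline `TimestampMillisecondArray` construction the old code used (an assumption, not the committed helper):

    use arrow_array::TimestampMillisecondArray;
    use chrono::{DateTime, Utc};

    // Assumed helper: the ingestion timestamp in milliseconds, repeated
    // once per row, matching the old repeat/take construction.
    fn get_timestamp_array(p_timestamp: DateTime<Utc>, row_count: usize) -> TimestampMillisecondArray {
        TimestampMillisecondArray::from_value(p_timestamp.timestamp_millis(), row_count)
    }

Because fields and columns are now built by the same two steps (insert the timestamp entry at index 0, then extend with the sorted custom keys), the schema and column vectors stay aligned by construction, and `RecordBatch::try_new` reports any residual mismatch as an `ArrowError` instead of panicking through `unwrap`.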
