Skip to content

Commit 5f746eb

Browse files
update logic
1 parent a336970 commit 5f746eb

File tree

2 files changed

+38
-29
lines changed

2 files changed

+38
-29
lines changed

src/event/format/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ pub trait EventFormat: Sized {
138138

139139
let rb = Self::decode(data, new_schema.clone())?;
140140

141-
let rb = add_parseable_fields(rb, p_timestamp, p_custom_fields);
141+
let rb = add_parseable_fields(rb, p_timestamp, p_custom_fields)?;
142142

143143
Ok((rb, is_first))
144144
}

src/utils/arrow/mod.rs

Lines changed: 37 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ use std::{collections::HashMap, sync::Arc};
4545
use arrow_array::{
4646
Array, ArrayRef, RecordBatch, StringArray, TimestampMillisecondArray, UInt64Array,
4747
};
48-
use arrow_schema::{DataType, Field, Schema, TimeUnit};
48+
use arrow_schema::{ArrowError, DataType, Field, Schema, TimeUnit};
4949
use arrow_select::take::take;
5050
use chrono::{DateTime, Utc};
5151
use itertools::Itertools;
@@ -145,41 +145,50 @@ pub fn add_parseable_fields(
145145
rb: RecordBatch,
146146
p_timestamp: DateTime<Utc>,
147147
p_custom_fields: &HashMap<String, String>,
148-
) -> RecordBatch {
148+
) -> Result<RecordBatch, ArrowError> {
149+
// Return Result for proper error handling
150+
151+
// Add custom fields in sorted order
152+
let mut sorted_keys: Vec<&String> = p_custom_fields.keys().collect();
153+
sorted_keys.sort();
154+
149155
let schema = rb.schema();
150156
let row_count = rb.num_rows();
151157

152-
let mut fields: Vec<Field> = schema.fields().iter().map(|f| f.as_ref().clone()).collect();
153-
let mut columns: Vec<ArrayRef> = rb.columns().to_vec();
154-
155-
// Create and insert the p_timestamp field and array at index 0
156-
let p_timestamp_array = Arc::new(TimestampMillisecondArray::from_iter_values(
157-
std::iter::repeat(p_timestamp.timestamp_millis()).take(row_count),
158-
));
159-
let p_timestamp_field = Field::new(
160-
DEFAULT_TIMESTAMP_KEY.to_string(),
161-
DataType::Timestamp(TimeUnit::Millisecond, None),
162-
false,
158+
let mut fields = schema
159+
.fields()
160+
.iter()
161+
.map(|f| f.as_ref().clone())
162+
.collect_vec();
163+
fields.insert(
164+
0,
165+
Field::new(
166+
DEFAULT_TIMESTAMP_KEY,
167+
DataType::Timestamp(TimeUnit::Millisecond, None),
168+
true,
169+
),
170+
);
171+
fields.extend(
172+
sorted_keys
173+
.iter()
174+
.map(|k| Field::new(*k, DataType::Utf8, true)),
163175
);
164-
fields.insert(0, p_timestamp_field);
165-
columns.insert(0, p_timestamp_array);
166176

167-
// Sort p_custom_fields by key and insert custom fields at the beginning, after the p_timestamp field
168-
let mut sorted_keys: Vec<&String> = p_custom_fields.keys().collect();
169-
sorted_keys.sort();
170-
for key in sorted_keys.iter().rev() {
171-
let value = p_custom_fields.get(*key).unwrap();
172-
let string_array: ArrayRef = Arc::new(StringArray::from_iter_values(
177+
let mut columns = rb.columns().iter().map(Arc::clone).collect_vec();
178+
columns.insert(
179+
0,
180+
Arc::new(get_timestamp_array(p_timestamp, row_count)) as ArrayRef,
181+
);
182+
columns.extend(sorted_keys.iter().map(|k| {
183+
let value = p_custom_fields.get(*k).unwrap();
184+
Arc::new(StringArray::from_iter_values(
173185
std::iter::repeat(value).take(row_count),
174-
));
175-
columns.insert(1, string_array);
176-
177-
let new_field = Field::new((*key).clone(), DataType::Utf8, true);
178-
fields.insert(1, new_field);
179-
}
186+
)) as ArrayRef
187+
}));
180188

189+
// Create the new schema and batch
181190
let new_schema = Arc::new(Schema::new(fields));
182-
RecordBatch::try_new(new_schema, columns).unwrap()
191+
RecordBatch::try_new(new_schema, columns)
183192
}
184193

185194
pub fn reverse(rb: &RecordBatch) -> RecordBatch {

0 commit comments

Comments (0)