From 42b1e461f0d85d7b6313aadfb180406ffee18c28 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 19 Mar 2025 00:20:11 +0530 Subject: [PATCH 1/2] refactor: `replace_columns` https://github.com/parseablehq/parseable/pull/1218/files#r1976356303 --- src/event/format/mod.rs | 3 +-- src/utils/arrow/mod.rs | 12 +++++------- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index 58c35fc79..afefd5b0c 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -183,8 +183,7 @@ pub trait EventFormat: Sized { rb = replace_columns( rb.schema(), &rb, - &[0], - &[Arc::new(get_timestamp_array(p_timestamp, rb.num_rows()))], + &[(0, Arc::new(get_timestamp_array(p_timestamp, rb.num_rows())))], ); Ok((rb, is_first)) diff --git a/src/utils/arrow/mod.rs b/src/utils/arrow/mod.rs index e29762047..af940ee76 100644 --- a/src/utils/arrow/mod.rs +++ b/src/utils/arrow/mod.rs @@ -61,8 +61,7 @@ use serde_json::{Map, Value}; /// /// * `schema` - The schema of the record batch. /// * `batch` - The record batch to modify. -/// * `indexes` - The indexes of the columns to replace. -/// * `arrays` - The new arrays to replace the columns with. +/// * `indexed_arrays` - A list of indexes and arrays to replace the columns indexed with. /// /// # Returns /// @@ -70,12 +69,11 @@ use serde_json::{Map, Value}; pub fn replace_columns( schema: Arc, batch: &RecordBatch, - indexes: &[usize], - arrays: &[Arc], + indexed_arrays: &[(usize, Arc)], ) -> RecordBatch { let mut batch_arrays = batch.columns().iter().map(Arc::clone).collect_vec(); - for (&index, arr) in indexes.iter().zip(arrays.iter()) { - batch_arrays[index] = Arc::clone(arr); + for (index, arr) in indexed_arrays { + batch_arrays[*index] = Arc::clone(arr); } RecordBatch::try_new(schema, batch_arrays).unwrap() } @@ -180,7 +178,7 @@ mod tests { let arr: Arc = Arc::new(Int32Array::from_value(0, 3)); - let new_rb = replace_columns(schema_ref.clone(), &rb, &[2], &[arr]); + let new_rb = replace_columns(schema_ref.clone(), &rb, &[(2, arr)]); assert_eq!(new_rb.schema(), schema_ref); assert_eq!(new_rb.num_columns(), 3); From 367a5a67a782c0ebae1fb23babe914d852c4598a Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 19 Mar 2025 00:42:07 +0530 Subject: [PATCH 2/2] test: `replace_columns` --- src/utils/arrow/mod.rs | 79 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 78 insertions(+), 1 deletion(-) diff --git a/src/utils/arrow/mod.rs b/src/utils/arrow/mod.rs index af940ee76..4c64775b7 100644 --- a/src/utils/arrow/mod.rs +++ b/src/utils/arrow/mod.rs @@ -151,7 +151,7 @@ pub fn reverse(rb: &RecordBatch) -> RecordBatch { mod tests { use std::sync::Arc; - use arrow_array::{Array, Int32Array, RecordBatch}; + use arrow_array::{Array, ArrayRef, Int32Array, RecordBatch}; use arrow_schema::{DataType, Field, Schema}; use super::*; @@ -213,4 +213,81 @@ mod tests { assert_eq!(array.len(), 0); assert!(array.is_empty()); } + + #[test] + fn test_replace_single_column() { + let schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Int32, false), + ]); + + let columns: Vec = vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(Int32Array::from(vec![4, 5, 6])), + Arc::new(Int32Array::from(vec![7, 8, 9])), + ]; + + let batch = RecordBatch::try_new(Arc::new(schema.clone()), columns.clone()).unwrap(); + + let new_b = Arc::new(Int32Array::from(vec![10, 11, 12])); + + let result = replace_columns(Arc::new(schema), &batch, &[(1, new_b.clone())]); + + assert_eq!(result.column(0).as_ref(), columns[0].as_ref()); + assert_eq!(result.column(1).as_ref(), new_b.as_ref()); + assert_eq!(result.column(2).as_ref(), columns[2].as_ref()); + } + + #[test] + fn replace_multiple_columns() { + let schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Int32, false), + ]); + + let columns: Vec = vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(Int32Array::from(vec![4, 5, 6])), + Arc::new(Int32Array::from(vec![7, 8, 9])), + ]; + + let batch = RecordBatch::try_new(Arc::new(schema.clone()), columns.clone()).unwrap(); + + let new_a = Arc::new(Int32Array::from(vec![10, 11, 12])); + let new_c = Arc::new(Int32Array::from(vec![13, 14, 15])); + + let result = replace_columns( + Arc::new(schema), + &batch, + &[(0, new_a.clone()), (2, new_c.clone())], + ); + + assert_eq!(result.column(0).as_ref(), new_a.as_ref()); + assert_eq!(result.column(1).as_ref(), columns[1].as_ref()); + assert_eq!(result.column(2).as_ref(), new_c.as_ref()); + } + + #[test] + #[should_panic] + fn replace_column_with_different_length_array() { + let schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Int32, false), + ]); + + let columns: Vec = vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(Int32Array::from(vec![4, 5, 6])), + Arc::new(Int32Array::from(vec![7, 8, 9])), + ]; + + let batch = RecordBatch::try_new(Arc::new(schema.clone()), columns.clone()).unwrap(); + + let new_b = Arc::new(Int32Array::from(vec![10, 11])); // Different length + + replace_columns(Arc::new(schema), &batch, &[(1, new_b.clone())]); + } }