diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index 830409d3d..bc0c0218c 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -171,7 +171,6 @@ pub trait EventFormat: Sized { update_field_type_in_schema(new_schema, None, time_partition, None, schema_version); let rb = Self::decode(data, new_schema.clone())?; - let rb = add_parseable_fields(rb, p_timestamp, p_custom_fields)?; Ok((rb, is_first)) diff --git a/src/utils/arrow/mod.rs b/src/utils/arrow/mod.rs index 659c2bf88..cb9091450 100644 --- a/src/utils/arrow/mod.rs +++ b/src/utils/arrow/mod.rs @@ -17,37 +17,12 @@ * */ -//! example function for concat recordbatch(may not work) -//! ```rust -//! # use arrow::record_batch::RecordBatch; -//! # use arrow::error::Result; -//! -//! fn concat_batches(batch1: RecordBatch, batch2: RecordBatch) -> Result { -//! let schema = batch1.schema(); -//! let columns = schema -//! .fields() -//! .iter() -//! .enumerate() -//! .map(|(i, _)| -> Result<_> { -//! let array1 = batch1.column(i); -//! let array2 = batch2.column(i); -//! let array = arrow::compute::concat(&[array1.as_ref(), array2.as_ref()])?; -//! Ok(array) -//! }) -//! .collect::>>()?; -//! -//! RecordBatch::try_new(schema.clone(), columns) -//! } -//! ``` - use std::{ collections::{HashMap, HashSet}, sync::Arc, }; -use arrow_array::{ - Array, ArrayRef, RecordBatch, StringArray, TimestampMillisecondArray, UInt64Array, -}; +use arrow_array::{ArrayRef, RecordBatch, StringArray, TimestampMillisecondArray, UInt64Array}; use arrow_schema::{ArrowError, DataType, Field, Schema, TimeUnit}; use arrow_select::take::take; use chrono::{DateTime, Utc}; @@ -62,31 +37,6 @@ use serde_json::{Map, Value}; use crate::event::DEFAULT_TIMESTAMP_KEY; -/// Replaces columns in a record batch with new arrays. -/// -/// # Arguments -/// -/// * `schema` - The schema of the record batch. -/// * `batch` - The record batch to modify. -/// * `indexes` - The indexes of the columns to replace. -/// * `arrays` - The new arrays to replace the columns with. -/// -/// # Returns -/// -/// The modified record batch with the columns replaced. -pub fn replace_columns( - schema: Arc, - batch: &RecordBatch, - indexes: &[usize], - arrays: &[Arc], -) -> RecordBatch { - let mut batch_arrays = batch.columns().iter().map(Arc::clone).collect_vec(); - for (&index, arr) in indexes.iter().zip(arrays.iter()) { - batch_arrays[index] = Arc::clone(arr); - } - RecordBatch::try_new(schema, batch_arrays).unwrap() -} - /// Converts a slice of record batches to JSON. /// /// # Arguments @@ -213,40 +163,11 @@ pub fn reverse(rb: &RecordBatch) -> RecordBatch { mod tests { use std::sync::Arc; - use arrow_array::{Array, Int32Array, RecordBatch}; - use arrow_schema::{DataType, Field, Schema}; + use arrow_array::RecordBatch; + use arrow_schema::Schema; use super::*; - #[test] - fn check_replace() { - let schema = Schema::new(vec![ - Field::new("a", DataType::Int32, false), - Field::new("b", DataType::Int32, false), - Field::new("c", DataType::Int32, false), - ]); - - let schema_ref = Arc::new(schema); - - let rb = RecordBatch::try_new( - schema_ref.clone(), - vec![ - Arc::new(Int32Array::from_value(0, 3)), - Arc::new(Int32Array::from_value(0, 3)), - Arc::new(Int32Array::from_value(0, 3)), - ], - ) - .unwrap(); - - let arr: Arc = Arc::new(Int32Array::from_value(0, 3)); - - let new_rb = replace_columns(schema_ref.clone(), &rb, &[2], &[arr]); - - assert_eq!(new_rb.schema(), schema_ref); - assert_eq!(new_rb.num_columns(), 3); - assert_eq!(new_rb.num_rows(), 3) - } - #[test] fn check_empty_json_to_record_batches() { let r = RecordBatch::new_empty(Arc::new(Schema::empty()));