diff --git a/src/connectors/kafka/processor.rs b/src/connectors/kafka/processor.rs
index b74754003..1db1ee0a1 100644
--- a/src/connectors/kafka/processor.rs
+++ b/src/connectors/kafka/processor.rs
@@ -27,10 +27,7 @@ use tracing::{debug, error};
 
 use crate::{
     connectors::common::processor::Processor,
-    event::{
-        format::{json, EventFormat, LogSource},
-        Event as ParseableEvent,
-    },
+    event::format::{json, EventFormat, LogSource},
     parseable::PARSEABLE,
     storage::StreamType,
 };
@@ -41,10 +38,7 @@ use super::{config::BufferConfig, ConsumerRecord, StreamConsumer, TopicPartitionList};
 pub struct ParseableSinkProcessor;
 
 impl ParseableSinkProcessor {
-    async fn build_event_from_chunk(
-        &self,
-        records: &[ConsumerRecord],
-    ) -> anyhow::Result<ParseableEvent> {
+    async fn process_event_from_chunk(&self, records: &[ConsumerRecord]) -> anyhow::Result<()> {
         let stream_name = records
             .first()
             .map(|r| r.topic.as_str())
             .unwrap_or_default();
@@ -55,11 +49,6 @@ impl ParseableSinkProcessor {
             .await?;
 
         let stream = PARSEABLE.get_stream(stream_name)?;
-        let schema = stream.get_schema_raw();
-        let time_partition = stream.get_time_partition();
-        let custom_partition = stream.get_custom_partition();
-        let static_schema_flag = stream.get_static_schema_flag();
-        let schema_version = stream.get_schema_version();
 
         let mut json_vec = Vec::with_capacity(records.len());
         let mut total_payload_size = 0u64;
@@ -71,18 +60,11 @@ impl ParseableSinkProcessor {
             }
         }
 
-        let p_event = json::Event::new(Value::Array(json_vec)).into_event(
-            stream_name.to_string(),
-            total_payload_size,
-            &schema,
-            static_schema_flag,
-            custom_partition.as_ref(),
-            time_partition.as_ref(),
-            schema_version,
-            StreamType::UserDefined,
-        )?;
-
-        Ok(p_event)
+        json::Event::new(Value::Array(json_vec))
+            .to_event(&stream, total_payload_size)?
+            .process(&stream)?;
+
+        Ok(())
     }
 }
@@ -92,7 +74,7 @@ impl Processor<Vec<ConsumerRecord>, ()> for ParseableSinkProcessor {
         let len = records.len();
         debug!("Processing {len} records");
 
-        self.build_event_from_chunk(&records).await?.process()?;
+        self.process_event_from_chunk(&records).await?;
 
         debug!("Processed {len} records");
         Ok(())
diff --git a/src/event/format/json.rs b/src/event/format/json.rs
index c28b701de..2c9ce3d03 100644
--- a/src/event/format/json.rs
+++ b/src/event/format/json.rs
@@ -30,8 +30,8 @@ use serde_json::Value;
 use std::{collections::HashMap, sync::Arc};
 use tracing::error;
 
-use super::EventFormat;
-use crate::{metadata::SchemaVersion, storage::StreamType, utils::arrow::get_field};
+use super::{EventFormat, EventSchema};
+use crate::{metadata::SchemaVersion, utils::arrow::get_field};
 
 pub struct Event {
     pub json: Value,
@@ -55,6 +55,27 @@ impl EventFormat for Event {
         self.p_timestamp
     }
 
+    fn get_partitions(
+        &self,
+        time_partition: Option<&String>,
+        custom_partitions: Option<&String>,
+    ) -> anyhow::Result<(NaiveDateTime, HashMap<String, String>)> {
+        let custom_partition_values = match custom_partitions.as_ref() {
+            Some(custom_partition) => {
+                let custom_partitions = custom_partition.split(',').collect_vec();
+                extract_custom_partition_values(&self.json, &custom_partitions)
+            }
+            None => HashMap::new(),
+        };
+
+        let parsed_timestamp = match time_partition {
+            Some(time_partition) => extract_and_parse_time(&self.json, time_partition)?,
+            _ => self.p_timestamp.naive_utc(),
+        };
+
+        Ok((parsed_timestamp, custom_partition_values))
+    }
+
     // convert the incoming json to a vector of json values
     // also extract the arrow schema, tags and metadata from the incoming json
     fn to_data(
         self,
         schema: &HashMap<String, Arc<Field>>,
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
-    ) -> Result<(Self::Data, Vec<Arc<Field>>, bool), anyhow::Error> {
+    ) -> anyhow::Result<(Self::Data, EventSchema, bool)> {
         let stream_schema = schema;
-
         // incoming event may be a single json or a json array
         // but Data (type defined above) is a vector of json values
         // hence we need to convert the incoming event to a vector of json values
@@ -136,51 +156,6 @@ impl EventFormat for Event {
             Ok(None) => unreachable!("all records are added to one rb"),
         }
     }
-
-    /// Converts a JSON event into a Parseable Event
-    fn into_event(
-        self,
-        stream_name: String,
-        origin_size: u64,
-        storage_schema: &HashMap<String, Arc<Field>>,
-        static_schema_flag: bool,
-        custom_partitions: Option<&String>,
-        time_partition: Option<&String>,
-        schema_version: SchemaVersion,
-        stream_type: StreamType,
-    ) -> Result<super::Event, anyhow::Error> {
-        let custom_partition_values = match custom_partitions.as_ref() {
-            Some(custom_partition) => {
-                let custom_partitions = custom_partition.split(',').collect_vec();
-                extract_custom_partition_values(&self.json, &custom_partitions)
-            }
-            None => HashMap::new(),
-        };
-
-        let parsed_timestamp = match time_partition {
-            Some(time_partition) => extract_and_parse_time(&self.json, time_partition)?,
-            _ => self.p_timestamp.naive_utc(),
-        };
-
-        let (rb, is_first_event) = self.into_recordbatch(
-            storage_schema,
-            static_schema_flag,
-            time_partition,
-            schema_version,
-        )?;
-
-        Ok(super::Event {
-            rb,
-            stream_name,
-            origin_format: "json",
-            origin_size,
-            is_first_event,
-            parsed_timestamp,
-            time_partition: None,
-            custom_partition_values,
-            stream_type,
-        })
-    }
 }
 
 /// Extracts custom partition values from provided JSON object
diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs
index ce90cfc52..f97e78c65 100644
--- a/src/event/format/mod.rs
+++ b/src/event/format/mod.rs
@@ -23,20 +23,20 @@ use std::{
     sync::Arc,
 };
 
-use anyhow::{anyhow, Error as AnyError};
+use anyhow::anyhow;
 use arrow_array::RecordBatch;
 use arrow_schema::{DataType, Field, Schema, TimeUnit};
-use chrono::{DateTime, Utc};
+use chrono::{DateTime, NaiveDateTime, Utc};
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
 
 use crate::{
     metadata::SchemaVersion,
-    storage::StreamType,
+    parseable::Stream,
     utils::arrow::{get_field, get_timestamp_array, replace_columns},
 };
 
-use super::{Event, DEFAULT_TIMESTAMP_KEY};
+use super::DEFAULT_TIMESTAMP_KEY;
 
 pub mod json;
@@ -97,28 +97,36 @@ impl Display for LogSource {
 pub trait EventFormat: Sized {
     type Data;
 
+    fn get_partitions(
+        &self,
+        time_partition: Option<&String>,
+        custom_partitions: Option<&String>,
+    ) -> anyhow::Result<(NaiveDateTime, HashMap<String, String>)>;
+
     fn to_data(
         self,
         schema: &HashMap<String, Arc<Field>>,
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
-    ) -> Result<(Self::Data, EventSchema, bool), AnyError>;
+    ) -> anyhow::Result<(Self::Data, EventSchema, bool)>;
 
-    fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;
+    fn decode(data: Self::Data, schema: Arc<Schema>) -> anyhow::Result<RecordBatch>;
 
     /// Returns the UTC time at ingestion
     fn get_p_timestamp(&self) -> DateTime<Utc>;
 
-    fn into_recordbatch(
-        self,
-        storage_schema: &HashMap<String, Arc<Field>>,
-        static_schema_flag: bool,
-        time_partition: Option<&String>,
-        schema_version: SchemaVersion,
-    ) -> Result<(RecordBatch, bool), AnyError> {
+    fn to_event(self, stream: &Stream, origin_size: u64) -> anyhow::Result<super::Event> {
+        let storage_schema = stream.get_schema_raw();
+        let static_schema_flag = stream.get_static_schema_flag();
+        let time_partition = stream.get_time_partition();
+        let custom_partition = stream.get_custom_partition();
+        let schema_version = stream.get_schema_version();
+        let stream_type = stream.get_stream_type();
         let p_timestamp = self.get_p_timestamp();
-        let (data, mut schema, is_first) =
-            self.to_data(storage_schema, time_partition, schema_version)?;
+        let (parsed_timestamp, custom_partition_values) =
+            self.get_partitions(time_partition.as_ref(), custom_partition.as_ref())?;
+        let (data, mut schema, is_first_event) =
+            self.to_data(&storage_schema, time_partition.as_ref(), schema_version)?;
 
         if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() {
             return Err(anyhow!(
@@ -139,11 +147,16 @@ pub trait EventFormat: Sized {
 
         // prepare the record batch and new fields to be added
         let mut new_schema = Arc::new(Schema::new(schema));
-        if !Self::is_schema_matching(new_schema.clone(), storage_schema, static_schema_flag) {
+        if !Self::is_schema_matching(new_schema.clone(), &storage_schema, static_schema_flag) {
             return Err(anyhow!("Schema mismatch"));
         }
-        new_schema =
-            update_field_type_in_schema(new_schema, None, time_partition, None, schema_version);
+        new_schema = update_field_type_in_schema(
+            new_schema,
+            None,
+            time_partition.as_ref(),
+            None,
+            schema_version,
+        );
 
         let mut rb = Self::decode(data, new_schema.clone())?;
         rb = replace_columns(
@@ -153,7 +166,16 @@ pub trait EventFormat: Sized {
             &[Arc::new(get_timestamp_array(p_timestamp, rb.num_rows()))],
         );
 
-        Ok((rb, is_first))
+        Ok(super::Event {
+            rb,
+            origin_format: "json",
+            origin_size,
+            is_first_event,
+            time_partition,
+            parsed_timestamp,
+            custom_partition_values,
+            stream_type,
+        })
     }
 
     fn is_schema_matching(
@@ -177,19 +199,6 @@ pub trait EventFormat: Sized {
         }
         true
     }
-
-    #[allow(clippy::too_many_arguments)]
-    fn into_event(
-        self,
-        stream_name: String,
-        origin_size: u64,
-        storage_schema: &HashMap<String, Arc<Field>>,
-        static_schema_flag: bool,
-        custom_partitions: Option<&String>,
-        time_partition: Option<&String>,
-        schema_version: SchemaVersion,
-        stream_type: StreamType,
-    ) -> Result<Event, AnyError>;
 }
 
 pub fn get_existing_field_names(
diff --git a/src/event/mod.rs b/src/event/mod.rs
index 29a4a0899..f0889c9af 100644
--- a/src/event/mod.rs
+++ b/src/event/mod.rs
@@ -27,7 +27,7 @@ use std::sync::Arc;
 use self::error::EventError;
 use crate::{
     metadata::update_stats,
-    parseable::{StagingError, PARSEABLE},
+    parseable::{StagingError, Stream, PARSEABLE},
     storage::StreamType,
     LOCK_EXPECT,
 };
@@ -38,7 +38,6 @@ pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
 
 #[derive(Clone)]
 pub struct Event {
-    pub stream_name: String,
     pub rb: RecordBatch,
     pub origin_format: &'static str,
     pub origin_size: u64,
@@ -51,7 +50,7 @@ pub struct Event {
 
 // Events holds the schema related to a each event for a single log stream
 impl Event {
-    pub fn process(self) -> Result<(), EventError> {
+    pub fn process(self, stream: &Stream) -> Result<(), EventError> {
         let mut key = get_schema_key(&self.rb.schema().fields);
         if self.time_partition.is_some() {
             let parsed_timestamp_to_min = self.parsed_timestamp.format("%Y%m%dT%H%M").to_string();
@@ -65,10 +64,10 @@ impl Event {
         }
 
         if self.is_first_event {
-            commit_schema(&self.stream_name, self.rb.schema())?;
+            commit_schema(&stream.stream_name, self.rb.schema())?;
         }
 
-        PARSEABLE.get_or_create_stream(&self.stream_name).push(
+        PARSEABLE.get_or_create_stream(&stream.stream_name).push(
             &key,
             &self.rb,
             self.parsed_timestamp,
@@ -77,22 +76,22 @@ impl Event {
         )?;
 
         update_stats(
-            &self.stream_name,
+            &stream.stream_name,
             self.origin_format,
             self.origin_size,
             self.rb.num_rows(),
             self.parsed_timestamp.date(),
         );
 
-        crate::livetail::LIVETAIL.process(&self.stream_name, &self.rb);
+        crate::livetail::LIVETAIL.process(&stream.stream_name, &self.rb);
 
         Ok(())
     }
 
-    pub fn process_unchecked(&self) -> Result<(), EventError> {
+    pub fn process_unchecked(&self, stream: &Stream) -> Result<(), EventError> {
         let key = get_schema_key(&self.rb.schema().fields);
 
-        PARSEABLE.get_or_create_stream(&self.stream_name).push(
+        PARSEABLE.get_or_create_stream(&stream.stream_name).push(
             &key,
             &self.rb,
             self.parsed_timestamp,
diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs
index 0523e8757..4807e68ba 100644
--- a/src/handlers/http/ingest.rs
+++ b/src/handlers/http/ingest.rs
@@ -30,7 +30,6 @@ use crate::event;
 use crate::event::error::EventError;
 use crate::event::format::{self, EventFormat, LogSource};
 use crate::handlers::{LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
-use crate::metadata::SchemaVersion;
 use crate::option::Mode;
 use crate::parseable::{StreamNotFound, PARSEABLE};
 use crate::storage::{ObjectStorageError, StreamType};
@@ -80,21 +79,12 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpResponse, PostError>
 ) -> Result<(), PostError> {
     let size: usize = body.len();
     let json: Value = serde_json::from_slice(&body)?;
-    let schema = PARSEABLE.get_stream(&stream_name)?.get_schema_raw();
+    let stream = PARSEABLE.get_stream(&stream_name)?;
 
     // For internal streams, use old schema
     format::json::Event::new(json)
-        .into_event(
-            stream_name,
-            size as u64,
-            &schema,
-            false,
-            None,
-            None,
-            SchemaVersion::V0,
-            StreamType::Internal,
-        )?
-        .process()?;
+        .to_event(&stream, size as u64)?
+        .process(&stream)?;
 
     Ok(())
 }
@@ -240,9 +230,9 @@ pub async fn push_logs_unchecked(
     batches: RecordBatch,
     stream_name: &str,
 ) -> Result<event::Event, PostError> {
+    let stream = PARSEABLE.get_stream(stream_name)?;
     let unchecked_event = event::Event {
         rb: batches,
-        stream_name: stream_name.to_string(),
         origin_format: "json",
         origin_size: 0,
         parsed_timestamp: Utc::now().naive_utc(),
@@ -251,7 +241,7 @@ pub async fn push_logs_unchecked(
         custom_partition_values: HashMap::new(), // should be an empty map for unchecked push
         stream_type: StreamType::UserDefined,
     };
-    unchecked_event.process_unchecked()?;
+    unchecked_event.process_unchecked(&stream)?;
 
     Ok(unchecked_event)
 }
@@ -333,501 +323,496 @@ impl actix_web::ResponseError for PostError {
     }
 }
 
-#[cfg(test)]
-mod tests {
-
-    use arrow::datatypes::Int64Type;
-    use arrow_array::{ArrayRef, Float64Array, Int64Array, ListArray, StringArray};
-    use arrow_schema::{DataType, Field};
-    use serde_json::json;
-    use std::{collections::HashMap, sync::Arc};
-
-    use crate::{
-        event::format::{json, EventFormat},
-        metadata::SchemaVersion,
-        utils::json::{convert_array_to_object, flatten::convert_to_array},
-    };
-
-    trait TestExt {
-        fn as_int64_arr(&self) -> Option<&Int64Array>;
-        fn as_float64_arr(&self) -> Option<&Float64Array>;
-        fn as_utf8_arr(&self) -> Option<&StringArray>;
-    }
-
-    impl TestExt for ArrayRef {
-        fn as_int64_arr(&self) -> Option<&Int64Array> {
-            self.as_any().downcast_ref()
-        }
-
-        fn as_float64_arr(&self) -> Option<&Float64Array> {
-            self.as_any().downcast_ref()
-        }
-
-        fn as_utf8_arr(&self) -> Option<&StringArray> {
-            self.as_any().downcast_ref()
-        }
-    }
-
-    fn fields_to_map(iter: impl Iterator<Item = Field>) -> HashMap<String, Arc<Field>> {
-        iter.map(|x| (x.name().clone(), Arc::new(x))).collect()
-    }
-
-    #[test]
-    fn basic_object_into_rb() {
-        let json = json!({
-            "c": 4.23,
-            "a": 1,
-            "b": "hello",
-        });
-
-        let (rb, _) = json::Event::new(json)
-            .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
-            .unwrap();
-
-        assert_eq!(rb.num_rows(), 1);
-        assert_eq!(rb.num_columns(), 4);
- assert_eq!( - rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), - &Int64Array::from_iter([1]) - ); - assert_eq!( - rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), - &StringArray::from_iter_values(["hello"]) - ); - assert_eq!( - rb.column_by_name("c").unwrap().as_float64_arr().unwrap(), - &Float64Array::from_iter([4.23]) - ); - } - - #[test] - fn basic_object_with_null_into_rb() { - let json = json!({ - "a": 1, - "b": "hello", - "c": null - }); - - let (rb, _) = json::Event::new(json) - .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0) - .unwrap(); - - assert_eq!(rb.num_rows(), 1); - assert_eq!(rb.num_columns(), 3); - assert_eq!( - rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), - &Int64Array::from_iter([1]) - ); - assert_eq!( - rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), - &StringArray::from_iter_values(["hello"]) - ); - } - - #[test] - fn basic_object_derive_schema_into_rb() { - let json = json!({ - "a": 1, - "b": "hello", - }); - - let schema = fields_to_map( - [ - Field::new("a", DataType::Int64, true), - Field::new("b", DataType::Utf8, true), - Field::new("c", DataType::Float64, true), - ] - .into_iter(), - ); - - let (rb, _) = json::Event::new(json) - .into_recordbatch(&schema, false, None, SchemaVersion::V0) - .unwrap(); - - assert_eq!(rb.num_rows(), 1); - assert_eq!(rb.num_columns(), 3); - assert_eq!( - rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), - &Int64Array::from_iter([1]) - ); - assert_eq!( - rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), - &StringArray::from_iter_values(["hello"]) - ); - } - - #[test] - fn basic_object_schema_mismatch() { - let json = json!({ - "a": 1, - "b": 1, // type mismatch - }); - - let schema = fields_to_map( - [ - Field::new("a", DataType::Int64, true), - Field::new("b", DataType::Utf8, true), - Field::new("c", DataType::Float64, true), - ] - .into_iter(), - ); - - assert!(json::Event::new(json) - .into_recordbatch(&schema, false, None, SchemaVersion::V0,) - .is_err()); - } - - #[test] - fn empty_object() { - let json = json!({}); - - let schema = fields_to_map( - [ - Field::new("a", DataType::Int64, true), - Field::new("b", DataType::Utf8, true), - Field::new("c", DataType::Float64, true), - ] - .into_iter(), - ); - - let (rb, _) = json::Event::new(json) - .into_recordbatch(&schema, false, None, SchemaVersion::V0) - .unwrap(); - - assert_eq!(rb.num_rows(), 1); - assert_eq!(rb.num_columns(), 1); - } - - #[test] - fn non_object_arr_is_err() { - let json = json!([1]); - - assert!(convert_array_to_object( - json, - None, - None, - None, - SchemaVersion::V0, - &crate::event::format::LogSource::default() - ) - .is_err()) - } - - #[test] - fn array_into_recordbatch_inffered_schema() { - let json = json!([ - { - "b": "hello", - }, - { - "b": "hello", - "a": 1, - "c": 1 - }, - { - "a": 1, - "b": "hello", - "c": null - }, - ]); - - let (rb, _) = json::Event::new(json) - .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0) - .unwrap(); - - assert_eq!(rb.num_rows(), 3); - assert_eq!(rb.num_columns(), 4); - - let schema = rb.schema(); - let fields = &schema.fields; - - assert_eq!(&*fields[1], &Field::new("a", DataType::Int64, true)); - assert_eq!(&*fields[2], &Field::new("b", DataType::Utf8, true)); - assert_eq!(&*fields[3], &Field::new("c", DataType::Int64, true)); - - assert_eq!( - rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), - &Int64Array::from(vec![None, Some(1), Some(1)]) - ); - assert_eq!( - rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), - 
&StringArray::from(vec![Some("hello"), Some("hello"), Some("hello"),]) - ); - assert_eq!( - rb.column_by_name("c").unwrap().as_int64_arr().unwrap(), - &Int64Array::from(vec![None, Some(1), None]) - ); - } - - #[test] - fn arr_with_null_into_rb() { - let json = json!([ - { - "c": null, - "b": "hello", - "a": null - }, - { - "a": 1, - "c": 1.22, - "b": "hello" - }, - { - "b": "hello", - "a": 1, - "c": null - }, - ]); - - let (rb, _) = json::Event::new(json) - .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0) - .unwrap(); - - assert_eq!(rb.num_rows(), 3); - assert_eq!(rb.num_columns(), 4); - assert_eq!( - rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), - &Int64Array::from(vec![None, Some(1), Some(1)]) - ); - assert_eq!( - rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), - &StringArray::from(vec![Some("hello"), Some("hello"), Some("hello"),]) - ); - assert_eq!( - rb.column_by_name("c").unwrap().as_float64_arr().unwrap(), - &Float64Array::from(vec![None, Some(1.22), None,]) - ); - } - - #[test] - fn arr_with_null_derive_schema_into_rb() { - let json = json!([ - { - "c": null, - "b": "hello", - "a": null - }, - { - "a": 1, - "c": 1.22, - "b": "hello" - }, - { - "b": "hello", - "a": 1, - "c": null - }, - ]); - - let schema = fields_to_map( - [ - Field::new("a", DataType::Int64, true), - Field::new("b", DataType::Utf8, true), - Field::new("c", DataType::Float64, true), - ] - .into_iter(), - ); - - let (rb, _) = json::Event::new(json) - .into_recordbatch(&schema, false, None, SchemaVersion::V0) - .unwrap(); - - assert_eq!(rb.num_rows(), 3); - assert_eq!(rb.num_columns(), 4); - assert_eq!( - rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), - &Int64Array::from(vec![None, Some(1), Some(1)]) - ); - assert_eq!( - rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), - &StringArray::from(vec![Some("hello"), Some("hello"), Some("hello"),]) - ); - assert_eq!( - rb.column_by_name("c").unwrap().as_float64_arr().unwrap(), - &Float64Array::from(vec![None, Some(1.22), None,]) - ); - } - - #[test] - fn arr_schema_mismatch() { - let json = json!([ - { - "a": null, - "b": "hello", - "c": 1.24 - }, - { - "a": 1, - "b": "hello", - "c": 1 - }, - { - "a": 1, - "b": "hello", - "c": null - }, - ]); - - let schema = fields_to_map( - [ - Field::new("a", DataType::Int64, true), - Field::new("b", DataType::Utf8, true), - Field::new("c", DataType::Float64, true), - ] - .into_iter(), - ); - - assert!(json::Event::new(json) - .into_recordbatch(&schema, false, None, SchemaVersion::V0,) - .is_err()); - } - - #[test] - fn arr_obj_with_nested_type() { - let json = json!([ - { - "a": 1, - "b": "hello", - }, - { - "a": 1, - "b": "hello", - }, - { - "a": 1, - "b": "hello", - "c": [{"a": 1}] - }, - { - "a": 1, - "b": "hello", - "c": [{"a": 1, "b": 2}] - }, - ]); - let flattened_json = convert_to_array( - convert_array_to_object( - json, - None, - None, - None, - SchemaVersion::V0, - &crate::event::format::LogSource::default(), - ) - .unwrap(), - ) - .unwrap(); - - let (rb, _) = json::Event::new(flattened_json) - .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0) - .unwrap(); - assert_eq!(rb.num_rows(), 4); - assert_eq!(rb.num_columns(), 5); - assert_eq!( - rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), - &Int64Array::from(vec![Some(1), Some(1), Some(1), Some(1)]) - ); - assert_eq!( - rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), - &StringArray::from(vec![ - Some("hello"), - Some("hello"), - Some("hello"), - Some("hello") - ]) - ); - - assert_eq!( - 
rb.column_by_name("c_a") - .unwrap() - .as_any() - .downcast_ref::() - .unwrap(), - &ListArray::from_iter_primitive::(vec![ - None, - None, - Some(vec![Some(1i64)]), - Some(vec![Some(1)]) - ]) - ); - - assert_eq!( - rb.column_by_name("c_b") - .unwrap() - .as_any() - .downcast_ref::() - .unwrap(), - &ListArray::from_iter_primitive::(vec![ - None, - None, - None, - Some(vec![Some(2i64)]) - ]) - ); - } - - #[test] - fn arr_obj_with_nested_type_v1() { - let json = json!([ - { - "a": 1, - "b": "hello", - }, - { - "a": 1, - "b": "hello", - }, - { - "a": 1, - "b": "hello", - "c": [{"a": 1}] - }, - { - "a": 1, - "b": "hello", - "c": [{"a": 1, "b": 2}] - }, - ]); - let flattened_json = convert_to_array( - convert_array_to_object( - json, - None, - None, - None, - SchemaVersion::V1, - &crate::event::format::LogSource::default(), - ) - .unwrap(), - ) - .unwrap(); - - let (rb, _) = json::Event::new(flattened_json) - .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V1) - .unwrap(); - - assert_eq!(rb.num_rows(), 4); - assert_eq!(rb.num_columns(), 5); - assert_eq!( - rb.column_by_name("a").unwrap().as_float64_arr().unwrap(), - &Float64Array::from(vec![Some(1.0), Some(1.0), Some(1.0), Some(1.0)]) - ); - assert_eq!( - rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), - &StringArray::from(vec![ - Some("hello"), - Some("hello"), - Some("hello"), - Some("hello") - ]) - ); - - assert_eq!( - rb.column_by_name("c_a").unwrap().as_float64_arr().unwrap(), - &Float64Array::from(vec![None, None, Some(1.0), Some(1.0)]) - ); - - assert_eq!( - rb.column_by_name("c_b").unwrap().as_float64_arr().unwrap(), - &Float64Array::from(vec![None, None, None, Some(2.0)]) - ); - } -} +// #[cfg(test)] +// mod tests { + +// use arrow::datatypes::Int64Type; +// use arrow_array::{ArrayRef, Float64Array, Int64Array, ListArray, StringArray}; +// use arrow_schema::{DataType, Field}; +// use serde_json::json; +// use std::{collections::HashMap, sync::Arc}; + +// use crate::{ +// metadata::SchemaVersion, +// utils::json::{convert_array_to_object, flatten::convert_to_array}, +// }; + +// trait TestExt { +// fn as_int64_arr(&self) -> Option<&Int64Array>; +// fn as_float64_arr(&self) -> Option<&Float64Array>; +// fn as_utf8_arr(&self) -> Option<&StringArray>; +// } + +// impl TestExt for ArrayRef { +// fn as_int64_arr(&self) -> Option<&Int64Array> { +// self.as_any().downcast_ref() +// } + +// fn as_float64_arr(&self) -> Option<&Float64Array> { +// self.as_any().downcast_ref() +// } + +// fn as_utf8_arr(&self) -> Option<&StringArray> { +// self.as_any().downcast_ref() +// } +// } + +// fn fields_to_map(iter: impl Iterator) -> HashMap> { +// iter.map(|x| (x.name().clone(), Arc::new(x))).collect() +// } + +// #[test] +// fn basic_object_into_rb() { +// let json = json!({ +// "c": 4.23, +// "a": 1, +// "b": "hello", +// }); + +// let (rb, _) = +// into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap(); + +// assert_eq!(rb.num_rows(), 1); +// assert_eq!(rb.num_columns(), 4); +// assert_eq!( +// rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), +// &Int64Array::from_iter([1]) +// ); +// assert_eq!( +// rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), +// &StringArray::from_iter_values(["hello"]) +// ); +// assert_eq!( +// rb.column_by_name("c").unwrap().as_float64_arr().unwrap(), +// &Float64Array::from_iter([4.23]) +// ); +// } + +// #[test] +// fn basic_object_with_null_into_rb() { +// let json = json!({ +// "a": 1, +// "b": "hello", +// "c": null +// }); + +// let (rb, _) = +// 
into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap(); + +// assert_eq!(rb.num_rows(), 1); +// assert_eq!(rb.num_columns(), 3); +// assert_eq!( +// rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), +// &Int64Array::from_iter([1]) +// ); +// assert_eq!( +// rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), +// &StringArray::from_iter_values(["hello"]) +// ); +// } + +// #[test] +// fn basic_object_derive_schema_into_rb() { +// let json = json!({ +// "a": 1, +// "b": "hello", +// }); + +// let schema = fields_to_map( +// [ +// Field::new("a", DataType::Int64, true), +// Field::new("b", DataType::Utf8, true), +// Field::new("c", DataType::Float64, true), +// ] +// .into_iter(), +// ); + +// let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap(); + +// assert_eq!(rb.num_rows(), 1); +// assert_eq!(rb.num_columns(), 3); +// assert_eq!( +// rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), +// &Int64Array::from_iter([1]) +// ); +// assert_eq!( +// rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), +// &StringArray::from_iter_values(["hello"]) +// ); +// } + +// #[test] +// fn basic_object_schema_mismatch() { +// let json = json!({ +// "a": 1, +// "b": 1, // type mismatch +// }); + +// let schema = fields_to_map( +// [ +// Field::new("a", DataType::Int64, true), +// Field::new("b", DataType::Utf8, true), +// Field::new("c", DataType::Float64, true), +// ] +// .into_iter(), +// ); + +// assert!(into_event_batch(json, schema, false, None, SchemaVersion::V0,).is_err()); +// } + +// #[test] +// fn empty_object() { +// let json = json!({}); + +// let schema = fields_to_map( +// [ +// Field::new("a", DataType::Int64, true), +// Field::new("b", DataType::Utf8, true), +// Field::new("c", DataType::Float64, true), +// ] +// .into_iter(), +// ); + +// let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap(); + +// assert_eq!(rb.num_rows(), 1); +// assert_eq!(rb.num_columns(), 1); +// } + +// #[test] +// fn non_object_arr_is_err() { +// let json = json!([1]); + +// assert!(convert_array_to_object( +// json, +// None, +// None, +// None, +// SchemaVersion::V0, +// &crate::event::format::LogSource::default() +// ) +// .is_err()) +// } + +// #[test] +// fn array_into_recordbatch_inffered_schema() { +// let json = json!([ +// { +// "b": "hello", +// }, +// { +// "b": "hello", +// "a": 1, +// "c": 1 +// }, +// { +// "a": 1, +// "b": "hello", +// "c": null +// }, +// ]); + +// let (rb, _) = +// into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap(); + +// assert_eq!(rb.num_rows(), 3); +// assert_eq!(rb.num_columns(), 4); + +// let schema = rb.schema(); +// let fields = &schema.fields; + +// assert_eq!(&*fields[1], &Field::new("a", DataType::Int64, true)); +// assert_eq!(&*fields[2], &Field::new("b", DataType::Utf8, true)); +// assert_eq!(&*fields[3], &Field::new("c", DataType::Int64, true)); + +// assert_eq!( +// rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), +// &Int64Array::from(vec![None, Some(1), Some(1)]) +// ); +// assert_eq!( +// rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), +// &StringArray::from(vec![Some("hello"), Some("hello"), Some("hello"),]) +// ); +// assert_eq!( +// rb.column_by_name("c").unwrap().as_int64_arr().unwrap(), +// &Int64Array::from(vec![None, Some(1), None]) +// ); +// } + +// #[test] +// fn arr_with_null_into_rb() { +// let json = json!([ +// { +// "c": null, +// "b": "hello", +// "a": null +// }, +// { +// "a": 1, +// "c": 
1.22, +// "b": "hello" +// }, +// { +// "b": "hello", +// "a": 1, +// "c": null +// }, +// ]); + +// let (rb, _) = +// into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap(); + +// assert_eq!(rb.num_rows(), 3); +// assert_eq!(rb.num_columns(), 4); +// assert_eq!( +// rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), +// &Int64Array::from(vec![None, Some(1), Some(1)]) +// ); +// assert_eq!( +// rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), +// &StringArray::from(vec![Some("hello"), Some("hello"), Some("hello"),]) +// ); +// assert_eq!( +// rb.column_by_name("c").unwrap().as_float64_arr().unwrap(), +// &Float64Array::from(vec![None, Some(1.22), None,]) +// ); +// } + +// #[test] +// fn arr_with_null_derive_schema_into_rb() { +// let json = json!([ +// { +// "c": null, +// "b": "hello", +// "a": null +// }, +// { +// "a": 1, +// "c": 1.22, +// "b": "hello" +// }, +// { +// "b": "hello", +// "a": 1, +// "c": null +// }, +// ]); + +// let schema = fields_to_map( +// [ +// Field::new("a", DataType::Int64, true), +// Field::new("b", DataType::Utf8, true), +// Field::new("c", DataType::Float64, true), +// ] +// .into_iter(), +// ); + +// let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap(); + +// assert_eq!(rb.num_rows(), 3); +// assert_eq!(rb.num_columns(), 4); +// assert_eq!( +// rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), +// &Int64Array::from(vec![None, Some(1), Some(1)]) +// ); +// assert_eq!( +// rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), +// &StringArray::from(vec![Some("hello"), Some("hello"), Some("hello"),]) +// ); +// assert_eq!( +// rb.column_by_name("c").unwrap().as_float64_arr().unwrap(), +// &Float64Array::from(vec![None, Some(1.22), None,]) +// ); +// } + +// #[test] +// fn arr_schema_mismatch() { +// let json = json!([ +// { +// "a": null, +// "b": "hello", +// "c": 1.24 +// }, +// { +// "a": 1, +// "b": "hello", +// "c": 1 +// }, +// { +// "a": 1, +// "b": "hello", +// "c": null +// }, +// ]); + +// let schema = fields_to_map( +// [ +// Field::new("a", DataType::Int64, true), +// Field::new("b", DataType::Utf8, true), +// Field::new("c", DataType::Float64, true), +// ] +// .into_iter(), +// ); + +// assert!(into_event_batch(json, schema, false, None, SchemaVersion::V0,).is_err()); +// } + +// #[test] +// fn arr_obj_with_nested_type() { +// let json = json!([ +// { +// "a": 1, +// "b": "hello", +// }, +// { +// "a": 1, +// "b": "hello", +// }, +// { +// "a": 1, +// "b": "hello", +// "c": [{"a": 1}] +// }, +// { +// "a": 1, +// "b": "hello", +// "c": [{"a": 1, "b": 2}] +// }, +// ]); +// let flattened_json = convert_to_array( +// convert_array_to_object( +// json, +// None, +// None, +// None, +// SchemaVersion::V0, +// &crate::event::format::LogSource::default(), +// ) +// .unwrap(), +// ) +// .unwrap(); + +// let (rb, _) = into_event_batch( +// flattened_json, +// HashMap::default(), +// false, +// None, +// SchemaVersion::V0, +// ) +// .unwrap(); +// assert_eq!(rb.num_rows(), 4); +// assert_eq!(rb.num_columns(), 5); +// assert_eq!( +// rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), +// &Int64Array::from(vec![Some(1), Some(1), Some(1), Some(1)]) +// ); +// assert_eq!( +// rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), +// &StringArray::from(vec![ +// Some("hello"), +// Some("hello"), +// Some("hello"), +// Some("hello") +// ]) +// ); + +// assert_eq!( +// rb.column_by_name("c_a") +// .unwrap() +// .as_any() +// .downcast_ref::() +// .unwrap(), +// 
&ListArray::from_iter_primitive::(vec![ +// None, +// None, +// Some(vec![Some(1i64)]), +// Some(vec![Some(1)]) +// ]) +// ); + +// assert_eq!( +// rb.column_by_name("c_b") +// .unwrap() +// .as_any() +// .downcast_ref::() +// .unwrap(), +// &ListArray::from_iter_primitive::(vec![ +// None, +// None, +// None, +// Some(vec![Some(2i64)]) +// ]) +// ); +// } + +// #[test] +// fn arr_obj_with_nested_type_v1() { +// let json = json!([ +// { +// "a": 1, +// "b": "hello", +// }, +// { +// "a": 1, +// "b": "hello", +// }, +// { +// "a": 1, +// "b": "hello", +// "c": [{"a": 1}] +// }, +// { +// "a": 1, +// "b": "hello", +// "c": [{"a": 1, "b": 2}] +// }, +// ]); +// let flattened_json = convert_to_array( +// convert_array_to_object( +// json, +// None, +// None, +// None, +// SchemaVersion::V1, +// &crate::event::format::LogSource::default(), +// ) +// .unwrap(), +// ) +// .unwrap(); + +// let (rb, _) = into_event_batch( +// flattened_json, +// HashMap::default(), +// false, +// None, +// SchemaVersion::V1, +// ) +// .unwrap(); + +// assert_eq!(rb.num_rows(), 4); +// assert_eq!(rb.num_columns(), 5); +// assert_eq!( +// rb.column_by_name("a").unwrap().as_float64_arr().unwrap(), +// &Float64Array::from(vec![Some(1.0), Some(1.0), Some(1.0), Some(1.0)]) +// ); +// assert_eq!( +// rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), +// &StringArray::from(vec![ +// Some("hello"), +// Some("hello"), +// Some("hello"), +// Some("hello") +// ]) +// ); + +// assert_eq!( +// rb.column_by_name("c_a").unwrap().as_float64_arr().unwrap(), +// &Float64Array::from(vec![None, None, Some(1.0), Some(1.0)]) +// ); + +// assert_eq!( +// rb.column_by_name("c_b").unwrap().as_float64_arr().unwrap(), +// &Float64Array::from(vec![None, None, None, Some(2.0)]) +// ); +// } +// } diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 84d5ae117..4ec3a9d94 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -30,7 +30,6 @@ use crate::{ }, otel::{logs::flatten_otel_logs, metrics::flatten_otel_metrics, traces::flatten_otel_traces}, parseable::PARSEABLE, - storage::StreamType, utils::json::{convert_array_to_object, flatten::convert_to_array}, }; @@ -85,7 +84,6 @@ async fn push_logs( let time_partition_limit = PARSEABLE .get_stream(stream_name)? .get_time_partition_limit(); - let static_schema_flag = stream.get_static_schema_flag(); let custom_partition = stream.get_custom_partition(); let schema_version = stream.get_schema_version(); let p_timestamp = Utc::now(); @@ -112,19 +110,9 @@ async fn push_logs( for json in data { let origin_size = serde_json::to_vec(&json).unwrap().len() as u64; // string length need not be the same as byte length - let schema = PARSEABLE.get_stream(stream_name)?.get_schema_raw(); json::Event { json, p_timestamp } - .into_event( - stream_name.to_owned(), - origin_size, - &schema, - static_schema_flag, - custom_partition.as_ref(), - time_partition.as_ref(), - schema_version, - StreamType::UserDefined, - )? - .process()?; + .to_event(&stream, origin_size)? 
+            .process(&stream)?;
     }
 
     Ok(())
 }
diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs
index 6e4f55e94..bee083bce 100644
--- a/src/parseable/mod.rs
+++ b/src/parseable/mod.rs
@@ -28,7 +28,7 @@ use http::{header::CONTENT_TYPE, HeaderName, HeaderValue, StatusCode};
 use once_cell::sync::Lazy;
 pub use staging::StagingError;
 use streams::StreamRef;
-pub use streams::{StreamNotFound, Streams};
+pub use streams::{Stream, StreamNotFound, Streams};
 use tracing::error;
 
 #[cfg(feature = "kafka")]
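
Reviewer note: the sketch below (not part of the patch) shows how the ingest path reads after this refactor — the caller fetches the stream handle once and threads it through to_event and process, instead of passing schema, partition, and flag arguments individually. The ingest_one helper, the "demo" stream name, and the anyhow::Result wrapper are illustrative assumptions; the real handlers keep their own error types.

use serde_json::Value;

use crate::{
    event::format::{json, EventFormat},
    parseable::PARSEABLE,
};

// Hypothetical helper mirroring the new call shape in ingest.rs and ingest_utils.rs.
fn ingest_one(payload: Value) -> anyhow::Result<()> {
    // One stream lookup replaces the former per-call schema/partition plumbing.
    let stream = PARSEABLE.get_stream("demo")?; // "demo" is a placeholder stream name
    let origin_size = serde_json::to_vec(&payload)?.len() as u64;

    json::Event::new(payload)
        .to_event(&stream, origin_size)? // builds the RecordBatch, schema, and partition values
        .process(&stream)?; // stages the batch, commits schema, updates stats

    Ok(())
}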