From 9acce01b521f8ae7866041987251194bb65ac712 Mon Sep 17 00:00:00 2001
From: Satyam Singh
Date: Sun, 7 May 2023 16:26:55 +0530
Subject: [PATCH 1/5] Fix p_timestamp to index 0

---
 server/src/event.rs                     | 118 +++---------------------
 server/src/event/format.rs              | 110 +++++++++-------------
 server/src/event/format/json.rs         |  70 +++++---------
 server/src/event/writer/mutable.rs      |   2 +-
 server/src/handlers/http/ingest.rs      |  99 +++++++++++---------
 server/src/metadata.rs                  |  24 ++---
 server/src/utils/arrow/merged_reader.rs |  65 ++++---------
 7 files changed, 162 insertions(+), 326 deletions(-)

diff --git a/server/src/event.rs b/server/src/event.rs
index dcc061c7a..152134d0e 100644
--- a/server/src/event.rs
+++ b/server/src/event.rs
@@ -22,9 +22,8 @@ mod writer;
 
 use arrow_array::RecordBatch;
 use arrow_schema::{Field, Schema};
+use itertools::Itertools;
 
-use std::collections::HashMap;
-use std::ops::DerefMut;
 use std::sync::Arc;
 
 use crate::metadata;
@@ -42,6 +41,7 @@ pub struct Event {
     pub rb: RecordBatch,
     pub origin_format: &'static str,
     pub origin_size: u64,
+    pub is_first_event: bool,
 }
 
 // Events holds the schema related to a each event for a single log stream
 impl Event {
         let key = get_schema_key(&self.rb.schema().fields);
         let num_rows = self.rb.num_rows() as u64;
 
-        if self.is_first_event(metadata::STREAM_INFO.schema(&self.stream_name)?.as_ref()) {
+        if self.is_first_event {
             commit_schema(&self.stream_name, self.rb.schema())?;
         }
@@ -73,25 +73,6 @@ impl Event {
 
         Ok(())
     }
 
-    fn is_first_event(&self, stream_schema: &Schema) -> bool {
-        let mut stream_fields = stream_schema.fields().iter();
-        let event_schema = self.rb.schema();
-        let event_fields = event_schema.fields();
-
-        for field in event_fields {
-            loop {
-                let Some(stream_field) = stream_fields.next() else { return true };
-                if stream_field.name() == field.name() {
-                    break;
-                } else {
-                    continue;
-                }
-            }
-        }
-
-        false
-    }
-
     // event process all events after the 1st event. Concatenates record batches
     // and puts them in memory store for each event.
fn process_event( @@ -104,10 +85,10 @@ impl Event { } } -pub fn get_schema_key(fields: &Vec) -> String { +pub fn get_schema_key(fields: &[Field]) -> String { // Fields must be sorted let mut hasher = xxhash_rust::xxh3::Xxh3::new(); - for field in fields { + for field in fields.iter().sorted_by_key(|v| v.name()) { hasher.update(field.name().as_bytes()) } let hash = hasher.digest(); @@ -117,36 +98,17 @@ pub fn get_schema_key(fields: &Vec) -> String { pub fn commit_schema(stream_name: &str, schema: Arc) -> Result<(), EventError> { let mut stream_metadata = metadata::STREAM_INFO.write().expect("lock poisoned"); - let mut schema = Schema::try_merge(vec![ - schema.as_ref().clone(), - stream_metadata.get_unchecked(stream_name).as_ref().clone(), - ]) - .unwrap(); - schema.fields.sort_by(|a, b| a.name().cmp(b.name())); - - stream_metadata.set_unchecked(stream_name, Arc::new(schema)); + let map = &mut stream_metadata + .get_mut(stream_name) + .expect("map has entry for this stream name") + .schema; + let current_schema = Schema::new(map.values().cloned().collect()); + let schema = Schema::try_merge(vec![current_schema, schema.as_ref().clone()])?; + map.clear(); + map.extend(schema.fields.into_iter().map(|f| (f.name().clone(), f))); Ok(()) } -trait UncheckedOp: DerefMut> { - fn get_unchecked(&self, stream_name: &str) -> Arc { - let schema = &self - .get(stream_name) - .expect("map has entry for this stream name") - .schema; - - Arc::clone(schema) - } - - fn set_unchecked(&mut self, stream_name: &str, schema: Arc) { - self.get_mut(stream_name) - .expect("map has entry for this stream name") - .schema = schema - } -} - -impl>> UncheckedOp for T {} - pub mod error { use arrow_schema::ArrowError; @@ -167,57 +129,3 @@ pub mod error { ObjectStorage(#[from] ObjectStorageError), } } - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use arrow_array::RecordBatch; - use arrow_schema::{DataType, Field, Schema}; - - use super::Event; - - fn test_rb(fields: Vec) -> RecordBatch { - RecordBatch::new_empty(Arc::new(Schema::new(fields))) - } - - fn test_event(fields: Vec) -> Event { - Event { - stream_name: "".to_string(), - rb: test_rb(fields), - origin_format: "none", - origin_size: 0, - } - } - - #[test] - fn new_field_is_new_event() { - let schema = Schema::new(vec![ - Field::new("a", DataType::Int64, true), - Field::new("b", DataType::Int64, true), - ]); - - let new_event = test_event(vec![ - Field::new("a", DataType::Int64, true), - Field::new("c", DataType::Int64, true), - ]); - - assert!(new_event.is_first_event(&schema)); - } - - #[test] - fn same_field_not_is_new_event() { - let schema = Schema::new(vec![ - Field::new("a", DataType::Int64, true), - Field::new("b", DataType::Int64, true), - Field::new("c", DataType::Int64, true), - ]); - - let new_event = test_event(vec![ - Field::new("a", DataType::Int64, true), - Field::new("c", DataType::Int64, true), - ]); - - assert!(!new_event.is_first_event(&schema)); - } -} diff --git a/server/src/event/format.rs b/server/src/event/format.rs index b0554d4f9..94cf9a9c6 100644 --- a/server/src/event/format.rs +++ b/server/src/event/format.rs @@ -17,7 +17,7 @@ * */ -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use anyhow::{anyhow, Error as AnyError}; use arrow_array::{RecordBatch, StringArray, TimestampMillisecondArray}; @@ -35,54 +35,54 @@ type Metadata = String; pub trait EventFormat: Sized { type Data; - fn to_data(self, schema: &Schema) -> Result<(Self::Data, Schema, Tags, Metadata), AnyError>; + fn to_data( + self, + schema: &HashMap, + ) -> 
Result<(Self::Data, Schema, bool, Tags, Metadata), AnyError>;
     fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;
-    fn into_recordbatch(self, schema: &Schema) -> Result<RecordBatch, AnyError> {
-        let (data, mut schema, tags, metadata) = self.to_data(schema)?;
-
-        match tags_index(&schema) {
-            Ok(_) => return Err(anyhow!("field {} is a reserved field", DEFAULT_TAGS_KEY)),
-            Err(index) => {
-                schema
-                    .fields
-                    .insert(index, Field::new(DEFAULT_TAGS_KEY, DataType::Utf8, true));
-            }
+    fn into_recordbatch(
+        self,
+        schema: &HashMap<String, Field>,
+    ) -> Result<(RecordBatch, bool), AnyError> {
+        let (data, mut schema, is_first, tags, metadata) = self.to_data(schema)?;
+
+        if schema.field_with_name(DEFAULT_TAGS_KEY).is_ok() {
+            return Err(anyhow!("field {} is a reserved field", DEFAULT_TAGS_KEY));
         };
-        match metadata_index(&schema) {
-            Ok(_) => {
-                return Err(anyhow!(
-                    "field {} is a reserved field",
-                    DEFAULT_METADATA_KEY
-                ))
-            }
-            Err(index) => {
-                schema.fields.insert(
-                    index,
-                    Field::new(DEFAULT_METADATA_KEY, DataType::Utf8, true),
-                );
-            }
+        if schema.field_with_name(DEFAULT_TAGS_KEY).is_ok() {
+            return Err(anyhow!(
+                "field {} is a reserved field",
+                DEFAULT_METADATA_KEY
+            ));
         };
-        match timestamp_index(&schema) {
-            Ok(_) => {
-                return Err(anyhow!(
-                    "field {} is a reserved field",
-                    DEFAULT_TIMESTAMP_KEY
-                ))
-            }
-            Err(index) => {
-                schema.fields.insert(
-                    index,
-                    Field::new(
-                        DEFAULT_TIMESTAMP_KEY,
-                        DataType::Timestamp(TimeUnit::Millisecond, None),
-                        true,
-                    ),
-                );
-            }
+        if schema.field_with_name(DEFAULT_TAGS_KEY).is_ok() {
+            return Err(anyhow!(
+                "field {} is a reserved field",
+                DEFAULT_TIMESTAMP_KEY
+            ));
         };
+        schema.fields.insert(
+            0,
+            Field::new(
+                DEFAULT_TIMESTAMP_KEY,
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                true,
+            ),
+        );
+        let tags_index = schema.fields.len();
+
+        schema
+            .fields
+            .push(Field::new(DEFAULT_TAGS_KEY, DataType::Utf8, true));
+
+        let metadata_index = schema.fields.len();
+        schema
+            .fields
+            .push(Field::new(DEFAULT_METADATA_KEY, DataType::Utf8, true));
+
         let schema_ref = Arc::new(schema);
         let rb = Self::decode(data, Arc::clone(&schema_ref))?;
         let tags_arr = StringArray::from_iter_values(std::iter::repeat(&tags).take(rb.num_rows()));
@@ -93,11 +93,7 @@ pub trait EventFormat: Sized {
         let rb = utils::arrow::replace_columns(
             Arc::clone(&schema_ref),
             rb,
-            &[
-                timestamp_index(&schema_ref).expect("timestamp field exists"),
-                tags_index(&schema_ref).expect("tags field exists"),
-                metadata_index(&schema_ref).expect("metadata field exists"),
-            ],
+            &[0, tags_index, metadata_index],
             &[
                 Arc::new(timestamp_array),
                 Arc::new(tags_arr),
@@ -105,28 +101,10 @@ pub trait EventFormat: Sized {
             ],
         );
 
-        Ok(rb)
+        Ok((rb, is_first))
     }
 }
 
-fn tags_index(schema: &Schema) -> Result<usize, usize> {
-    schema
-        .fields
-        .binary_search_by_key(&DEFAULT_TAGS_KEY, |field| field.name())
-}
-
-fn metadata_index(schema: &Schema) -> Result<usize, usize> {
-    schema
-        .fields
-        .binary_search_by_key(&DEFAULT_METADATA_KEY, |field| field.name())
-}
-
-fn timestamp_index(schema: &Schema) -> Result<usize, usize> {
-    schema
-        .fields
-        .binary_search_by_key(&DEFAULT_TIMESTAMP_KEY, |field| field.name())
-}
-
 fn get_timestamp_array(size: usize) -> TimestampMillisecondArray {
     let time = Utc::now();
     TimestampMillisecondArray::from_value(time.timestamp_millis(), size)
diff --git a/server/src/event/format/json.rs b/server/src/event/format/json.rs
index af9c1c874..45becde5f 100644
--- a/server/src/event/format/json.rs
+++ b/server/src/event/format/json.rs
@@ -25,7 +25,7 @@ use arrow_json::reader::{infer_json_schema_from_iterator, Decoder, DecoderOption
 use
arrow_schema::{DataType, Field, Schema}; use datafusion::arrow::util::bit_util::round_upto_multiple_of_64; use serde_json::Value; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use super::EventFormat; use crate::utils::json::flatten_json_body; @@ -41,8 +41,8 @@ impl EventFormat for Event { fn to_data( self, - schema: &Schema, - ) -> Result<(Self::Data, Schema, String, String), anyhow::Error> { + schema: &HashMap, + ) -> Result<(Self::Data, Schema, bool, String, String), anyhow::Error> { let data = flatten_json_body(self.data)?; let stream_schema = schema; @@ -56,19 +56,18 @@ impl EventFormat for Event { let fields = collect_keys(value_arr.iter()).expect("fields can be collected from array of objects"); - let schema = match derive_sub_schema(stream_schema.clone(), fields) { + let mut is_first = false; + let schema = match derive_sub_schema(stream_schema, fields) { Ok(schema) => schema, Err(_) => match infer_json_schema_from_iterator(value_arr.iter().map(Ok)) { - Ok(mut infer_schema) => { - infer_schema - .fields - .sort_by(|field1, field2| Ord::cmp(field1.name(), field2.name())); - - if let Err(err) = - Schema::try_merge(vec![stream_schema.clone(), infer_schema.clone()]) - { + Ok(infer_schema) => { + if let Err(err) = Schema::try_merge(vec![ + Schema::new(stream_schema.values().cloned().collect()), + infer_schema.clone(), + ]) { return Err(anyhow!("Could not merge schema of this event with that of the existing stream. {:?}", err)); } + is_first = true; infer_schema } Err(err) => { @@ -89,7 +88,7 @@ impl EventFormat for Event { )); } - Ok((value_arr, schema, self.tags, self.metadata)) + Ok((value_arr, schema, is_first, self.tags, self.metadata)) } fn decode(data: Self::Data, schema: Arc) -> Result { @@ -108,50 +107,27 @@ impl EventFormat for Event { } } -// invariants for this to work. 
-// All fields in existing schema and fields in event are sorted my name lexographically -fn derive_sub_schema(schema: arrow_schema::Schema, fields: Vec<&str>) -> Result { - let fields = derive_subset(schema.fields, fields)?; - Ok(Schema::new(fields)) -} +fn derive_sub_schema(schema: &HashMap, fields: Vec<&str>) -> Result { + let mut res = Vec::with_capacity(fields.len()); + let fields = fields.into_iter().map(|field_name| schema.get(field_name)); -fn derive_subset(superset: Vec, subset: Vec<&str>) -> Result, ()> { - let mut superset_idx = 0; - let mut subset_idx = 0; - let mut subset_schema = Vec::with_capacity(subset.len()); - - while superset_idx < superset.len() && subset_idx < subset.len() { - let field = superset[superset_idx].clone(); - let key = subset[subset_idx]; - if field.name() == key { - subset_schema.push(field); - superset_idx += 1; - subset_idx += 1; - } else if field.name().as_str() < key { - superset_idx += 1; - } else { - return Err(()); - } - } - - // error if subset is not exhausted - if subset_idx < subset.len() { - return Err(()); + for field in fields { + let Some(field) = field else { return Err(()) }; + res.push(field.clone()) } - Ok(subset_schema) + Ok(Schema::new(res)) } - // Must be in sorted order fn collect_keys<'a>(values: impl Iterator) -> Result, ()> { - let mut sorted_keys = Vec::new(); + let mut keys = Vec::new(); for value in values { if let Some(obj) = value.as_object() { for key in obj.keys() { - match sorted_keys.binary_search(&key.as_str()) { + match keys.binary_search(&key.as_str()) { Ok(_) => (), Err(pos) => { - sorted_keys.insert(pos, key.as_str()); + keys.insert(pos, key.as_str()); } } } @@ -159,7 +135,7 @@ fn collect_keys<'a>(values: impl Iterator) -> Result bool { diff --git a/server/src/event/writer/mutable.rs b/server/src/event/writer/mutable.rs index 7fc1b9fcc..d3bad158a 100644 --- a/server/src/event/writer/mutable.rs +++ b/server/src/event/writer/mutable.rs @@ -292,7 +292,7 @@ impl MutableColumns { pub fn push(&mut self, rb: RecordBatch) { let num_rows = rb.num_rows(); let schema = rb.schema(); - let rb = schema.fields().iter().zip(rb.columns().iter()); + let rb = schema.fields().iter().zip(rb.columns().iter()).sorted_by_key(|(f, _)| f.name()); // start index map to next location in self columns let mut index = 0; diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index 8d3883f1c..fdb26c1d1 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -16,9 +16,11 @@ * */ +use std::collections::HashMap; + use actix_web::http::header::ContentType; use actix_web::{HttpRequest, HttpResponse}; -use arrow_schema::Schema; +use arrow_schema::Field; use bytes::Bytes; use http::StatusCode; use serde_json::Value; @@ -27,7 +29,6 @@ use crate::event::error::EventError; use crate::event::format::EventFormat; use crate::event::{self, format}; use crate::handlers::{PREFIX_META, PREFIX_TAGS, SEPARATOR, STREAM_NAME_HEADER_KEY}; -use crate::metadata::error::stream_info::MetadataError; use crate::metadata::STREAM_INFO; use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError}; @@ -61,14 +62,21 @@ pub async fn post_event(req: HttpRequest, body: Bytes) -> Result Result<(), PostError> { - let schema = STREAM_INFO.schema(&stream_name)?; - let (size, rb) = into_event_batch(req, body, &schema)?; + let (size, rb, is_first_event) = { + let hash_map = STREAM_INFO.read().unwrap(); + let schema = &hash_map + .get(&stream_name) + .ok_or(PostError::StreamNotFound(stream_name.clone()))? 
+ .schema; + into_event_batch(req, body, schema)? + }; event::Event { rb, stream_name, origin_format: "json", origin_size: size as u64, + is_first_event, } .process() .await?; @@ -80,8 +88,8 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result fn into_event_batch( req: HttpRequest, body: Bytes, - schema: &Schema, -) -> Result<(usize, arrow_array::RecordBatch), PostError> { + schema: &HashMap, +) -> Result<(usize, arrow_array::RecordBatch, bool), PostError> { let tags = collect_labelled_headers(&req, PREFIX_TAGS, SEPARATOR)?; let metadata = collect_labelled_headers(&req, PREFIX_META, SEPARATOR)?; let size = body.len(); @@ -91,14 +99,14 @@ fn into_event_batch( tags, metadata, }; - let rb = event.into_recordbatch(schema)?; - Ok((size, rb)) + let (rb, is_first) = event.into_recordbatch(schema)?; + Ok((size, rb, is_first)) } #[derive(Debug, thiserror::Error)] pub enum PostError { #[error("{0}")] - StreamNotFound(#[from] MetadataError), + StreamNotFound(String), #[error("Could not deserialize into JSON object, {0}")] SerdeError(#[from] serde_json::Error), #[error("Header Error: {0}")] @@ -133,11 +141,13 @@ impl actix_web::ResponseError for PostError { #[cfg(test)] mod tests { + use std::collections::HashMap; + use actix_web::test::TestRequest; use arrow_array::{ types::Int64Type, ArrayRef, Float64Array, Int64Array, ListArray, StringArray, }; - use arrow_schema::{DataType, Field, Schema}; + use arrow_schema::{DataType, Field}; use bytes::Bytes; use serde_json::json; @@ -181,10 +191,10 @@ mod tests { .append_header((PREFIX_META.to_string() + "C", "meta1")) .to_http_request(); - let (size, rb) = into_event_batch( + let (size, rb, _) = into_event_batch( req, Bytes::from(serde_json::to_vec(&json).unwrap()), - &Schema::empty(), + &HashMap::default(), ) .unwrap(); @@ -224,10 +234,10 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb) = into_event_batch( + let (_, rb, _) = into_event_batch( req, Bytes::from(serde_json::to_vec(&json).unwrap()), - &Schema::empty(), + &HashMap::default(), ) .unwrap(); @@ -247,15 +257,15 @@ mod tests { "b": "hello", }); - let schema = Schema::new(vec![ - Field::new("a", DataType::Int64, true), - Field::new("b", DataType::Utf8, true), - Field::new("c", DataType::Float64, true), + let schema = HashMap::from([ + ("a".to_string(), Field::new("a", DataType::Int64, true)), + ("b".to_string(), Field::new("b", DataType::Utf8, true)), + ("c".to_string(), Field::new("c", DataType::Float64, true)), ]); let req = TestRequest::default().to_http_request(); - let (_, rb) = into_event_batch( + let (_, rb, _) = into_event_batch( req, Bytes::from(serde_json::to_vec(&json).unwrap()), &schema, @@ -278,10 +288,10 @@ mod tests { "b": 1, // type mismatch }); - let schema = Schema::new(vec![ - Field::new("a", DataType::Int64, true), - Field::new("b", DataType::Utf8, true), - Field::new("c", DataType::Float64, true), + let schema = HashMap::from([ + ("a".to_string(), Field::new("a", DataType::Int64, true)), + ("b".to_string(), Field::new("b", DataType::Utf8, true)), + ("c".to_string(), Field::new("c", DataType::Float64, true)), ]); let req = TestRequest::default().to_http_request(); @@ -298,15 +308,15 @@ mod tests { fn empty_object() { let json = json!({}); - let schema = Schema::new(vec![ - Field::new("a", DataType::Int64, true), - Field::new("b", DataType::Float64, true), - Field::new("c", DataType::Float64, true), + let schema = HashMap::from([ + ("a".to_string(), Field::new("a", DataType::Int64, true)), + ("b".to_string(), 
Field::new("b", DataType::Utf8, true)), + ("c".to_string(), Field::new("c", DataType::Float64, true)), ]); let req = TestRequest::default().to_http_request(); - let (_, rb) = into_event_batch( + let (_, rb, _) = into_event_batch( req, Bytes::from(serde_json::to_vec(&json).unwrap()), &schema, @@ -326,7 +336,7 @@ mod tests { assert!(into_event_batch( req, Bytes::from(serde_json::to_vec(&json).unwrap()), - &Schema::empty(), + &HashMap::default(), ) .is_err()) } @@ -353,10 +363,10 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb) = into_event_batch( + let (_, rb, _) = into_event_batch( req, Bytes::from(serde_json::to_vec(&json).unwrap()), - &Schema::empty(), + &HashMap::default(), ) .unwrap(); @@ -396,15 +406,14 @@ mod tests { }, ]); - let schema = Schema::new(vec![ - Field::new("a", DataType::Int64, true), - Field::new("b", DataType::Utf8, true), - Field::new("c", DataType::Float64, true), + let schema = HashMap::from([ + ("a".to_string(), Field::new("a", DataType::Int64, true)), + ("b".to_string(), Field::new("b", DataType::Utf8, true)), + ("c".to_string(), Field::new("c", DataType::Float64, true)), ]); - let req = TestRequest::default().to_http_request(); - let (_, rb) = into_event_batch( + let (_, rb, _) = into_event_batch( req, Bytes::from(serde_json::to_vec(&json).unwrap()), &schema, @@ -449,10 +458,10 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb) = into_event_batch( + let (_, rb, _) = into_event_batch( req, Bytes::from(serde_json::to_vec(&json).unwrap()), - &Schema::empty(), + &HashMap::default(), ) .unwrap(); @@ -490,10 +499,10 @@ mod tests { let req = TestRequest::default().to_http_request(); - let schema = Schema::new(vec![ - Field::new("a", DataType::Int64, true), - Field::new("b", DataType::Utf8, true), - Field::new("c", DataType::Float64, true), + let schema = HashMap::from([ + ("a".to_string(), Field::new("a", DataType::Int64, true)), + ("b".to_string(), Field::new("b", DataType::Utf8, true)), + ("c".to_string(), Field::new("c", DataType::Float64, true)), ]); assert!(into_event_batch( @@ -529,10 +538,10 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (_, rb) = into_event_batch( + let (_, rb, _) = into_event_batch( req, Bytes::from(serde_json::to_vec(&json).unwrap()), - &Schema::empty(), + &HashMap::default(), ) .unwrap(); diff --git a/server/src/metadata.rs b/server/src/metadata.rs index 6ee2cdeeb..9c50d5242 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -17,7 +17,7 @@ */ use arrow_array::RecordBatch; -use arrow_schema::Schema; +use arrow_schema::{Field, Schema}; use once_cell::sync::Lazy; use std::collections::HashMap; use std::sync::{Arc, RwLock}; @@ -37,21 +37,12 @@ pub static STREAM_INFO: Lazy = Lazy::new(StreamInfo::default); #[derive(Debug, Deref, DerefMut, Default)] pub struct StreamInfo(RwLock>); -#[derive(Debug)] +#[derive(Debug, Default)] pub struct LogStreamMetadata { - pub schema: Arc, + pub schema: HashMap, pub alerts: Alerts, } -impl Default for LogStreamMetadata { - fn default() -> Self { - Self { - schema: Arc::new(Schema::empty()), - alerts: Alerts::default(), - } - } -} - // It is very unlikely that panic will occur when dealing with metadata. 
pub const LOCK_EXPECT: &str = "no method in metadata should panic while holding a lock"; @@ -95,7 +86,9 @@ impl StreamInfo { .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) .map(|metadata| &metadata.schema)?; - Ok(Arc::clone(schema)) + let schema = Schema::new(schema.values().cloned().collect()); + + Ok(Arc::new(schema)) } pub fn set_alert(&self, stream_name: &str, alerts: Alerts) -> Result<(), MetadataError> { @@ -130,8 +123,9 @@ impl StreamInfo { let alerts = storage.get_alerts(&stream.name).await?; let schema = storage.get_schema(&stream.name).await?; - let schema = Arc::new(update_schema_from_staging(&stream.name, schema)); - + let schema = update_schema_from_staging(&stream.name, schema); + let schema = + HashMap::from_iter(schema.fields.into_iter().map(|v| (v.name().to_owned(), v))); let metadata = LogStreamMetadata { schema, alerts }; let mut map = self.write().expect(LOCK_EXPECT); diff --git a/server/src/utils/arrow/merged_reader.rs b/server/src/utils/arrow/merged_reader.rs index 4c9feb5f7..45b797c53 100644 --- a/server/src/utils/arrow/merged_reader.rs +++ b/server/src/utils/arrow/merged_reader.rs @@ -26,30 +26,9 @@ use itertools::kmerge_by; use super::adapt_batch; -#[derive(Debug)] -pub struct Reader { - reader: StreamReader, - timestamp_col_index: usize, -} - -impl From> for Reader { - fn from(reader: StreamReader) -> Self { - let timestamp_col_index = reader - .schema() - .all_fields() - .binary_search_by(|field| field.name().as_str().cmp("p_timestamp")) - .expect("schema should have this field"); - - Self { - reader, - timestamp_col_index, - } - } -} - #[derive(Debug)] pub struct MergedRecordReader { - pub readers: Vec, + pub readers: Vec>, } impl MergedRecordReader { @@ -58,46 +37,38 @@ impl MergedRecordReader { for file in files { let reader = StreamReader::try_new(File::open(file).unwrap(), None).map_err(|_| ())?; - readers.push(reader.into()); + readers.push(reader); } Ok(Self { readers }) } pub fn merged_iter(self, schema: &Schema) -> impl Iterator + '_ { - let adapted_readers = self.readers.into_iter().map(move |reader| { - reader - .reader - .flatten() - .zip(std::iter::repeat(reader.timestamp_col_index)) - }); + let adapted_readers = self.readers.into_iter().map(move |reader| reader.flatten()); - kmerge_by( - adapted_readers, - |(a, a_col): &(RecordBatch, usize), (b, b_col): &(RecordBatch, usize)| { - let a: &TimestampMillisecondArray = a - .column(*a_col) - .as_any() - .downcast_ref::() - .unwrap(); + kmerge_by(adapted_readers, |a: &RecordBatch, b: &RecordBatch| { + let a: &TimestampMillisecondArray = a + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); - let b: &TimestampMillisecondArray = b - .column(*b_col) - .as_any() - .downcast_ref::() - .unwrap(); + let b: &TimestampMillisecondArray = b + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); - a.value(0) < b.value(0) - }, - ) - .map(|(batch, _)| adapt_batch(schema, batch)) + a.value(0) < b.value(0) + }) + .map(|batch| adapt_batch(schema, batch)) } pub fn merged_schema(&self) -> Schema { Schema::try_merge( self.readers .iter() - .map(|reader| reader.reader.schema().as_ref().clone()), + .map(|reader| reader.schema().as_ref().clone()), ) .unwrap() } From c80d6bfe2fce8b8ecaeb2fc450b39ae200171e30 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Mon, 8 May 2023 11:34:39 +0530 Subject: [PATCH 2/5] Fix test --- server/src/handlers/http/ingest.rs | 55 +++++++++++++++++++----------- 1 file changed, 36 insertions(+), 19 deletions(-) diff --git a/server/src/handlers/http/ingest.rs 
b/server/src/handlers/http/ingest.rs index fdb26c1d1..d1323fcc5 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -201,13 +201,16 @@ mod tests { assert_eq!(size, 28); assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 6); - assert_eq!(rb.column(0).as_int64_arr(), &Int64Array::from_iter([1])); assert_eq!( - rb.column(1).as_utf8_arr(), + rb.column_by_name("a").unwrap().as_int64_arr(), + &Int64Array::from_iter([1]) + ); + assert_eq!( + rb.column_by_name("b").unwrap().as_utf8_arr(), &StringArray::from_iter_values(["hello"]) ); assert_eq!( - rb.column(2).as_float64_arr(), + rb.column_by_name("c").unwrap().as_float64_arr(), &Float64Array::from_iter([4.23]) ); assert_eq!( @@ -243,9 +246,12 @@ mod tests { assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 5); - assert_eq!(rb.column(0).as_int64_arr(), &Int64Array::from_iter([1])); assert_eq!( - rb.column(1).as_utf8_arr(), + rb.column_by_name("a").unwrap().as_int64_arr(), + &Int64Array::from_iter([1]) + ); + assert_eq!( + rb.column_by_name("b").unwrap().as_utf8_arr(), &StringArray::from_iter_values(["hello"]) ); } @@ -274,9 +280,12 @@ mod tests { assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 5); - assert_eq!(rb.column(0).as_int64_arr(), &Int64Array::from_iter([1])); assert_eq!( - rb.column(1).as_utf8_arr(), + rb.column_by_name("a").unwrap().as_int64_arr(), + &Int64Array::from_iter([1]) + ); + assert_eq!( + rb.column_by_name("b").unwrap().as_utf8_arr(), &StringArray::from_iter_values(["hello"]) ); } @@ -373,15 +382,15 @@ mod tests { assert_eq!(rb.num_rows(), 3); assert_eq!(rb.num_columns(), 6); assert_eq!( - rb.column(0).as_int64_arr(), + rb.column_by_name("a").unwrap().as_int64_arr(), &Int64Array::from(vec![None, Some(1), Some(1)]) ); assert_eq!( - rb.column(1).as_utf8_arr(), + rb.column_by_name("b").unwrap().as_utf8_arr(), &StringArray::from(vec![Some("hello"), Some("hello"), Some("hello"),]) ); assert_eq!( - rb.column(2).as_float64_arr(), + rb.column_by_name("c").unwrap().as_float64_arr(), &Float64Array::from(vec![None, Some(1.22), None,]) ); } @@ -423,15 +432,15 @@ mod tests { assert_eq!(rb.num_rows(), 3); assert_eq!(rb.num_columns(), 6); assert_eq!( - rb.column(0).as_int64_arr(), + rb.column_by_name("a").unwrap().as_int64_arr(), &Int64Array::from(vec![None, Some(1), Some(1)]) ); assert_eq!( - rb.column(1).as_utf8_arr(), + rb.column_by_name("b").unwrap().as_utf8_arr(), &StringArray::from(vec![Some("hello"), Some("hello"), Some("hello"),]) ); assert_eq!( - rb.column(2).as_float64_arr(), + rb.column_by_name("c").unwrap().as_float64_arr(), &Float64Array::from(vec![None, Some(1.22), None,]) ); } @@ -468,11 +477,11 @@ mod tests { assert_eq!(rb.num_rows(), 3); assert_eq!(rb.num_columns(), 5); assert_eq!( - rb.column(0).as_int64_arr(), + rb.column_by_name("a").unwrap().as_int64_arr(), &Int64Array::from(vec![Some(1), Some(1), Some(1)]) ); assert_eq!( - rb.column(1).as_utf8_arr(), + rb.column_by_name("b").unwrap().as_utf8_arr(), &StringArray::from(vec![Some("hello"), Some("hello"), Some("hello"),]) ); } @@ -548,11 +557,11 @@ mod tests { assert_eq!(rb.num_rows(), 4); assert_eq!(rb.num_columns(), 7); assert_eq!( - rb.column(0).as_int64_arr(), + rb.column_by_name("a").unwrap().as_int64_arr(), &Int64Array::from(vec![Some(1), Some(1), Some(1), Some(1)]) ); assert_eq!( - rb.column(1).as_utf8_arr(), + rb.column_by_name("b").unwrap().as_utf8_arr(), &StringArray::from(vec![ Some("hello"), Some("hello"), @@ -565,12 +574,20 @@ mod tests { let c_b = vec![None, None, None, Some(vec![Some(2i64)])]; 
         assert_eq!(
-            rb.column(2).as_any().downcast_ref::<ListArray>().unwrap(),
+            rb.column_by_name("c_a")
+                .unwrap()
+                .as_any()
+                .downcast_ref::<ListArray>()
+                .unwrap(),
             &ListArray::from_iter_primitive::<Int64Type, _, _>(c_a)
         );
 
         assert_eq!(
-            rb.column(3).as_any().downcast_ref::<ListArray>().unwrap(),
+            rb.column_by_name("c_b")
+                .unwrap()
+                .as_any()
+                .downcast_ref::<ListArray>()
+                .unwrap(),
             &ListArray::from_iter_primitive::<Int64Type, _, _>(c_b)
         );
     }
 }

From d3e8d37b4e3f262a8671ddcbf1cdddb9a4a5a474 Mon Sep 17 00:00:00 2001
From: Satyam Singh
Date: Mon, 8 May 2023 11:36:07 +0530
Subject: [PATCH 3/5] Fix

---
 server/src/event/writer/mutable.rs | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/server/src/event/writer/mutable.rs b/server/src/event/writer/mutable.rs
index d3bad158a..6f13a8bd6 100644
--- a/server/src/event/writer/mutable.rs
+++ b/server/src/event/writer/mutable.rs
@@ -292,7 +292,11 @@ impl MutableColumns {
     pub fn push(&mut self, rb: RecordBatch) {
         let num_rows = rb.num_rows();
         let schema = rb.schema();
-        let rb = schema.fields().iter().zip(rb.columns().iter()).sorted_by_key(|(f, _)| f.name());
+        let rb = schema
+            .fields()
+            .iter()
+            .zip(rb.columns().iter())
+            .sorted_by_key(|(f, _)| f.name());
 
         // start index map to next location in self columns
         let mut index = 0;

From 5fb26ad1d525104c57e40c8f71a81a5e1bd61fd3 Mon Sep 17 00:00:00 2001
From: Satyam Singh
Date: Mon, 8 May 2023 13:43:00 +0530
Subject: [PATCH 4/5] Sort fields

---
 server/src/metadata.rs | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/server/src/metadata.rs b/server/src/metadata.rs
index 9c50d5242..09a6600f2 100644
--- a/server/src/metadata.rs
+++ b/server/src/metadata.rs
@@ -18,6 +18,7 @@
 use arrow_array::RecordBatch;
 use arrow_schema::{Field, Schema};
+use itertools::Itertools;
 use once_cell::sync::Lazy;
 use std::collections::HashMap;
 use std::sync::{Arc, RwLock};
@@ -86,7 +87,13 @@ impl StreamInfo {
             .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
             .map(|metadata| &metadata.schema)?;
 
-        let schema = Schema::new(schema.values().cloned().collect());
+        let fields = schema
+            .values()
+            .sorted_by_key(|field| field.name())
+            .cloned()
+            .collect();
+
+        let schema = Schema::new(fields);
 
         Ok(Arc::new(schema))
     }

From ccbeb50e26d0c9be32646e321c952dd3f4eda9d8 Mon Sep 17 00:00:00 2001
From: Satyam Singh
Date: Mon, 8 May 2023 13:58:21 +0530
Subject: [PATCH 5/5] Add comment

---
 server/src/metadata.rs | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/server/src/metadata.rs b/server/src/metadata.rs
index 09a6600f2..a6724c378 100644
--- a/server/src/metadata.rs
+++ b/server/src/metadata.rs
@@ -87,6 +87,8 @@ impl StreamInfo {
             .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
             .map(|metadata| &metadata.schema)?;
 
+        // sort fields on read from hashmap as order of fields can differ.
+        // This provides a stable output order if schema is same between calls to this function
         let fields = schema
             .values()
             .sorted_by_key(|field| field.name())
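
The net effect of the series, as a minimal standalone sketch rather than code from any of the patches above: p_timestamp is always injected at column index 0 and the reserved p_tags / p_metadata columns are appended after the user's fields, so the merge reader can order record batches by column(0) without looking the timestamp field up by name. The sketch assumes the arrow-rs and chrono crates as used in the diffs; build_layout and the main driver are illustrative helpers only.

use std::sync::Arc;

use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray, TimestampMillisecondArray};
use arrow_schema::{DataType, Field, Schema, TimeUnit};
use chrono::Utc;

// Hypothetical helper (not from the patch): build the full schema with
// p_timestamp forced to index 0 and the reserved tag/metadata columns
// appended after the user fields, mirroring into_recordbatch in format.rs.
fn build_layout(user_fields: Vec<Field>) -> Schema {
    let mut fields = Vec::with_capacity(user_fields.len() + 3);
    fields.push(Field::new(
        "p_timestamp",
        DataType::Timestamp(TimeUnit::Millisecond, None),
        true,
    ));
    fields.extend(user_fields);
    fields.push(Field::new("p_tags", DataType::Utf8, true));
    fields.push(Field::new("p_metadata", DataType::Utf8, true));
    Schema::new(fields)
}

fn main() {
    let schema = Arc::new(build_layout(vec![Field::new("a", DataType::Int64, true)]));

    let now = Utc::now().timestamp_millis();
    let columns: Vec<ArrayRef> = vec![
        Arc::new(TimestampMillisecondArray::from_value(now, 1)), // column 0: p_timestamp
        Arc::new(Int64Array::from(vec![1])),                     // user column "a"
        Arc::new(StringArray::from(vec![""])),                   // p_tags
        Arc::new(StringArray::from(vec![""])),                   // p_metadata
    ];
    let rb = RecordBatch::try_new(schema, columns).unwrap();

    // Because p_timestamp is pinned to index 0, a merge reader can compare
    // batches on column(0) directly instead of searching the schema by name.
    assert_eq!(rb.schema().field(0).name(), "p_timestamp");
    let ts = rb
        .column(0)
        .as_any()
        .downcast_ref::<TimestampMillisecondArray>()
        .unwrap();
    assert_eq!(ts.value(0), now);
}

Pinning the timestamp to a fixed index trades a little schema flexibility for cheap, name-free access on the hot k-merge path in merged_reader.rs.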