Replace mutable buffer with concat_batches #402

Merged
merged 15 commits on May 16, 2023
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.
1 change: 1 addition & 0 deletions server/Cargo.toml
@@ -19,6 +19,7 @@ arrow-schema = { version = "36.0.0", features = ["serde"] }
arrow-array = { version = "36.0.0" }
arrow-json = "36.0.0"
arrow-ipc = "36.0.0"
arrow-select = "36.0.0"
async-trait = "0.1"
base64 = "0.21"
bytes = "1.4"
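The new arrow-select dependency provides the concat_batches kernel that gives this PR its title, replacing the hand-rolled mutable buffer. A minimal sketch of the call, with an illustrative one-column schema (not code from this PR):

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use arrow_schema::{ArrowError, DataType, Field, Schema};
use arrow_select::concat::concat_batches;

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
    let one = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![1, 2])) as ArrayRef],
    )?;
    let two = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![3])) as ArrayRef],
    )?;

    // concat_batches copies the rows of every same-schema input batch
    // into a single contiguous RecordBatch
    let merged = concat_batches(&schema, &[one, two])?;
    assert_eq!(merged.num_rows(), 3);
    Ok(())
}
```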
3 changes: 2 additions & 1 deletion server/src/alerts/mod.rs
@@ -30,6 +30,7 @@ pub mod rule;
pub mod target;

use crate::metrics::ALERTS_STATES;
use crate::utils::arrow::get_field;
use crate::utils::uid;
use crate::CONFIG;
use crate::{storage, utils};
@@ -135,7 +136,7 @@ impl Message {
// checks if message (with a column name) is valid (i.e. the column name is present in the schema)
pub fn valid(&self, schema: &Schema, column: Option<&str>) -> bool {
if let Some(col) = column {
return schema.field_with_name(col).is_ok();
return get_field(schema, col).is_some();
}
true
}
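get_field lives in crate::utils::arrow and its body is not part of this diff. A plausible stand-in, assuming it is a by-name lookup that returns an Option instead of the Result that schema.field_with_name returns:

```rust
use arrow_schema::{Field, Schema};

// Hypothetical sketch of crate::utils::arrow::get_field: a linear scan
// over the schema's fields that yields None when the name is absent.
fn get_field<'a>(schema: &'a Schema, name: &str) -> Option<&'a Field> {
    schema.fields().iter().find(|field| field.name() == name)
}
```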
118 changes: 13 additions & 105 deletions server/src/event.rs
@@ -22,9 +22,8 @@ mod writer;

use arrow_array::RecordBatch;
use arrow_schema::{Field, Schema};
use itertools::Itertools;

use std::collections::HashMap;
use std::ops::DerefMut;
use std::sync::Arc;

use crate::metadata;
@@ -42,6 +41,7 @@ pub struct Event {
pub rb: RecordBatch,
pub origin_format: &'static str,
pub origin_size: u64,
pub is_first_event: bool,
}

// Event holds the schema related to each event for a single log stream
@@ -50,7 +50,7 @@ impl Event {
let key = get_schema_key(&self.rb.schema().fields);
let num_rows = self.rb.num_rows() as u64;

if self.is_first_event(metadata::STREAM_INFO.schema(&self.stream_name)?.as_ref()) {
if self.is_first_event {
commit_schema(&self.stream_name, self.rb.schema())?;
}

@@ -73,25 +73,6 @@ impl Event {
Ok(())
}

fn is_first_event(&self, stream_schema: &Schema) -> bool {
let mut stream_fields = stream_schema.fields().iter();
let event_schema = self.rb.schema();
let event_fields = event_schema.fields();

for field in event_fields {
loop {
let Some(stream_field) = stream_fields.next() else { return true };
if stream_field.name() == field.name() {
break;
} else {
continue;
}
}
}

false
}

// process_event handles every event after the first. It concatenates record batches
// and puts them in the in-memory store for each event.
fn process_event(
@@ -104,10 +85,10 @@ impl Event {
}
}

pub fn get_schema_key(fields: &Vec<Field>) -> String {
pub fn get_schema_key(fields: &[Field]) -> String {
// Fields are sorted by name so the key is independent of field order
let mut hasher = xxhash_rust::xxh3::Xxh3::new();
for field in fields {
for field in fields.iter().sorted_by_key(|v| v.name()) {
hasher.update(field.name().as_bytes())
}
let hash = hasher.digest();
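Because the field names are now hashed in sorted order, two events whose columns arrive in different orders resolve to the same schema key. An illustrative check, assuming the itertools and xxhash-rust (xxh3 feature) crates the server already depends on:

```rust
use itertools::Itertools;

// Hashing sorted names makes the key independent of field order.
fn schema_key(names: &[&str]) -> String {
    let mut hasher = xxhash_rust::xxh3::Xxh3::new();
    for name in names.iter().sorted() {
        hasher.update(name.as_bytes());
    }
    format!("{:x}", hasher.digest())
}

fn main() {
    assert_eq!(schema_key(&["a", "b"]), schema_key(&["b", "a"]));
}
```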
@@ -117,36 +98,17 @@ pub fn get_schema_key(fields: &Vec<Field>) -> String {
pub fn commit_schema(stream_name: &str, schema: Arc<Schema>) -> Result<(), EventError> {
let mut stream_metadata = metadata::STREAM_INFO.write().expect("lock poisoned");

let mut schema = Schema::try_merge(vec![
schema.as_ref().clone(),
stream_metadata.get_unchecked(stream_name).as_ref().clone(),
])
.unwrap();
schema.fields.sort_by(|a, b| a.name().cmp(b.name()));

stream_metadata.set_unchecked(stream_name, Arc::new(schema));
let map = &mut stream_metadata
.get_mut(stream_name)
.expect("map has entry for this stream name")
.schema;
let current_schema = Schema::new(map.values().cloned().collect());
let schema = Schema::try_merge(vec![current_schema, schema.as_ref().clone()])?;
map.clear();
map.extend(schema.fields.into_iter().map(|f| (f.name().clone(), f)));
Ok(())
}
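The rewritten commit_schema merges the stream's current field map with the incoming schema through Schema::try_merge rather than overwriting it. A small sketch of the merge semantics the code relies on (field names illustrative):

```rust
use arrow_schema::{ArrowError, DataType, Field, Schema};

fn main() -> Result<(), ArrowError> {
    let current = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
    let incoming = Schema::new(vec![
        Field::new("a", DataType::Int64, true),
        Field::new("b", DataType::Utf8, true),
    ]);

    // try_merge unions the field lists and errors if the same field
    // name reappears with an incompatible definition
    let merged = Schema::try_merge(vec![current, incoming])?;
    assert_eq!(merged.fields().len(), 2);
    Ok(())
}
```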

trait UncheckedOp: DerefMut<Target = HashMap<String, metadata::LogStreamMetadata>> {
fn get_unchecked(&self, stream_name: &str) -> Arc<Schema> {
let schema = &self
.get(stream_name)
.expect("map has entry for this stream name")
.schema;

Arc::clone(schema)
}

fn set_unchecked(&mut self, stream_name: &str, schema: Arc<Schema>) {
self.get_mut(stream_name)
.expect("map has entry for this stream name")
.schema = schema
}
}

impl<T: DerefMut<Target = HashMap<String, metadata::LogStreamMetadata>>> UncheckedOp for T {}

pub mod error {
use arrow_schema::ArrowError;

@@ -167,57 +129,3 @@ pub mod error {
ObjectStorage(#[from] ObjectStorageError),
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema};

use super::Event;

fn test_rb(fields: Vec<Field>) -> RecordBatch {
RecordBatch::new_empty(Arc::new(Schema::new(fields)))
}

fn test_event(fields: Vec<Field>) -> Event {
Event {
stream_name: "".to_string(),
rb: test_rb(fields),
origin_format: "none",
origin_size: 0,
}
}

#[test]
fn new_field_is_new_event() {
let schema = Schema::new(vec![
Field::new("a", DataType::Int64, true),
Field::new("b", DataType::Int64, true),
]);

let new_event = test_event(vec![
Field::new("a", DataType::Int64, true),
Field::new("c", DataType::Int64, true),
]);

assert!(new_event.is_first_event(&schema));
}

#[test]
fn same_field_not_is_new_event() {
let schema = Schema::new(vec![
Field::new("a", DataType::Int64, true),
Field::new("b", DataType::Int64, true),
Field::new("c", DataType::Int64, true),
]);

let new_event = test_event(vec![
Field::new("a", DataType::Int64, true),
Field::new("c", DataType::Int64, true),
]);

assert!(!new_event.is_first_event(&schema));
}
}
117 changes: 50 additions & 67 deletions server/src/event/format.rs
@@ -17,14 +17,14 @@
*
*/

use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};

use anyhow::{anyhow, Error as AnyError};
use arrow_array::{RecordBatch, StringArray, TimestampMillisecondArray};
use arrow_schema::{DataType, Field, Schema, TimeUnit};
use chrono::Utc;

use crate::utils;
use crate::utils::{self, arrow::get_field};

use super::{DEFAULT_METADATA_KEY, DEFAULT_TAGS_KEY, DEFAULT_TIMESTAMP_KEY};

@@ -33,100 +33,83 @@ pub mod json;
type Tags = String;
type Metadata = String;

// Global trait for event formats
// This trait is implemented by all the event formats
pub trait EventFormat: Sized {
type Data;
fn to_data(self, schema: &Schema) -> Result<(Self::Data, Schema, Tags, Metadata), AnyError>;
fn to_data(
self,
schema: &HashMap<String, Field>,
) -> Result<(Self::Data, Schema, bool, Tags, Metadata), AnyError>;
fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;
fn into_recordbatch(self, schema: &Schema) -> Result<RecordBatch, AnyError> {
let (data, mut schema, tags, metadata) = self.to_data(schema)?;

match tags_index(&schema) {
Ok(_) => return Err(anyhow!("field {} is a reserved field", DEFAULT_TAGS_KEY)),
Err(index) => {
schema
.fields
.insert(index, Field::new(DEFAULT_TAGS_KEY, DataType::Utf8, true));
}
fn into_recordbatch(
self,
schema: &HashMap<String, Field>,
) -> Result<(RecordBatch, bool), AnyError> {
let (data, mut schema, is_first, tags, metadata) = self.to_data(schema)?;

if get_field(&schema, DEFAULT_TAGS_KEY).is_some() {
return Err(anyhow!("field {} is a reserved field", DEFAULT_TAGS_KEY));
};

match metadata_index(&schema) {
Ok(_) => {
return Err(anyhow!(
"field {} is a reserved field",
DEFAULT_METADATA_KEY
))
}
Err(index) => {
schema.fields.insert(
index,
Field::new(DEFAULT_METADATA_KEY, DataType::Utf8, true),
);
}
if get_field(&schema, DEFAULT_METADATA_KEY).is_some() {
return Err(anyhow!(
"field {} is a reserved field",
DEFAULT_METADATA_KEY
));
};

match timestamp_index(&schema) {
Ok(_) => {
return Err(anyhow!(
"field {} is a reserved field",
DEFAULT_TIMESTAMP_KEY
))
}
Err(index) => {
schema.fields.insert(
index,
Field::new(
DEFAULT_TIMESTAMP_KEY,
DataType::Timestamp(TimeUnit::Millisecond, None),
true,
),
);
}
if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() {
return Err(anyhow!(
"field {} is a reserved field",
DEFAULT_TIMESTAMP_KEY
));
};

// add the p_timestamp field to the event schema at the 0th index
schema.fields.insert(
0,
Field::new(
DEFAULT_TIMESTAMP_KEY,
DataType::Timestamp(TimeUnit::Millisecond, None),
true,
),
);

// p_tags and p_metadata are added to the end of the schema
let tags_index = schema.fields.len();
let metadata_index = tags_index + 1;
schema
.fields
.push(Field::new(DEFAULT_TAGS_KEY, DataType::Utf8, true));
schema
.fields
.push(Field::new(DEFAULT_METADATA_KEY, DataType::Utf8, true));

// prepare the record batch and new fields to be added
let schema_ref = Arc::new(schema);
let rb = Self::decode(data, Arc::clone(&schema_ref))?;
let tags_arr = StringArray::from_iter_values(std::iter::repeat(&tags).take(rb.num_rows()));
let metadata_arr =
StringArray::from_iter_values(std::iter::repeat(&metadata).take(rb.num_rows()));
let timestamp_array = get_timestamp_array(rb.num_rows());

// modify the record batch to add fields at their respective indexes
let rb = utils::arrow::replace_columns(
Arc::clone(&schema_ref),
rb,
&[
timestamp_index(&schema_ref).expect("timestamp field exists"),
tags_index(&schema_ref).expect("tags field exists"),
metadata_index(&schema_ref).expect("metadata field exists"),
],
&[0, tags_index, metadata_index],
&[
Arc::new(timestamp_array),
Arc::new(tags_arr),
Arc::new(metadata_arr),
],
);

Ok(rb)
Ok((rb, is_first))
}
}

fn tags_index(schema: &Schema) -> Result<usize, usize> {
schema
.fields
.binary_search_by_key(&DEFAULT_TAGS_KEY, |field| field.name())
}

fn metadata_index(schema: &Schema) -> Result<usize, usize> {
schema
.fields
.binary_search_by_key(&DEFAULT_METADATA_KEY, |field| field.name())
}

fn timestamp_index(schema: &Schema) -> Result<usize, usize> {
schema
.fields
.binary_search_by_key(&DEFAULT_TIMESTAMP_KEY, |field| field.name())
}

fn get_timestamp_array(size: usize) -> TimestampMillisecondArray {
let time = Utc::now();
TimestampMillisecondArray::from_value(time.timestamp_millis(), size)
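replace_columns, imported from crate::utils::arrow, swaps the freshly built timestamp, tags, and metadata arrays into the decoded batch at the given indexes. Its body is not shown in this diff; a hypothetical sketch of the behavior into_recordbatch relies on:

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, RecordBatch};
use arrow_schema::SchemaRef;

// Hypothetical sketch of crate::utils::arrow::replace_columns: overwrite
// the columns at `indexes` with `arrays` and rebuild the batch.
fn replace_columns(
    schema: SchemaRef,
    rb: RecordBatch,
    indexes: &[usize],
    arrays: &[ArrayRef],
) -> RecordBatch {
    let mut columns: Vec<ArrayRef> = rb.columns().to_vec();
    for (&index, array) in indexes.iter().zip(arrays.iter()) {
        columns[index] = Arc::clone(array);
    }
    RecordBatch::try_new(schema, columns).expect("indexes and arrays match the schema")
}
```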