Fix p_timestamp to index 0 #399

Closed · wants to merge 6 commits

118 changes: 13 additions & 105 deletions server/src/event.rs
@@ -22,9 +22,8 @@ mod writer;

use arrow_array::RecordBatch;
use arrow_schema::{Field, Schema};
use itertools::Itertools;

use std::collections::HashMap;
use std::ops::DerefMut;
use std::sync::Arc;

use crate::metadata;
@@ -42,6 +41,7 @@ pub struct Event {
pub rb: RecordBatch,
pub origin_format: &'static str,
pub origin_size: u64,
pub is_first_event: bool,
}

// Events holds the schema related to each event for a single log stream
@@ -50,7 +50,7 @@ impl Event {
let key = get_schema_key(&self.rb.schema().fields);
let num_rows = self.rb.num_rows() as u64;

if self.is_first_event(metadata::STREAM_INFO.schema(&self.stream_name)?.as_ref()) {
if self.is_first_event {
commit_schema(&self.stream_name, self.rb.schema())?;
}

@@ -73,25 +73,6 @@ impl Event {
Ok(())
}

fn is_first_event(&self, stream_schema: &Schema) -> bool {
let mut stream_fields = stream_schema.fields().iter();
let event_schema = self.rb.schema();
let event_fields = event_schema.fields();

for field in event_fields {
loop {
let Some(stream_field) = stream_fields.next() else { return true };
if stream_field.name() == field.name() {
break;
} else {
continue;
}
}
}

false
}

// Process all events after the first one. Concatenates record batches
// and puts them in the memory store for each event.
fn process_event(
@@ -104,10 +85,10 @@ impl Event {
}
}

pub fn get_schema_key(fields: &Vec<Field>) -> String {
pub fn get_schema_key(fields: &[Field]) -> String {
// Fields are sorted by name below, so the key is order-independent
let mut hasher = xxhash_rust::xxh3::Xxh3::new();
for field in fields {
for field in fields.iter().sorted_by_key(|v| v.name()) {
hasher.update(field.name().as_bytes())
}
let hash = hasher.digest();
@@ -117,36 +98,17 @@ pub fn get_schema_key(fields: &Vec<Field>) -> String {
pub fn commit_schema(stream_name: &str, schema: Arc<Schema>) -> Result<(), EventError> {
let mut stream_metadata = metadata::STREAM_INFO.write().expect("lock poisoned");

let mut schema = Schema::try_merge(vec![
schema.as_ref().clone(),
stream_metadata.get_unchecked(stream_name).as_ref().clone(),
])
.unwrap();
schema.fields.sort_by(|a, b| a.name().cmp(b.name()));

stream_metadata.set_unchecked(stream_name, Arc::new(schema));
let map = &mut stream_metadata
.get_mut(stream_name)
.expect("map has entry for this stream name")
.schema;
let current_schema = Schema::new(map.values().cloned().collect());
let schema = Schema::try_merge(vec![current_schema, schema.as_ref().clone()])?;
map.clear();
map.extend(schema.fields.into_iter().map(|f| (f.name().clone(), f)));
Ok(())
}

trait UncheckedOp: DerefMut<Target = HashMap<String, metadata::LogStreamMetadata>> {
fn get_unchecked(&self, stream_name: &str) -> Arc<Schema> {
let schema = &self
.get(stream_name)
.expect("map has entry for this stream name")
.schema;

Arc::clone(schema)
}

fn set_unchecked(&mut self, stream_name: &str, schema: Arc<Schema>) {
self.get_mut(stream_name)
.expect("map has entry for this stream name")
.schema = schema
}
}

impl<T: DerefMut<Target = HashMap<String, metadata::LogStreamMetadata>>> UncheckedOp for T {}

pub mod error {
use arrow_schema::ArrowError;

@@ -167,57 +129,3 @@ pub mod error {
ObjectStorage(#[from] ObjectStorageError),
}
}
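
For context, the rewritten commit_schema above merges each new event schema into a per-stream field map keyed by field name, rather than replacing the whole Arc<Schema>. A minimal sketch of that merge step, using a bare HashMap<String, Field> to stand in for LogStreamMetadata::schema (the helper name is hypothetical):

```rust
use std::collections::HashMap;

use arrow_schema::{ArrowError, Field, Schema};

// Sketch: merge an event's schema into a per-stream field map, as the new
// commit_schema does. `map` stands in for the stream metadata's schema map.
fn merge_into_map(
    map: &mut HashMap<String, Field>,
    event_schema: &Schema,
) -> Result<(), ArrowError> {
    // Rebuild a Schema from the currently known fields, merge in the event's
    // fields, then rewrite the map from the merged result.
    let current = Schema::new(map.values().cloned().collect());
    let merged = Schema::try_merge(vec![current, event_schema.clone()])?;
    map.clear();
    map.extend(merged.fields.into_iter().map(|f| (f.name().clone(), f)));
    Ok(())
}
```

Keying by name makes re-committing the same fields idempotent, while Schema::try_merge surfaces an error if the same field name arrives with an incompatible data type.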

#[cfg(test)]
mod tests {
use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema};

use super::Event;

fn test_rb(fields: Vec<Field>) -> RecordBatch {
RecordBatch::new_empty(Arc::new(Schema::new(fields)))
}

fn test_event(fields: Vec<Field>) -> Event {
Event {
stream_name: "".to_string(),
rb: test_rb(fields),
origin_format: "none",
origin_size: 0,
}
}

#[test]
fn new_field_is_new_event() {
let schema = Schema::new(vec![
Field::new("a", DataType::Int64, true),
Field::new("b", DataType::Int64, true),
]);

let new_event = test_event(vec![
Field::new("a", DataType::Int64, true),
Field::new("c", DataType::Int64, true),
]);

assert!(new_event.is_first_event(&schema));
}

#[test]
fn same_field_not_is_new_event() {
let schema = Schema::new(vec![
Field::new("a", DataType::Int64, true),
Field::new("b", DataType::Int64, true),
Field::new("c", DataType::Int64, true),
]);

let new_event = test_event(vec![
Field::new("a", DataType::Int64, true),
Field::new("c", DataType::Int64, true),
]);

assert!(!new_event.is_first_event(&schema));
}
}
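
A note on the get_schema_key change above: sorting fields by name before hashing makes the key independent of the order in which fields appear in an event. A self-contained sketch, assuming the same xxhash_rust and itertools crates; the final digest formatting is truncated in the diff, so plain hex is used here:

```rust
use arrow_schema::{DataType, Field};
use itertools::Itertools;

// Sketch: the schema key is a hash over field names, sorted by name, so
// field order in the event no longer changes the key.
fn schema_key(fields: &[Field]) -> String {
    let mut hasher = xxhash_rust::xxh3::Xxh3::new();
    for field in fields.iter().sorted_by_key(|f| f.name()) {
        hasher.update(field.name().as_bytes());
    }
    // Illustrative formatting; the diff cuts off the real format string.
    format!("{:x}", hasher.digest())
}

fn main() {
    let a = Field::new("a", DataType::Int64, true);
    let b = Field::new("b", DataType::Utf8, true);
    // Same fields, different order: identical key.
    assert_eq!(schema_key(&[a.clone(), b.clone()]), schema_key(&[b, a]));
}
```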
110 changes: 44 additions & 66 deletions server/src/event/format.rs
@@ -17,7 +17,7 @@
*
*/

use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};

use anyhow::{anyhow, Error as AnyError};
use arrow_array::{RecordBatch, StringArray, TimestampMillisecondArray};
@@ -35,54 +35,54 @@ type Metadata = String;

pub trait EventFormat: Sized {
type Data;
fn to_data(self, schema: &Schema) -> Result<(Self::Data, Schema, Tags, Metadata), AnyError>;
fn to_data(
self,
schema: &HashMap<String, Field>,
) -> Result<(Self::Data, Schema, bool, Tags, Metadata), AnyError>;
fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;
fn into_recordbatch(self, schema: &Schema) -> Result<RecordBatch, AnyError> {
let (data, mut schema, tags, metadata) = self.to_data(schema)?;

match tags_index(&schema) {
Ok(_) => return Err(anyhow!("field {} is a reserved field", DEFAULT_TAGS_KEY)),
Err(index) => {
schema
.fields
.insert(index, Field::new(DEFAULT_TAGS_KEY, DataType::Utf8, true));
}
fn into_recordbatch(
self,
schema: &HashMap<String, Field>,
) -> Result<(RecordBatch, bool), AnyError> {
let (data, mut schema, is_first, tags, metadata) = self.to_data(schema)?;

if schema.field_with_name(DEFAULT_TAGS_KEY).is_ok() {
return Err(anyhow!("field {} is a reserved field", DEFAULT_TAGS_KEY));
};

match metadata_index(&schema) {
Ok(_) => {
return Err(anyhow!(
"field {} is a reserved field",
DEFAULT_METADATA_KEY
))
}
Err(index) => {
schema.fields.insert(
index,
Field::new(DEFAULT_METADATA_KEY, DataType::Utf8, true),
);
}
if schema.field_with_name(DEFAULT_METADATA_KEY).is_ok() {
return Err(anyhow!(
"field {} is a reserved field",
DEFAULT_METADATA_KEY
));
};

match timestamp_index(&schema) {
Ok(_) => {
return Err(anyhow!(
"field {} is a reserved field",
DEFAULT_TIMESTAMP_KEY
))
}
Err(index) => {
schema.fields.insert(
index,
Field::new(
DEFAULT_TIMESTAMP_KEY,
DataType::Timestamp(TimeUnit::Millisecond, None),
true,
),
);
}
if schema.field_with_name(DEFAULT_TIMESTAMP_KEY).is_ok() {
return Err(anyhow!(
"field {} is a reserved field",
DEFAULT_TIMESTAMP_KEY
));
};

schema.fields.insert(
0,
Field::new(
DEFAULT_TIMESTAMP_KEY,
DataType::Timestamp(TimeUnit::Millisecond, None),
true,
),
);
let tags_index = schema.fields.len();

schema
.fields
.push(Field::new(DEFAULT_TAGS_KEY, DataType::Utf8, true));

let metadata_index = schema.fields.len();
schema
.fields
.push(Field::new(DEFAULT_METADATA_KEY, DataType::Utf8, true));

let schema_ref = Arc::new(schema);
let rb = Self::decode(data, Arc::clone(&schema_ref))?;
let tags_arr = StringArray::from_iter_values(std::iter::repeat(&tags).take(rb.num_rows()));
@@ -93,40 +93,18 @@ pub trait EventFormat: Sized {
let rb = utils::arrow::replace_columns(
Arc::clone(&schema_ref),
rb,
&[
timestamp_index(&schema_ref).expect("timestamp field exists"),
tags_index(&schema_ref).expect("tags field exists"),
metadata_index(&schema_ref).expect("metadata field exists"),
],
&[0, tags_index, metadata_index],
&[
Arc::new(timestamp_array),
Arc::new(tags_arr),
Arc::new(metadata_arr),
],
);

Ok(rb)
Ok((rb, is_first))
}
}

fn tags_index(schema: &Schema) -> Result<usize, usize> {
schema
.fields
.binary_search_by_key(&DEFAULT_TAGS_KEY, |field| field.name())
}

fn metadata_index(schema: &Schema) -> Result<usize, usize> {
schema
.fields
.binary_search_by_key(&DEFAULT_METADATA_KEY, |field| field.name())
}

fn timestamp_index(schema: &Schema) -> Result<usize, usize> {
schema
.fields
.binary_search_by_key(&DEFAULT_TIMESTAMP_KEY, |field| field.name())
}

fn get_timestamp_array(size: usize) -> TimestampMillisecondArray {
let time = Utc::now();
TimestampMillisecondArray::from_value(time.timestamp_millis(), size)
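
Taken together, the format.rs changes pin the reserved columns to fixed positions: p_timestamp is inserted at index 0 and the tags and metadata columns are appended at the tail, so into_recordbatch can hand literal indices to replace_columns instead of binary-searching a name-sorted schema. A minimal sketch of the layout step, with the reserved field names written out literally (the actual code uses the DEFAULT_*_KEY constants):

```rust
use arrow_schema::{DataType, Field, Schema, TimeUnit};

// Sketch: place the reserved columns at known positions. p_timestamp goes
// to index 0; p_tags and p_metadata are pushed to the tail, and their
// indices fall out of the schema length at the time of each push.
fn add_reserved_fields(mut schema: Schema) -> (Schema, usize, usize) {
    schema.fields.insert(
        0,
        Field::new(
            "p_timestamp",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            true,
        ),
    );
    let tags_index = schema.fields.len();
    schema.fields.push(Field::new("p_tags", DataType::Utf8, true));
    let metadata_index = schema.fields.len();
    schema
        .fields
        .push(Field::new("p_metadata", DataType::Utf8, true));
    // replace_columns can now use &[0, tags_index, metadata_index] directly.
    (schema, tags_index, metadata_index)
}
```

This is also what removes the old requirement that commit_schema keep the schema sorted by field name purely so the binary searches for the reserved columns could succeed.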