
Commit 11c512d

Cleanup in memory based ingestion (#402)
Also simplify the memory ingestion flow so it works exactly like the disk ingestion flow. Fix the issue with the merged reader so that event ingestion doesn't cause issues. Fixes #396
1 parent ffb7047 · commit 11c512d

14 files changed: +317 −1405 lines


Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default.

server/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -19,6 +19,7 @@ arrow-schema = { version = "36.0.0", features = ["serde"] }
 arrow-array = { version = "36.0.0" }
 arrow-json = "36.0.0"
 arrow-ipc = "36.0.0"
+arrow-select = "36.0.0"
 async-trait = "0.1"
 base64 = "0.21"
 bytes = "1.4"

server/src/alerts/mod.rs

Lines changed: 2 additions & 1 deletion
@@ -30,6 +30,7 @@ pub mod rule;
 pub mod target;

 use crate::metrics::ALERTS_STATES;
+use crate::utils::arrow::get_field;
 use crate::utils::uid;
 use crate::CONFIG;
 use crate::{storage, utils};
@@ -135,7 +136,7 @@ impl Message {
     // checks if message (with a column name) is valid (i.e. the column name is present in the schema)
     pub fn valid(&self, schema: &Schema, column: Option<&str>) -> bool {
         if let Some(col) = column {
-            return schema.field_with_name(col).is_ok();
+            return get_field(schema, col).is_some();
         }
         true
     }
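
The `get_field` helper lives in `crate::utils::arrow` and its body is not part of this diff. A minimal sketch of what such a lookup might look like, assuming it is simply a linear search by field name (hypothetical code):

use arrow_schema::{DataType, Field, Schema};

// Hypothetical stand-in for utils::arrow::get_field: find a field by name,
// returning None instead of an error when it is absent.
fn get_field<'a>(schema: &'a Schema, name: &str) -> Option<&'a Field> {
    schema.fields().iter().find(|field| field.name() == name)
}

fn main() {
    let schema = Schema::new(vec![Field::new("level", DataType::Utf8, true)]);
    assert!(get_field(&schema, "level").is_some());
    assert!(get_field(&schema, "missing").is_none());
}

Compared with `schema.field_with_name(col).is_ok()`, the Option-based lookup avoids constructing an ArrowError just to test whether a column exists.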

server/src/event.rs

Lines changed: 13 additions & 105 deletions
@@ -22,9 +22,8 @@ mod writer;

 use arrow_array::RecordBatch;
 use arrow_schema::{Field, Schema};
+use itertools::Itertools;

-use std::collections::HashMap;
-use std::ops::DerefMut;
 use std::sync::Arc;

 use crate::metadata;
@@ -42,6 +41,7 @@ pub struct Event {
     pub rb: RecordBatch,
     pub origin_format: &'static str,
     pub origin_size: u64,
+    pub is_first_event: bool,
 }

 // Events holds the schema related to a each event for a single log stream
@@ -50,7 +50,7 @@ impl Event {
         let key = get_schema_key(&self.rb.schema().fields);
         let num_rows = self.rb.num_rows() as u64;

-        if self.is_first_event(metadata::STREAM_INFO.schema(&self.stream_name)?.as_ref()) {
+        if self.is_first_event {
             commit_schema(&self.stream_name, self.rb.schema())?;
         }

@@ -73,25 +73,6 @@ impl Event {
         Ok(())
     }

-    fn is_first_event(&self, stream_schema: &Schema) -> bool {
-        let mut stream_fields = stream_schema.fields().iter();
-        let event_schema = self.rb.schema();
-        let event_fields = event_schema.fields();
-
-        for field in event_fields {
-            loop {
-                let Some(stream_field) = stream_fields.next() else { return true };
-                if stream_field.name() == field.name() {
-                    break;
-                } else {
-                    continue;
-                }
-            }
-        }
-
-        false
-    }
-
     // event process all events after the 1st event. Concatenates record batches
     // and puts them in memory store for each event.
     fn process_event(
@@ -104,10 +85,10 @@ impl Event {
     }
 }

-pub fn get_schema_key(fields: &Vec<Field>) -> String {
+pub fn get_schema_key(fields: &[Field]) -> String {
     // Fields must be sorted
     let mut hasher = xxhash_rust::xxh3::Xxh3::new();
-    for field in fields {
+    for field in fields.iter().sorted_by_key(|v| v.name()) {
         hasher.update(field.name().as_bytes())
     }
     let hash = hasher.digest();
@@ -117,36 +98,17 @@ pub fn get_schema_key(fields: &Vec<Field>) -> String {
 pub fn commit_schema(stream_name: &str, schema: Arc<Schema>) -> Result<(), EventError> {
     let mut stream_metadata = metadata::STREAM_INFO.write().expect("lock poisoned");

-    let mut schema = Schema::try_merge(vec![
-        schema.as_ref().clone(),
-        stream_metadata.get_unchecked(stream_name).as_ref().clone(),
-    ])
-    .unwrap();
-    schema.fields.sort_by(|a, b| a.name().cmp(b.name()));
-
-    stream_metadata.set_unchecked(stream_name, Arc::new(schema));
+    let map = &mut stream_metadata
+        .get_mut(stream_name)
+        .expect("map has entry for this stream name")
+        .schema;
+    let current_schema = Schema::new(map.values().cloned().collect());
+    let schema = Schema::try_merge(vec![current_schema, schema.as_ref().clone()])?;
+    map.clear();
+    map.extend(schema.fields.into_iter().map(|f| (f.name().clone(), f)));
     Ok(())
 }

-trait UncheckedOp: DerefMut<Target = HashMap<String, metadata::LogStreamMetadata>> {
-    fn get_unchecked(&self, stream_name: &str) -> Arc<Schema> {
-        let schema = &self
-            .get(stream_name)
-            .expect("map has entry for this stream name")
-            .schema;
-
-        Arc::clone(schema)
-    }
-
-    fn set_unchecked(&mut self, stream_name: &str, schema: Arc<Schema>) {
-        self.get_mut(stream_name)
-            .expect("map has entry for this stream name")
-            .schema = schema
-    }
-}
-
-impl<T: DerefMut<Target = HashMap<String, metadata::LogStreamMetadata>>> UncheckedOp for T {}
-
 pub mod error {
     use arrow_schema::ArrowError;

@@ -167,57 +129,3 @@ pub mod error {
     ObjectStorage(#[from] ObjectStorageError),
     }
 }
-
-#[cfg(test)]
-mod tests {
-    use std::sync::Arc;
-
-    use arrow_array::RecordBatch;
-    use arrow_schema::{DataType, Field, Schema};
-
-    use super::Event;
-
-    fn test_rb(fields: Vec<Field>) -> RecordBatch {
-        RecordBatch::new_empty(Arc::new(Schema::new(fields)))
-    }
-
-    fn test_event(fields: Vec<Field>) -> Event {
-        Event {
-            stream_name: "".to_string(),
-            rb: test_rb(fields),
-            origin_format: "none",
-            origin_size: 0,
-        }
-    }
-
-    #[test]
-    fn new_field_is_new_event() {
-        let schema = Schema::new(vec![
-            Field::new("a", DataType::Int64, true),
-            Field::new("b", DataType::Int64, true),
-        ]);
-
-        let new_event = test_event(vec![
-            Field::new("a", DataType::Int64, true),
-            Field::new("c", DataType::Int64, true),
-        ]);
-
-        assert!(new_event.is_first_event(&schema));
-    }
-
-    #[test]
-    fn same_field_not_is_new_event() {
-        let schema = Schema::new(vec![
-            Field::new("a", DataType::Int64, true),
-            Field::new("b", DataType::Int64, true),
-            Field::new("c", DataType::Int64, true),
-        ]);
-
-        let new_event = test_event(vec![
-            Field::new("a", DataType::Int64, true),
-            Field::new("c", DataType::Int64, true),
-        ]);
-
-        assert!(!new_event.is_first_event(&schema));
-    }
-}
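
Note that `get_schema_key` now sorts fields by name before hashing, so the key no longer depends on the order in which fields arrive. A small self-contained sketch of that idea; the helper name is hypothetical and the final string formatting of the key (not visible in this hunk) is omitted:

use arrow_schema::{DataType, Field};
use itertools::Itertools;

// Hash field names in name order, so ["a", "b"] and ["b", "a"] map to the same key.
fn schema_key(fields: &[Field]) -> u64 {
    let mut hasher = xxhash_rust::xxh3::Xxh3::new();
    for field in fields.iter().sorted_by_key(|field| field.name()) {
        hasher.update(field.name().as_bytes());
    }
    hasher.digest()
}

fn main() {
    let a = Field::new("a", DataType::Int64, true);
    let b = Field::new("b", DataType::Utf8, true);
    assert_eq!(schema_key(&[a.clone(), b.clone()]), schema_key(&[b, a]));
}

The rewritten `commit_schema` relies on the per-stream schema now being stored as a map keyed by field name: merging reduces to `Schema::try_merge` over the current fields and the incoming schema, followed by rebuilding the map.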

server/src/event/format.rs

Lines changed: 50 additions & 67 deletions
@@ -17,14 +17,14 @@
  *
  */

-use std::sync::Arc;
+use std::{collections::HashMap, sync::Arc};

 use anyhow::{anyhow, Error as AnyError};
 use arrow_array::{RecordBatch, StringArray, TimestampMillisecondArray};
 use arrow_schema::{DataType, Field, Schema, TimeUnit};
 use chrono::Utc;

-use crate::utils;
+use crate::utils::{self, arrow::get_field};

 use super::{DEFAULT_METADATA_KEY, DEFAULT_TAGS_KEY, DEFAULT_TIMESTAMP_KEY};

@@ -33,100 +33,83 @@ pub mod json;
 type Tags = String;
 type Metadata = String;

+// Global Trait for event format
+// This trait is implemented by all the event formats
 pub trait EventFormat: Sized {
     type Data;
-    fn to_data(self, schema: &Schema) -> Result<(Self::Data, Schema, Tags, Metadata), AnyError>;
+    fn to_data(
+        self,
+        schema: &HashMap<String, Field>,
+    ) -> Result<(Self::Data, Schema, bool, Tags, Metadata), AnyError>;
     fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;
-    fn into_recordbatch(self, schema: &Schema) -> Result<RecordBatch, AnyError> {
-        let (data, mut schema, tags, metadata) = self.to_data(schema)?;
-
-        match tags_index(&schema) {
-            Ok(_) => return Err(anyhow!("field {} is a reserved field", DEFAULT_TAGS_KEY)),
-            Err(index) => {
-                schema
-                    .fields
-                    .insert(index, Field::new(DEFAULT_TAGS_KEY, DataType::Utf8, true));
-            }
+    fn into_recordbatch(
+        self,
+        schema: &HashMap<String, Field>,
+    ) -> Result<(RecordBatch, bool), AnyError> {
+        let (data, mut schema, is_first, tags, metadata) = self.to_data(schema)?;
+
+        if get_field(&schema, DEFAULT_TAGS_KEY).is_some() {
+            return Err(anyhow!("field {} is a reserved field", DEFAULT_TAGS_KEY));
         };

-        match metadata_index(&schema) {
-            Ok(_) => {
-                return Err(anyhow!(
-                    "field {} is a reserved field",
-                    DEFAULT_METADATA_KEY
-                ))
-            }
-            Err(index) => {
-                schema.fields.insert(
-                    index,
-                    Field::new(DEFAULT_METADATA_KEY, DataType::Utf8, true),
-                );
-            }
+        if get_field(&schema, DEFAULT_TAGS_KEY).is_some() {
+            return Err(anyhow!(
+                "field {} is a reserved field",
+                DEFAULT_METADATA_KEY
+            ));
         };

-        match timestamp_index(&schema) {
-            Ok(_) => {
-                return Err(anyhow!(
-                    "field {} is a reserved field",
-                    DEFAULT_TIMESTAMP_KEY
-                ))
-            }
-            Err(index) => {
-                schema.fields.insert(
-                    index,
-                    Field::new(
-                        DEFAULT_TIMESTAMP_KEY,
-                        DataType::Timestamp(TimeUnit::Millisecond, None),
-                        true,
-                    ),
-                );
-            }
+        if get_field(&schema, DEFAULT_TAGS_KEY).is_some() {
+            return Err(anyhow!(
+                "field {} is a reserved field",
+                DEFAULT_TIMESTAMP_KEY
+            ));
         };

+        // add the p_timestamp field to the event schema to the 0th index
+        schema.fields.insert(
+            0,
+            Field::new(
+                DEFAULT_TIMESTAMP_KEY,
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                true,
+            ),
+        );
+
+        // p_tags and p_metadata are added to the end of the schema
+        let tags_index = schema.fields.len();
+        let metadata_index = tags_index + 1;
+        schema
+            .fields
+            .push(Field::new(DEFAULT_TAGS_KEY, DataType::Utf8, true));
+        schema
+            .fields
+            .push(Field::new(DEFAULT_METADATA_KEY, DataType::Utf8, true));
+
+        // prepare the record batch and new fields to be added
         let schema_ref = Arc::new(schema);
         let rb = Self::decode(data, Arc::clone(&schema_ref))?;
         let tags_arr = StringArray::from_iter_values(std::iter::repeat(&tags).take(rb.num_rows()));
         let metadata_arr =
             StringArray::from_iter_values(std::iter::repeat(&metadata).take(rb.num_rows()));
         let timestamp_array = get_timestamp_array(rb.num_rows());

+        // modify the record batch to add fields to respective indexes
         let rb = utils::arrow::replace_columns(
             Arc::clone(&schema_ref),
             rb,
-            &[
-                timestamp_index(&schema_ref).expect("timestamp field exists"),
-                tags_index(&schema_ref).expect("tags field exists"),
-                metadata_index(&schema_ref).expect("metadata field exists"),
-            ],
+            &[0, tags_index, metadata_index],
             &[
                 Arc::new(timestamp_array),
                 Arc::new(tags_arr),
                 Arc::new(metadata_arr),
             ],
         );

-        Ok(rb)
+        Ok((rb, is_first))
     }
 }

-fn tags_index(schema: &Schema) -> Result<usize, usize> {
-    schema
-        .fields
-        .binary_search_by_key(&DEFAULT_TAGS_KEY, |field| field.name())
-}
-
-fn metadata_index(schema: &Schema) -> Result<usize, usize> {
-    schema
-        .fields
-        .binary_search_by_key(&DEFAULT_METADATA_KEY, |field| field.name())
-}
-
-fn timestamp_index(schema: &Schema) -> Result<usize, usize> {
-    schema
-        .fields
-        .binary_search_by_key(&DEFAULT_TIMESTAMP_KEY, |field| field.name())
-}
-
 fn get_timestamp_array(size: usize) -> TimestampMillisecondArray {
     let time = Utc::now();
     TimestampMillisecondArray::from_value(time.timestamp_millis(), size)