Skip to content

Commit 4d28d23

Browse files
authored
Remove in-memory ingestion and query (#413)
Memory-based ingestion has been removed because of instability in its performance and memory usage.
1 parent 11c512d commit 4d28d23

File tree

6 files changed

+18
-460
lines changed

6 files changed

+18
-460
lines changed

server/src/event/writer.rs

Lines changed: 10 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -18,63 +18,21 @@
1818
*/
1919

2020
mod file_writer;
21-
mod mem_writer;
2221

2322
use std::{
2423
collections::HashMap,
2524
sync::{Mutex, RwLock},
2625
};
2726

28-
use crate::{
29-
option::CONFIG,
30-
storage::staging::{self, ReadBuf},
31-
};
32-
3327
use self::{errors::StreamWriterError, file_writer::FileWriter};
3428
use arrow_array::RecordBatch;
35-
use chrono::{NaiveDateTime, Utc};
3629
use derive_more::{Deref, DerefMut};
37-
use mem_writer::MemWriter;
3830
use once_cell::sync::Lazy;
3931

40-
type InMemWriter = MemWriter<8192>;
41-
4232
pub static STREAM_WRITERS: Lazy<WriterTable> = Lazy::new(WriterTable::default);
4333

44-
pub enum StreamWriter {
45-
Mem(InMemWriter),
46-
Disk(FileWriter, InMemWriter),
47-
}
48-
49-
impl StreamWriter {
50-
pub fn push(
51-
&mut self,
52-
stream_name: &str,
53-
schema_key: &str,
54-
rb: RecordBatch,
55-
) -> Result<(), StreamWriterError> {
56-
match self {
57-
StreamWriter::Mem(mem) => {
58-
mem.push(schema_key, rb);
59-
}
60-
StreamWriter::Disk(disk, mem) => {
61-
disk.push(stream_name, schema_key, &rb)?;
62-
mem.push(schema_key, rb);
63-
}
64-
}
65-
Ok(())
66-
}
67-
}
68-
69-
// Each entry in writer table is initialized with some context
70-
// This is helpful for generating prefix when writer is finalized
71-
pub struct WriterContext {
72-
stream_name: String,
73-
time: NaiveDateTime,
74-
}
75-
7634
#[derive(Deref, DerefMut, Default)]
77-
pub struct WriterTable(RwLock<HashMap<String, (Mutex<StreamWriter>, WriterContext)>>);
35+
pub struct WriterTable(RwLock<HashMap<String, Mutex<FileWriter>>>);
7836

7937
impl WriterTable {
8038
// append to a existing stream
@@ -87,36 +45,26 @@ impl WriterTable {
8745
let hashmap_guard = self.read().unwrap();
8846

8947
match hashmap_guard.get(stream_name) {
90-
Some((stream_writer, _)) => {
48+
Some(stream_writer) => {
9149
stream_writer
9250
.lock()
9351
.unwrap()
94-
.push(stream_name, schema_key, record)?;
52+
.push(stream_name, schema_key, &record)?;
9553
}
9654
None => {
9755
drop(hashmap_guard);
9856
let mut map = self.write().unwrap();
9957
// check for race condition
10058
// if map contains entry then just
101-
if let Some((writer, _)) = map.get(stream_name) {
59+
if let Some(writer) = map.get(stream_name) {
10260
writer
10361
.lock()
10462
.unwrap()
105-
.push(stream_name, schema_key, record)?;
63+
.push(stream_name, schema_key, &record)?;
10664
} else {
107-
// there is no entry so this can be inserted safely
108-
let context = WriterContext {
109-
stream_name: stream_name.to_owned(),
110-
time: Utc::now().naive_utc(),
111-
};
112-
let mut writer = if CONFIG.parseable.in_mem_ingestion {
113-
StreamWriter::Mem(InMemWriter::default())
114-
} else {
115-
StreamWriter::Disk(FileWriter::default(), InMemWriter::default())
116-
};
117-
118-
writer.push(stream_name, schema_key, record)?;
119-
map.insert(stream_name.to_owned(), (Mutex::new(writer), context));
65+
let mut writer = FileWriter::default();
66+
writer.push(stream_name, schema_key, &record)?;
67+
map.insert(stream_name.to_owned(), Mutex::new(writer));
12068
}
12169
}
12270
};
@@ -131,40 +79,11 @@ impl WriterTable {
13179
let mut table = self.write().unwrap();
13280
let map = std::mem::take(&mut *table);
13381
drop(table);
134-
for (writer, context) in map.into_values() {
82+
for writer in map.into_values() {
13583
let writer = writer.into_inner().unwrap();
136-
match writer {
137-
StreamWriter::Mem(mem) => {
138-
let rb = mem.finalize();
139-
let mut read_bufs = staging::MEMORY_READ_BUFFERS.write().unwrap();
140-
141-
read_bufs
142-
.entry(context.stream_name)
143-
.or_insert(Vec::default())
144-
.push(ReadBuf {
145-
time: context.time,
146-
buf: rb,
147-
});
148-
}
149-
StreamWriter::Disk(disk, _) => disk.close_all(),
150-
}
84+
writer.close_all();
15185
}
15286
}
153-
154-
pub fn clone_read_buf(&self, stream_name: &str) -> Option<ReadBuf> {
155-
let hashmap_guard = self.read().unwrap();
156-
let (writer, context) = hashmap_guard.get(stream_name)?;
157-
let writer = writer.lock().unwrap();
158-
let mem = match &*writer {
159-
StreamWriter::Mem(mem) => mem,
160-
StreamWriter::Disk(_, mem) => mem,
161-
};
162-
163-
Some(ReadBuf {
164-
time: context.time,
165-
buf: mem.recordbatch_cloned(),
166-
})
167-
}
16887
}
16988

17089
pub mod errors {

server/src/event/writer/mem_writer.rs

Lines changed: 0 additions & 118 deletions
This file was deleted.

server/src/query.rs

Lines changed: 3 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -16,29 +16,23 @@
1616
*
1717
*/
1818

19-
pub mod table_provider;
20-
2119
use chrono::TimeZone;
2220
use chrono::{DateTime, Utc};
2321
use datafusion::arrow::datatypes::Schema;
2422
use datafusion::arrow::record_batch::RecordBatch;
25-
use datafusion::datasource::TableProvider;
2623
use datafusion::prelude::*;
2724
use itertools::Itertools;
2825
use serde_json::Value;
2926
use std::path::Path;
3027
use std::sync::Arc;
3128

32-
use crate::event::STREAM_WRITERS;
3329
use crate::option::CONFIG;
34-
use crate::storage::staging::{ReadBuf, MEMORY_READ_BUFFERS};
3530
use crate::storage::ObjectStorageError;
3631
use crate::storage::{ObjectStorage, OBJECT_STORE_DATA_GRANULARITY};
3732
use crate::utils::TimePeriod;
3833
use crate::validator;
3934

4035
use self::error::{ExecuteError, ParseError};
41-
use self::table_provider::QueryTableProvider;
4236

4337
type Key = &'static str;
4438
fn get_value(value: &Value, key: Key) -> Result<&str, Key> {
@@ -90,18 +84,10 @@ impl Query {
9084
);
9185

9286
let prefixes = self.get_prefixes();
93-
let table = QueryTableProvider::new(
94-
prefixes,
95-
storage,
96-
get_all_read_buf(&self.stream_name, self.start, self.end),
97-
Arc::clone(&self.schema),
98-
);
87+
let Some(table) = storage.query_table(prefixes, Arc::clone(&self.schema))? else { return Ok((Vec::new(), Vec::new())) };
9988

100-
ctx.register_table(
101-
&*self.stream_name,
102-
Arc::new(table) as Arc<dyn TableProvider>,
103-
)
104-
.map_err(ObjectStorageError::DataFusionError)?;
89+
ctx.register_table(&*self.stream_name, Arc::new(table))
90+
.map_err(ObjectStorageError::DataFusionError)?;
10591
// execute the query and collect results
10692
let df = ctx.sql(self.query.as_str()).await?;
10793
// dataframe qualifies name by adding table name before columns. \
@@ -181,30 +167,6 @@ pub mod error {
181167
}
182168
}
183169

184-
fn get_all_read_buf(stream_name: &str, start: DateTime<Utc>, end: DateTime<Utc>) -> Vec<ReadBuf> {
185-
let now = Utc::now();
186-
let include_mutable = start <= now && now <= end;
187-
// copy from mutable buffer
188-
let mut queryable_read_buffer = Vec::new();
189-
190-
if let Some(mem) = MEMORY_READ_BUFFERS.read().unwrap().get(stream_name) {
191-
for read_buffer in mem {
192-
let time = read_buffer.time;
193-
if start.naive_utc() <= time && time <= end.naive_utc() {
194-
queryable_read_buffer.push(read_buffer.clone())
195-
}
196-
}
197-
}
198-
199-
if include_mutable {
200-
if let Some(x) = STREAM_WRITERS.clone_read_buf(stream_name) {
201-
queryable_read_buffer.push(x);
202-
}
203-
}
204-
205-
queryable_read_buffer
206-
}
207-
208170
#[cfg(test)]
209171
mod tests {
210172
use super::time_from_path;

0 commit comments

Comments
 (0)