
Commit 418db72

feat: add reverse arrow reader (#548)
This PR adds a new arrow reader which can read an arrow file in reverse. It is used while writing to a parquet file, so that the resulting file is sorted by timestamp in descending order. Descending timestamp is the natural order for log data, where users are interested in the latest events first. This helps client tools (console / pb / others) in that they neither have to sort on the client side nor issue queries with `order by` clauses. This PR also adds an extra time interval (5s) to object sync to ensure there is no race with the local_sync function.
1 parent 442ca48 commit 418db72

File tree

9 files changed: +371 additions, -22 deletions

Cargo.lock

Lines changed: 1 addition & 7 deletions
Some generated files are not rendered by default.

server/Cargo.toml

Lines changed: 4 additions & 4 deletions
@@ -38,7 +38,9 @@ argon2 = "0.5.0"
 async-trait = "0.1"
 base64 = "0.21"
 bytes = "1.4"
+byteorder = "1.4.3"
 bzip2 = { version = "*", features = ["static"] }
+cookie = "0.17.0"
 chrono = "0.4"
 chrono-humanize = "0.2"
 clap = { version = "4.1", default-features = false, features = [
@@ -97,8 +99,6 @@ humantime = "2.1.0"
 openid = { version = "0.12.0", default-features = false, features = ["rustls"] }
 url = "2.4.0"
 http-auth-basic = "0.3.3"
-cookies = "0.0.1"
-cookie = "0.17.0"
 
 [build-dependencies]
 cargo_toml = "0.15"
@@ -114,8 +114,8 @@ maplit = "1.0"
 rstest = "0.16"
 
 [package.metadata.parseable_ui]
-assets-url = "https://github.com/parseablehq/console/releases/download/v0.3.1/build.zip"
-assets-sha1 = "6abd7b5ca5b9c832ff58b8450cffdc83dd7172bf"
+assets-url = "https://github.com/parseablehq/console/releases/download/v0.3.3/build.zip"
+assets-sha1 = "29e1eaa2dfa081d495c1504f05b78914d074990c"
 
 [features]
 debug = []
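
Among the dependency changes, byteorder is new. It is plausibly what the reverse reader uses to parse Arrow IPC framing: each encapsulated message begins with a 0xFFFFFFFF continuation marker followed by a little-endian i32 metadata length, and walking those prefixes is what lets a reader index messages for back-to-front replay. A hedged sketch of reading one such prefix (illustrative only, not this PR's actual code):

use std::io::{self, Read};

use byteorder::{LittleEndian, ReadBytesExt};

/// Read one Arrow IPC message prefix: continuation marker, then metadata length.
fn read_message_len<R: Read>(reader: &mut R) -> io::Result<i32> {
    let marker = reader.read_u32::<LittleEndian>()?;
    debug_assert_eq!(marker, 0xFFFF_FFFF, "expected IPC continuation marker");
    reader.read_i32::<LittleEndian>()
}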

server/src/main.rs

Lines changed: 3 additions & 1 deletion
@@ -16,7 +16,7 @@
  *
  */
 
-use clokwerk::{AsyncScheduler, Scheduler, TimeUnits};
+use clokwerk::{AsyncScheduler, Job, Scheduler, TimeUnits};
 use thread_priority::{ThreadBuilder, ThreadPriority};
 use tokio::sync::oneshot;
 use tokio::sync::oneshot::error::TryRecvError;
@@ -122,6 +122,8 @@ fn object_store_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sende
     let mut scheduler = AsyncScheduler::new();
     scheduler
         .every((CONFIG.parseable.upload_interval as u32).seconds())
+        // Extra time interval is added so that this scheduler does not race with local sync.
+        .plus(5u32.seconds())
         .run(|| async {
             if let Err(e) = CONFIG.storage().get_object_store().sync().await {
                 log::warn!("failed to sync local data with object store. {:?}", e);
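
The new Job import is what brings .plus() into scope. A minimal sketch of the resulting schedule shape, with an assumed interval standing in for the value Parseable reads from CONFIG:

use clokwerk::{AsyncScheduler, Job, TimeUnits};

fn build_scheduler() -> AsyncScheduler {
    let mut scheduler = AsyncScheduler::new();
    scheduler
        .every(60u32.seconds()) // assumed upload interval; the server reads this from CONFIG
        .plus(5u32.seconds()) // offset so this job never fires on the same tick as local_sync
        .run(|| async { /* push staged files to the object store */ });
    scheduler
}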

server/src/query.rs

Lines changed: 1 addition & 1 deletion
@@ -171,7 +171,7 @@ impl Query {
             return Ok(None);
         }
         let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
-        let file_sort_order = vec![vec![col(DEFAULT_TIMESTAMP_KEY).sort(true, false)]];
+        let file_sort_order = vec![vec![col(DEFAULT_TIMESTAMP_KEY).sort(false, true)]];
         let listing_options = ListingOptions::new(Arc::new(file_format))
             .with_file_extension(".parquet")
             .with_file_sort_order(file_sort_order)
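
DataFusion's Expr::sort(asc, nulls_first) takes two booleans, so the flip from sort(true, false) to sort(false, true) declares the parquet files as timestamp-descending with nulls first, matching the writer properties change in staging.rs below. A standalone sketch of the same declaration, mirroring the pinned DataFusion API as used above ("p_timestamp" is an assumed stand-in for DEFAULT_TIMESTAMP_KEY):

use std::sync::Arc;

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;
use datafusion::prelude::col;

fn listing_options() -> ListingOptions {
    let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
    // sort(asc, nulls_first): (false, true) = timestamp descending, nulls first
    let file_sort_order = vec![vec![col("p_timestamp").sort(false, true)]];
    ListingOptions::new(Arc::new(file_format))
        .with_file_extension(".parquet")
        .with_file_sort_order(file_sort_order)
}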

server/src/query/table_provider.rs

Lines changed: 13 additions & 0 deletions
@@ -34,6 +34,8 @@ use std::any::Any;
 use std::sync::Arc;
 use std::vec;
 
+use crate::utils::arrow::reverse_reader::reverse;
+
 pub struct QueryTableProvider {
     staging: Option<MemTable>,
     // remote table
@@ -47,6 +49,17 @@ impl QueryTableProvider {
         storage: Option<Arc<ListingTable>>,
         schema: Arc<Schema>,
     ) -> Result<Self, DataFusionError> {
+        // in place reverse transform
+        let staging = if let Some(mut staged_batches) = staging {
+            staged_batches[..].reverse();
+            staged_batches
+                .iter_mut()
+                .for_each(|batch| *batch = reverse(batch));
+            Some(staged_batches)
+        } else {
+            None
+        };
+
         let memtable = staging
             .map(|records| MemTable::try_new(schema.clone(), vec![records]))
             .transpose()?;
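
Note the constructor reverses at two levels: the batch list itself (staged_batches[..].reverse()) and the rows inside each batch, so staged data comes out newest-first end to end. The reverse helper comes from the new reverse_reader module, whose body is not part of the rendered diff; a plausible sketch of what a row reversal has to do, using arrow's take kernel (the crate paths and implementation here are assumptions, not the module's actual code):

use arrow_array::{RecordBatch, UInt32Array};
use arrow_select::take::take;

fn reverse(batch: &RecordBatch) -> RecordBatch {
    // indices n-1, n-2, ..., 0 pick every row back-to-front
    let indices = UInt32Array::from_iter_values((0..batch.num_rows() as u32).rev());
    let columns: Vec<_> = batch
        .columns()
        .iter()
        .map(|col| take(col.as_ref(), &indices, None).unwrap())
        .collect();
    RecordBatch::try_new(batch.schema(), columns).unwrap()
}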

server/src/storage/staging.rs

Lines changed: 4 additions & 4 deletions
@@ -41,7 +41,7 @@ use crate::{
     metrics,
     option::CONFIG,
     storage::OBJECT_STORE_DATA_GRANULARITY,
-    utils::{self, arrow::MergedRecordReader},
+    utils::{self, arrow::merged_reader::MergedReverseRecordReader},
 };
 
 const ARROW_FILE_EXTENSION: &str = "data.arrows";
@@ -198,7 +198,7 @@ pub fn convert_disk_files_to_parquet(
             .add(file_size as i64);
     }
 
-    let record_reader = MergedRecordReader::try_new(&files).unwrap();
+    let record_reader = MergedReverseRecordReader::try_new(&files).unwrap();
 
     let parquet_file = fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?;
 
@@ -239,8 +239,8 @@ fn parquet_writer_props() -> WriterPropertiesBuilder {
     )
     .set_sorting_columns(Some(vec![SortingColumn {
         column_idx: 0,
-        descending: false,
-        nulls_first: false,
+        descending: true,
+        nulls_first: true,
     }]))
 }
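
SortingColumn is footer metadata only: it declares an ordering to readers but does not reorder anything, which is why the rows must actually be written newest-first by the reverse reader. A trimmed sketch of the builder call, with the compression and encoding settings of the surrounding function omitted (the comment on column index 0 is an assumption):

use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::format::SortingColumn;

fn parquet_writer_props() -> WriterPropertiesBuilder {
    WriterProperties::builder().set_sorting_columns(Some(vec![SortingColumn {
        column_idx: 0,    // assumed: the timestamp column is first in the schema
        descending: true, // newest events first
        nulls_first: true,
    }]))
}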

server/src/utils/arrow.rs

Lines changed: 1 addition & 0 deletions
@@ -25,6 +25,7 @@ use itertools::Itertools;
 
 pub mod batch_adapter;
 pub mod merged_reader;
+pub mod reverse_reader;
 
 pub use batch_adapter::adapt_batch;
 pub use merged_reader::MergedRecordReader;

server/src/utils/arrow/merged_reader.rs

Lines changed: 36 additions & 5 deletions
@@ -24,8 +24,11 @@ use arrow_ipc::reader::StreamReader;
 use arrow_schema::Schema;
 use itertools::kmerge_by;
 
-use super::adapt_batch;
-use crate::event::DEFAULT_TIMESTAMP_KEY;
+use super::{
+    adapt_batch,
+    reverse_reader::{reverse, OffsetReader},
+};
+use crate::{event::DEFAULT_TIMESTAMP_KEY, utils};
 
 #[derive(Debug)]
 pub struct MergedRecordReader {
@@ -44,14 +47,42 @@ impl MergedRecordReader {
         Ok(Self { readers })
     }
 
-    pub fn merged_iter(self, schema: Arc<Schema>) -> impl Iterator<Item = RecordBatch> {
-        let adapted_readers = self.readers.into_iter().map(move |reader| reader.flatten());
+    pub fn merged_schema(&self) -> Schema {
+        Schema::try_merge(
+            self.readers
+                .iter()
+                .map(|reader| reader.schema().as_ref().clone()),
+        )
+        .unwrap()
+    }
+}
 
+#[derive(Debug)]
+pub struct MergedReverseRecordReader {
+    pub readers: Vec<StreamReader<BufReader<OffsetReader<File>>>>,
+}
+
+impl MergedReverseRecordReader {
+    pub fn try_new(files: &[PathBuf]) -> Result<Self, ()> {
+        let mut readers = Vec::with_capacity(files.len());
+        for file in files {
+            let reader =
+                utils::arrow::reverse_reader::get_reverse_reader(File::open(file).unwrap())
+                    .map_err(|_| ())?;
+            readers.push(reader);
+        }
+
+        Ok(Self { readers })
+    }
+
+    pub fn merged_iter(self, schema: Arc<Schema>) -> impl Iterator<Item = RecordBatch> {
+        let adapted_readers = self.readers.into_iter().map(|reader| reader.flatten());
         kmerge_by(adapted_readers, |a: &RecordBatch, b: &RecordBatch| {
             let a_time = get_timestamp_millis(a);
             let b_time = get_timestamp_millis(b);
-            a_time < b_time
+            a_time > b_time
         })
+        .map(|batch| reverse(&batch))
        .map(move |batch| adapt_batch(&schema, &batch))
    }
}
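
The comparator flip is the heart of the change: kmerge_by treats its closure as "take from a first", so a_time > b_time performs a k-way merge of per-file batch streams that the reverse reader already yields in descending order, and the trailing reverse(&batch) then flips the rows inside each yielded batch. A toy illustration of the merge direction on plain integers:

use itertools::kmerge_by;

fn main() {
    // two already-descending sequences, standing in for per-file batch streams
    let a = vec![9, 5, 1];
    let b = vec![8, 6, 2];
    // "take first if greater" yields a single descending merge
    let merged: Vec<i32> = kmerge_by([a, b], |x: &i32, y: &i32| x > y).collect();
    assert_eq!(merged, vec![9, 8, 6, 5, 2, 1]);
}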
