Skip to content

Commit 6f322fc

Browse files
author
Devdutt Shenoi
committed
feat: merge finish .arrows and convert to .parquet
1 parent 5e3577f commit 6f322fc

File tree

5 files changed

+25
-91
lines changed

5 files changed

+25
-91
lines changed

src/lib.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,15 @@ use reqwest::{Client, ClientBuilder};
5959
// It is very unlikely that a panic will occur when dealing with locks.
6060
pub const LOCK_EXPECT: &str = "Thread shouldn't panic while holding a lock";
6161

62-
pub const STORAGE_CONVERSION_INTERVAL: u64 = 60;
63-
pub const STORAGE_UPLOAD_INTERVAL: u64 = 30;
62+
/// Describes the duration at the end of which in-memory buffers are flushed,
63+
/// arrows files are "finished" and compacted into parquet files.
64+
pub const LOCAL_SYNC_INTERVAL: Duration = Duration::from_secs(60);
65+
66+
/// Granularity, in whole minutes, used to configure prefix generation (derived from `LOCAL_SYNC_INTERVAL`).
67+
pub const OBJECT_STORE_DATA_GRANULARITY: u32 = LOCAL_SYNC_INTERVAL.as_secs() as u32 / 60;
68+
69+
/// Describes the duration at the end of which parquets are pushed into objectstore.
70+
pub const STORAGE_UPLOAD_INTERVAL: Duration = Duration::from_secs(60);
6471

6572
// A single HTTP client for all outgoing HTTP requests from the parseable server
6673
static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| {

src/parseable/streams.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,9 @@ use crate::{
4949
metadata::{LogStreamMetadata, SchemaVersion},
5050
metrics,
5151
option::Mode,
52-
storage::{
53-
object_storage::to_bytes, retention::Retention, StreamType, OBJECT_STORE_DATA_GRANULARITY,
54-
},
52+
storage::{object_storage::to_bytes, retention::Retention, StreamType},
5553
utils::minute_to_slot,
56-
LOCK_EXPECT,
54+
LOCK_EXPECT, OBJECT_STORE_DATA_GRANULARITY,
5755
};
5856

5957
use super::{

src/query/listing_table_builder.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,8 @@ use itertools::Itertools;
3232
use object_store::{path::Path, ObjectMeta, ObjectStore};
3333

3434
use crate::{
35-
event::DEFAULT_TIMESTAMP_KEY,
36-
storage::{ObjectStorage, OBJECT_STORE_DATA_GRANULARITY},
37-
utils::time::TimeRange,
35+
event::DEFAULT_TIMESTAMP_KEY, storage::ObjectStorage, utils::time::TimeRange,
36+
OBJECT_STORE_DATA_GRANULARITY,
3837
};
3938

4039
use super::PartialTimeFilter;

src/storage/mod.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,6 @@ pub const SCHEMA_FILE_NAME: &str = ".schema";
5757
pub const ALERTS_ROOT_DIRECTORY: &str = ".alerts";
5858
pub const MANIFEST_FILE: &str = "manifest.json";
5959

60-
/// local sync interval to move data.records to /tmp dir of that stream.
61-
/// 60 sec is a reasonable value.
62-
pub const LOCAL_SYNC_INTERVAL: u64 = 60;
63-
64-
/// duration used to configure prefix in objectstore and local disk structure
65-
/// used for storage. Defaults to 1 min.
66-
pub const OBJECT_STORE_DATA_GRANULARITY: u32 = (LOCAL_SYNC_INTERVAL as u32) / 60;
67-
6860
// max concurrent request allowed for datafusion object store
6961
const MAX_OBJECT_STORE_REQUESTS: usize = 1000;
7062

src/sync.rs

Lines changed: 12 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@ use tracing::{error, info, trace, warn};
2626

2727
use crate::alerts::{alerts_utils, AlertConfig, AlertError};
2828
use crate::parseable::PARSEABLE;
29-
use crate::storage::LOCAL_SYNC_INTERVAL;
30-
use crate::{STORAGE_CONVERSION_INTERVAL, STORAGE_UPLOAD_INTERVAL};
29+
use crate::{LOCAL_SYNC_INTERVAL, STORAGE_UPLOAD_INTERVAL};
3130

3231
// Calculates the instant that is the start of the next minute
3332
fn next_minute() -> Instant {
@@ -76,27 +75,21 @@ where
7675
/// `LOCAL_SYNC_INTERVAL` seconds and uploads them every `STORAGE_UPLOAD_INTERVAL` seconds.
7776
#[tokio::main(flavor = "current_thread")]
7877
pub async fn handler(mut cancel_rx: oneshot::Receiver<()>) -> anyhow::Result<()> {
79-
let (localsync_handler, mut localsync_outbox, localsync_inbox) = run_local_sync();
78+
let (localsync_handler, mut localsync_outbox, localsync_inbox) = local_sync();
8079
let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
8180
object_store_sync();
82-
let (mut remote_conversion_handler, mut remote_conversion_outbox, mut remote_conversion_inbox) =
83-
arrow_conversion();
8481
loop {
8582
select! {
8683
_ = &mut cancel_rx => {
8784
// actix server finished .. stop other threads and stop the server
8885
remote_sync_inbox.send(()).unwrap_or(());
8986
localsync_inbox.send(()).unwrap_or(());
90-
remote_conversion_inbox.send(()).unwrap_or(());
9187
if let Err(e) = localsync_handler.await {
9288
error!("Error joining remote_sync_handler: {:?}", e);
9389
}
9490
if let Err(e) = remote_sync_handler.await {
9591
error!("Error joining remote_sync_handler: {:?}", e);
9692
}
97-
if let Err(e) = remote_conversion_handler.await {
98-
error!("Error joining remote_conversion_handler: {:?}", e);
99-
}
10093
return Ok(());
10194
},
10295
_ = &mut localsync_outbox => {
@@ -111,13 +104,6 @@ pub async fn handler(mut cancel_rx: oneshot::Receiver<()>) -> anyhow::Result<()>
111104
}
112105
(remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = object_store_sync();
113106
},
114-
_ = &mut remote_conversion_outbox => {
115-
// remote_conversion failed, this is recoverable by just starting remote_conversion thread again
116-
if let Err(e) = remote_conversion_handler.await {
117-
error!("Error joining remote_conversion_handler: {:?}", e);
118-
}
119-
(remote_conversion_handler, remote_conversion_outbox, remote_conversion_inbox) = arrow_conversion();
120-
},
121107
}
122108
}
123109
}
@@ -132,8 +118,7 @@ pub fn object_store_sync() -> (
132118

133119
let handle = task::spawn(async move {
134120
let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move {
135-
let mut sync_interval =
136-
interval_at(next_minute(), Duration::from_secs(STORAGE_UPLOAD_INTERVAL));
121+
let mut sync_interval = interval_at(next_minute(), STORAGE_UPLOAD_INTERVAL);
137122

138123
let mut inbox_rx = AssertUnwindSafe(inbox_rx);
139124

@@ -183,7 +168,8 @@ pub fn object_store_sync() -> (
183168
(handle, outbox_rx, inbox_tx)
184169
}
185170

186-
pub fn arrow_conversion() -> (
171+
/// Flush arrows onto disk and convert them into parquet files
172+
pub fn local_sync() -> (
187173
task::JoinHandle<()>,
188174
oneshot::Receiver<()>,
189175
oneshot::Sender<()>,
@@ -192,17 +178,18 @@ pub fn arrow_conversion() -> (
192178
let (inbox_tx, inbox_rx) = oneshot::channel::<()>();
193179

194180
let handle = task::spawn(async move {
195-
let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move {
196-
let mut sync_interval = interval_at(
197-
next_minute() + Duration::from_secs(5), // 5 second delay
198-
Duration::from_secs(STORAGE_CONVERSION_INTERVAL),
199-
);
181+
info!("Local sync task started");
182+
let mut inbox_rx = inbox_rx;
200183

201-
let mut inbox_rx = AssertUnwindSafe(inbox_rx);
184+
let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move {
185+
let mut sync_interval = interval_at(next_minute(), LOCAL_SYNC_INTERVAL);
202186

203187
loop {
204188
select! {
205189
_ = sync_interval.tick() => {
190+
trace!("Flushing Arrows to disk...");
191+
PARSEABLE.flush_all_streams();
192+
206193
trace!("Converting Arrow to Parquet... ");
207194
if let Err(e) = monitor_task_duration(
208195
"arrow_conversion",
@@ -224,55 +211,6 @@ pub fn arrow_conversion() -> (
224211
}
225212
}));
226213

227-
match result {
228-
Ok(future) => {
229-
future.await;
230-
}
231-
Err(panic_error) => {
232-
error!("Panic in object store sync task: {panic_error:?}");
233-
let _ = outbox_tx.send(());
234-
}
235-
}
236-
237-
info!("Object store sync task ended");
238-
});
239-
240-
(handle, outbox_rx, inbox_tx)
241-
}
242-
243-
pub fn run_local_sync() -> (
244-
task::JoinHandle<()>,
245-
oneshot::Receiver<()>,
246-
oneshot::Sender<()>,
247-
) {
248-
let (outbox_tx, outbox_rx) = oneshot::channel::<()>();
249-
let (inbox_tx, inbox_rx) = oneshot::channel::<()>();
250-
251-
let handle = task::spawn(async move {
252-
info!("Local sync task started");
253-
let mut inbox_rx = inbox_rx;
254-
255-
let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move {
256-
let mut sync_interval =
257-
interval_at(next_minute(), Duration::from_secs(LOCAL_SYNC_INTERVAL));
258-
259-
loop {
260-
select! {
261-
_ = sync_interval.tick() => {
262-
trace!("Flushing Arrows to disk...");
263-
PARSEABLE.flush_all_streams();
264-
},
265-
res = &mut inbox_rx => {match res{
266-
Ok(_) => break,
267-
Err(_) => {
268-
warn!("Inbox channel closed unexpectedly");
269-
break;
270-
}}
271-
}
272-
}
273-
}
274-
}));
275-
276214
match result {
277215
Ok(future) => {
278216
future.await;

0 commit comments

Comments
 (0)