Skip to content

feat: merge finish .arrows and convert to .parquet #1200

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Feb 25, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,15 @@ use reqwest::{Client, ClientBuilder};
// It is very unlikely that panic will occur when dealing with locks.
pub const LOCK_EXPECT: &str = "Thread shouldn't panic while holding a lock";

pub const STORAGE_CONVERSION_INTERVAL: u64 = 60;
pub const STORAGE_UPLOAD_INTERVAL: u64 = 30;
/// Describes the duration at the end of which in-memory buffers are flushed,
/// arrows files are "finished" and compacted into parquet files.
pub const LOCAL_SYNC_INTERVAL: Duration = Duration::from_secs(60);

/// Granularity (in minutes) used for object store prefix generation; derived from `LOCAL_SYNC_INTERVAL`.
pub const OBJECT_STORE_DATA_GRANULARITY: u32 = LOCAL_SYNC_INTERVAL.as_secs() as u32 / 60;

/// Describes the duration at the end of which parquets are pushed into objectstore.
pub const STORAGE_UPLOAD_INTERVAL: Duration = Duration::from_secs(30);

// A single HTTP client for all outgoing HTTP requests from the parseable server
static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| {
Expand Down
6 changes: 2 additions & 4 deletions src/parseable/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,9 @@ use crate::{
metadata::{LogStreamMetadata, SchemaVersion},
metrics,
option::Mode,
storage::{
object_storage::to_bytes, retention::Retention, StreamType, OBJECT_STORE_DATA_GRANULARITY,
},
storage::{object_storage::to_bytes, retention::Retention, StreamType},
utils::minute_to_slot,
LOCK_EXPECT,
LOCK_EXPECT, OBJECT_STORE_DATA_GRANULARITY,
};

use super::{
Expand Down
5 changes: 2 additions & 3 deletions src/query/listing_table_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ use itertools::Itertools;
use object_store::{path::Path, ObjectMeta, ObjectStore};

use crate::{
event::DEFAULT_TIMESTAMP_KEY,
storage::{ObjectStorage, OBJECT_STORE_DATA_GRANULARITY},
utils::time::TimeRange,
event::DEFAULT_TIMESTAMP_KEY, storage::ObjectStorage, utils::time::TimeRange,
OBJECT_STORE_DATA_GRANULARITY,
};

use super::PartialTimeFilter;
Expand Down
8 changes: 0 additions & 8 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,6 @@ pub const SCHEMA_FILE_NAME: &str = ".schema";
pub const ALERTS_ROOT_DIRECTORY: &str = ".alerts";
pub const MANIFEST_FILE: &str = "manifest.json";

/// local sync interval to move data.records to /tmp dir of that stream.
/// 60 sec is a reasonable value.
pub const LOCAL_SYNC_INTERVAL: u64 = 60;

/// duration used to configure prefix in objectstore and local disk structure
/// used for storage. Defaults to 1 min.
pub const OBJECT_STORE_DATA_GRANULARITY: u32 = (LOCAL_SYNC_INTERVAL as u32) / 60;

// max concurrent request allowed for datafusion object store
const MAX_OBJECT_STORE_REQUESTS: usize = 1000;

Expand Down
86 changes: 12 additions & 74 deletions src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ use tracing::{error, info, trace, warn};

use crate::alerts::{alerts_utils, AlertConfig, AlertError};
use crate::parseable::PARSEABLE;
use crate::storage::LOCAL_SYNC_INTERVAL;
use crate::{STORAGE_CONVERSION_INTERVAL, STORAGE_UPLOAD_INTERVAL};
use crate::{LOCAL_SYNC_INTERVAL, STORAGE_UPLOAD_INTERVAL};

// Calculates the instant that is the start of the next minute
fn next_minute() -> Instant {
Expand Down Expand Up @@ -76,27 +75,21 @@ where
/// `LOCAL_SYNC_INTERVAL` seconds and uploads them every `STORAGE_UPLOAD_INTERVAL` seconds.
#[tokio::main(flavor = "current_thread")]
pub async fn handler(mut cancel_rx: oneshot::Receiver<()>) -> anyhow::Result<()> {
let (localsync_handler, mut localsync_outbox, localsync_inbox) = run_local_sync();
let (localsync_handler, mut localsync_outbox, localsync_inbox) = local_sync();
let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
object_store_sync();
let (mut remote_conversion_handler, mut remote_conversion_outbox, mut remote_conversion_inbox) =
arrow_conversion();
loop {
select! {
_ = &mut cancel_rx => {
// actix server finished .. stop other threads and stop the server
remote_sync_inbox.send(()).unwrap_or(());
localsync_inbox.send(()).unwrap_or(());
remote_conversion_inbox.send(()).unwrap_or(());
if let Err(e) = localsync_handler.await {
error!("Error joining remote_sync_handler: {:?}", e);
}
if let Err(e) = remote_sync_handler.await {
error!("Error joining remote_sync_handler: {:?}", e);
}
if let Err(e) = remote_conversion_handler.await {
error!("Error joining remote_conversion_handler: {:?}", e);
}
return Ok(());
},
_ = &mut localsync_outbox => {
Expand All @@ -111,13 +104,6 @@ pub async fn handler(mut cancel_rx: oneshot::Receiver<()>) -> anyhow::Result<()>
}
(remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = object_store_sync();
},
_ = &mut remote_conversion_outbox => {
// remote_conversion failed, this is recoverable by just starting remote_conversion thread again
if let Err(e) = remote_conversion_handler.await {
error!("Error joining remote_conversion_handler: {:?}", e);
}
(remote_conversion_handler, remote_conversion_outbox, remote_conversion_inbox) = arrow_conversion();
},
}
}
}
Expand All @@ -132,8 +118,7 @@ pub fn object_store_sync() -> (

let handle = task::spawn(async move {
let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move {
let mut sync_interval =
interval_at(next_minute(), Duration::from_secs(STORAGE_UPLOAD_INTERVAL));
let mut sync_interval = interval_at(next_minute(), STORAGE_UPLOAD_INTERVAL);

let mut inbox_rx = AssertUnwindSafe(inbox_rx);

Expand Down Expand Up @@ -183,7 +168,8 @@ pub fn object_store_sync() -> (
(handle, outbox_rx, inbox_tx)
}

pub fn arrow_conversion() -> (
/// Flush arrows onto disk and convert them into parquet files
pub fn local_sync() -> (
task::JoinHandle<()>,
oneshot::Receiver<()>,
oneshot::Sender<()>,
Expand All @@ -192,17 +178,18 @@ pub fn arrow_conversion() -> (
let (inbox_tx, inbox_rx) = oneshot::channel::<()>();

let handle = task::spawn(async move {
let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move {
let mut sync_interval = interval_at(
next_minute() + Duration::from_secs(5), // 5 second delay
Duration::from_secs(STORAGE_CONVERSION_INTERVAL),
);
info!("Local sync task started");
let mut inbox_rx = inbox_rx;

let mut inbox_rx = AssertUnwindSafe(inbox_rx);
let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move {
let mut sync_interval = interval_at(next_minute(), LOCAL_SYNC_INTERVAL);

loop {
select! {
_ = sync_interval.tick() => {
trace!("Flushing Arrows to disk...");
PARSEABLE.flush_all_streams();

trace!("Converting Arrow to Parquet... ");
if let Err(e) = monitor_task_duration(
"arrow_conversion",
Expand All @@ -224,55 +211,6 @@ pub fn arrow_conversion() -> (
}
}));

match result {
Ok(future) => {
future.await;
}
Err(panic_error) => {
error!("Panic in object store sync task: {panic_error:?}");
let _ = outbox_tx.send(());
}
}

info!("Object store sync task ended");
});

(handle, outbox_rx, inbox_tx)
}

pub fn run_local_sync() -> (
task::JoinHandle<()>,
oneshot::Receiver<()>,
oneshot::Sender<()>,
) {
let (outbox_tx, outbox_rx) = oneshot::channel::<()>();
let (inbox_tx, inbox_rx) = oneshot::channel::<()>();

let handle = task::spawn(async move {
info!("Local sync task started");
let mut inbox_rx = inbox_rx;

let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move {
let mut sync_interval =
interval_at(next_minute(), Duration::from_secs(LOCAL_SYNC_INTERVAL));

loop {
select! {
_ = sync_interval.tick() => {
trace!("Flushing Arrows to disk...");
PARSEABLE.flush_all_streams();
},
res = &mut inbox_rx => {match res{
Ok(_) => break,
Err(_) => {
warn!("Inbox channel closed unexpectedly");
break;
}}
}
}
}
}));

match result {
Ok(future) => {
future.await;
Expand Down
Loading