diff --git a/src/handlers/http/health_check.rs b/src/handlers/http/health_check.rs
index bd79d1019..7bb1fed97 100644
--- a/src/handlers/http/health_check.rs
+++ b/src/handlers/http/health_check.rs
@@ -27,7 +27,8 @@ use actix_web::{
     HttpResponse,
 };
 use http::StatusCode;
-use tokio::sync::Mutex;
+use tokio::{sync::Mutex, task::JoinSet};
+use tracing::{error, info, warn};
 
 use crate::parseable::PARSEABLE;
 
@@ -60,8 +61,29 @@ pub async fn shutdown() {
     let mut shutdown_flag = SIGNAL_RECEIVED.lock().await;
     *shutdown_flag = true;
 
+    let mut joinset = JoinSet::new();
+
     // Sync staging
-    PARSEABLE.flush_all_streams();
+    PARSEABLE.streams.flush_and_convert(&mut joinset, true);
+
+    while let Some(res) = joinset.join_next().await {
+        match res {
+            Ok(Ok(_)) => info!("Successfully converted arrow files to parquet."),
+            Ok(Err(err)) => warn!("Failed to convert arrow files to parquet. {err:?}"),
+            Err(err) => error!("Failed to join async task: {err}"),
+        }
+    }
+
+    if let Err(e) = PARSEABLE
+        .storage
+        .get_object_store()
+        .upload_files_from_staging()
+        .await
+    {
+        warn!("Failed to sync local data with object store. {:?}", e);
+    } else {
+        info!("Successfully synced all data to S3.");
+    }
 }
 
 pub async fn readiness() -> HttpResponse {
diff --git a/src/handlers/http/modal/mod.rs b/src/handlers/http/modal/mod.rs
index c0e7278b9..e9aae3e6a 100644
--- a/src/handlers/http/modal/mod.rs
+++ b/src/handlers/http/modal/mod.rs
@@ -136,26 +136,6 @@ pub trait ParseableServer {
 
         health_check::shutdown().await;
 
-        // Perform S3 sync and wait for completion
-        info!("Starting data sync to S3...");
-
-        if let Err(e) = PARSEABLE.streams.prepare_parquet(true) {
-            warn!("Failed to convert arrow files to parquet. {:?}", e);
-        } else {
-            info!("Successfully converted arrow files to parquet.");
-        }
-
-        if let Err(e) = PARSEABLE
-            .storage
-            .get_object_store()
-            .upload_files_from_staging()
-            .await
-        {
-            warn!("Failed to sync local data with object store. {:?}", e);
-        } else {
-            info!("Successfully synced all data to S3.");
-        }
-
         // Initiate graceful shutdown
         info!("Graceful shutdown of HTTP server triggered");
         srv_handle.stop(true).await;
diff --git a/src/lib.rs b/src/lib.rs
index 91fa0a405..2f8eb06ad 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -59,8 +59,15 @@ use reqwest::{Client, ClientBuilder};
 // It is very unlikely that panic will occur when dealing with locks.
 pub const LOCK_EXPECT: &str = "Thread shouldn't panic while holding a lock";
 
-pub const STORAGE_CONVERSION_INTERVAL: u64 = 60;
-pub const STORAGE_UPLOAD_INTERVAL: u64 = 30;
+/// Describes the duration at the end of which in-memory buffers are flushed,
+/// arrows files are "finished" and compacted into parquet files.
+pub const LOCAL_SYNC_INTERVAL: Duration = Duration::from_secs(60);
+
+/// Duration used to configure prefix generation.
+pub const OBJECT_STORE_DATA_GRANULARITY: u32 = LOCAL_SYNC_INTERVAL.as_secs() as u32 / 60;
+
+/// Describes the duration at the end of which parquets are pushed into objectstore.
+pub const STORAGE_UPLOAD_INTERVAL: Duration = Duration::from_secs(30);
 
 // A single HTTP client for all outgoing HTTP requests from the parseable server
 static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| {
diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs
index 8e485c589..eb4dd6761 100644
--- a/src/parseable/mod.rs
+++ b/src/parseable/mod.rs
@@ -179,16 +179,6 @@ impl Parseable {
             .unwrap_or_default())
     }
 
-    /// Writes all streams in staging onto disk, awaiting conversion into parquet.
-    /// Deletes all in memory recordbatches, freeing up rows in mem-writer.
-    pub fn flush_all_streams(&self) {
-        let streams = self.streams.read().unwrap();
-
-        for staging in streams.values() {
-            staging.flush()
-        }
-    }
-
     // validate the storage, if the proper path for staging directory is provided
     // if the proper data directory is provided, or s3 bucket is provided etc
     pub async fn validate_storage(&self) -> Result<Option<Bytes>, ObjectStorageError> {
diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs
index 7d8decdca..e0e143c34 100644
--- a/src/parseable/streams.rs
+++ b/src/parseable/streams.rs
@@ -41,6 +41,7 @@ use parquet::{
 };
 use rand::distributions::DistString;
 use relative_path::RelativePathBuf;
+use tokio::task::JoinSet;
 use tracing::{error, info, trace, warn};
 
 use crate::{
@@ -49,11 +50,9 @@ use crate::{
     metadata::{LogStreamMetadata, SchemaVersion},
     metrics,
     option::Mode,
-    storage::{
-        object_storage::to_bytes, retention::Retention, StreamType, OBJECT_STORE_DATA_GRANULARITY,
-    },
+    storage::{object_storage::to_bytes, retention::Retention, StreamType},
     utils::minute_to_slot,
-    LOCK_EXPECT,
+    LOCK_EXPECT, OBJECT_STORE_DATA_GRANULARITY,
 };
 
 use super::{
@@ -653,6 +652,13 @@ impl Stream {
     pub fn get_stream_type(&self) -> StreamType {
         self.metadata.read().expect(LOCK_EXPECT).stream_type
     }
+
+    /// First flushes arrows onto disk and then converts the arrow into parquet
+    pub fn flush_and_convert(&self, shutdown_signal: bool) -> Result<(), StagingError> {
+        self.flush();
+
+        self.prepare_parquet(shutdown_signal)
+    }
 }
 
 #[derive(Deref, DerefMut, Default)]
@@ -719,8 +725,13 @@ impl Streams {
         .collect()
     }
 
-    /// Convert arrow files into parquet, preparing it for upload
-    pub fn prepare_parquet(&self, shutdown_signal: bool) -> Result<(), StagingError> {
+    /// Asynchronously flushes arrows and compacts into parquet data on all streams in staging,
+    /// so that it is ready to be pushed onto objectstore.
+    pub fn flush_and_convert(
+        &self,
+        joinset: &mut JoinSet<Result<(), StagingError>>,
+        shutdown_signal: bool,
+    ) {
         let streams: Vec<Arc<Stream>> = self
             .read()
             .expect(LOCK_EXPECT)
@@ -728,12 +739,8 @@
             .values()
             .map(Arc::clone)
             .collect();
         for stream in streams {
-            stream
-                .prepare_parquet(shutdown_signal)
-                .inspect_err(|err| error!("Failed to run conversion task {err:?}"))?;
+            joinset.spawn(async move { stream.flush_and_convert(shutdown_signal) });
         }
-
-        Ok(())
     }
 }
diff --git a/src/query/listing_table_builder.rs b/src/query/listing_table_builder.rs
index 774bc9acf..35cf00cbc 100644
--- a/src/query/listing_table_builder.rs
+++ b/src/query/listing_table_builder.rs
@@ -32,9 +32,8 @@ use itertools::Itertools;
 use object_store::{path::Path, ObjectMeta, ObjectStore};
 
 use crate::{
-    event::DEFAULT_TIMESTAMP_KEY,
-    storage::{ObjectStorage, OBJECT_STORE_DATA_GRANULARITY},
-    utils::time::TimeRange,
+    event::DEFAULT_TIMESTAMP_KEY, storage::ObjectStorage, utils::time::TimeRange,
+    OBJECT_STORE_DATA_GRANULARITY,
 };
 
 use super::PartialTimeFilter;
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 3be5bfc37..fb1487fcd 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -57,14 +57,6 @@ pub const SCHEMA_FILE_NAME: &str = ".schema";
 pub const ALERTS_ROOT_DIRECTORY: &str = ".alerts";
 pub const MANIFEST_FILE: &str = "manifest.json";
 
-/// local sync interval to move data.records to /tmp dir of that stream.
-/// 60 sec is a reasonable value.
-pub const LOCAL_SYNC_INTERVAL: u64 = 60;
-
-/// duration used to configure prefix in objectstore and local disk structure
-/// used for storage. Defaults to 1 min.
-pub const OBJECT_STORE_DATA_GRANULARITY: u32 = (LOCAL_SYNC_INTERVAL as u32) / 60;
-
 // max concurrent request allowed for datafusion object store
 const MAX_OBJECT_STORE_REQUESTS: usize = 1000;
 
diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs
index a6ba5558e..24ed3222d 100644
--- a/src/storage/object_storage.rs
+++ b/src/storage/object_storage.rs
@@ -712,7 +712,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
     }
 
     async fn upload_files_from_staging(&self) -> Result<(), ObjectStorageError> {
-        if !Path::new(&PARSEABLE.options.staging_dir()).exists() {
+        if !PARSEABLE.options.staging_dir().exists() {
             return Ok(());
         }
 
diff --git a/src/sync.rs b/src/sync.rs
index a05b4ca72..86b489893 100644
--- a/src/sync.rs
+++ b/src/sync.rs
@@ -20,14 +20,14 @@ use chrono::{TimeDelta, Timelike};
 use std::future::Future;
 use std::panic::AssertUnwindSafe;
 use tokio::sync::oneshot;
+use tokio::task::JoinSet;
 use tokio::time::{interval_at, sleep, Duration, Instant};
 use tokio::{select, task};
 use tracing::{error, info, trace, warn};
 
 use crate::alerts::{alerts_utils, AlertConfig, AlertError};
 use crate::parseable::PARSEABLE;
-use crate::storage::LOCAL_SYNC_INTERVAL;
-use crate::{STORAGE_CONVERSION_INTERVAL, STORAGE_UPLOAD_INTERVAL};
+use crate::{LOCAL_SYNC_INTERVAL, STORAGE_UPLOAD_INTERVAL};
 
 // Calculates the instant that is the start of the next minute
 fn next_minute() -> Instant {
@@ -63,7 +63,7 @@ where
             if warned_once {
                 warn!(
                     "Task '{task_name}' took longer than expected: {:?} (threshold: {threshold:?})",
-                    start_time.elapsed() - threshold
+                    start_time.elapsed()
                 );
             }
             break res.expect("Task handle shouldn't error");
@@ -74,28 +74,22 @@
-/// Flushes arrows onto disk every `LOCAL_SYNC_INTERVAL` seconds, packs arrows into parquet every
-/// `STORAGE_CONVERSION_INTERVAL` secondsand uploads them every `STORAGE_UPLOAD_INTERVAL` seconds.
-#[tokio::main(flavor = "current_thread")]
+/// Flushes arrows onto disk and converts them into parquet every `LOCAL_SYNC_INTERVAL`,
+/// then uploads the parquet files to object store every `STORAGE_UPLOAD_INTERVAL`.
+#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
 pub async fn handler(mut cancel_rx: oneshot::Receiver<()>) -> anyhow::Result<()> {
-    let (localsync_handler, mut localsync_outbox, localsync_inbox) = run_local_sync();
+    let (localsync_handler, mut localsync_outbox, localsync_inbox) = local_sync();
     let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
         object_store_sync();
-    let (mut remote_conversion_handler, mut remote_conversion_outbox, mut remote_conversion_inbox) =
-        arrow_conversion();
 
     loop {
         select! {
            _ = &mut cancel_rx => {
                // actix server finished .. stop other threads and stop the server
                remote_sync_inbox.send(()).unwrap_or(());
                localsync_inbox.send(()).unwrap_or(());
-               remote_conversion_inbox.send(()).unwrap_or(());
                if let Err(e) = localsync_handler.await {
-                   error!("Error joining remote_sync_handler: {:?}", e);
+                   error!("Error joining localsync_handler: {e:?}");
                }
                if let Err(e) = remote_sync_handler.await {
-                   error!("Error joining remote_sync_handler: {:?}", e);
-               }
-               if let Err(e) = remote_conversion_handler.await {
-                   error!("Error joining remote_conversion_handler: {:?}", e);
+                   error!("Error joining remote_sync_handler: {e:?}");
                }
                return Ok(());
            },
@@ -107,17 +101,10 @@ pub async fn handler(mut cancel_rx: oneshot::Receiver<()>) -> anyhow::Result<()>
            _ = &mut remote_sync_outbox => {
                // remote_sync failed, this is recoverable by just starting remote_sync thread again
                if let Err(e) = remote_sync_handler.await {
-                   error!("Error joining remote_sync_handler: {:?}", e);
+                   error!("Error joining remote_sync_handler: {e:?}");
                }
                (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = object_store_sync();
            },
-           _ = &mut remote_conversion_outbox => {
-               // remote_conversion failed, this is recoverable by just starting remote_conversion thread again
-               if let Err(e) = remote_conversion_handler.await {
-                   error!("Error joining remote_conversion_handler: {:?}", e);
-               }
-               (remote_conversion_handler, remote_conversion_outbox, remote_conversion_inbox) = arrow_conversion();
-           },
        }
    }
 }
@@ -132,8 +119,7 @@
 
     let handle = task::spawn(async move {
         let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move {
-            let mut sync_interval =
-                interval_at(next_minute(), Duration::from_secs(STORAGE_UPLOAD_INTERVAL));
+            let mut sync_interval = interval_at(next_minute(), STORAGE_UPLOAD_INTERVAL);
 
             let mut inbox_rx = AssertUnwindSafe(inbox_rx);
 
@@ -183,64 +169,8 @@
     (handle, outbox_rx, inbox_tx)
 }
 
-pub fn arrow_conversion() -> (
-    task::JoinHandle<()>,
-    oneshot::Receiver<()>,
-    oneshot::Sender<()>,
-) {
-    let (outbox_tx, outbox_rx) = oneshot::channel::<()>();
-    let (inbox_tx, inbox_rx) = oneshot::channel::<()>();
-
-    let handle = task::spawn(async move {
-        let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move {
-            let mut sync_interval = interval_at(
-                next_minute() + Duration::from_secs(5), // 5 second delay
-                Duration::from_secs(STORAGE_CONVERSION_INTERVAL),
-            );
-
-            let mut inbox_rx = AssertUnwindSafe(inbox_rx);
-
-            loop {
-                select! {
-                    _ = sync_interval.tick() => {
-                        trace!("Converting Arrow to Parquet... ");
-                        if let Err(e) = monitor_task_duration(
-                            "arrow_conversion",
-                            Duration::from_secs(30),
-                            || async { PARSEABLE.streams.prepare_parquet(false) },
-                        ).await
-                        {
-                            warn!("failed to convert local arrow data to parquet. {e:?}");
-                        }
-                    },
-                    res = &mut inbox_rx => {match res{
-                        Ok(_) => break,
-                        Err(_) => {
-                            warn!("Inbox channel closed unexpectedly");
-                            break;
-                        }}
-                    }
-                }
-            }
-        }));
-
-        match result {
-            Ok(future) => {
-                future.await;
-            }
-            Err(panic_error) => {
-                error!("Panic in object store sync task: {panic_error:?}");
-                let _ = outbox_tx.send(());
-            }
-        }
-
-        info!("Object store sync task ended");
-    });
-
-    (handle, outbox_rx, inbox_tx)
-}
-
-pub fn run_local_sync() -> (
+/// Flush arrows onto disk and convert them into parquet files
+pub fn local_sync() -> (
     task::JoinHandle<()>,
     oneshot::Receiver<()>,
     oneshot::Sender<()>,
@@ -253,15 +183,23 @@ pub fn run_local_sync() -> (
         let mut inbox_rx = inbox_rx;
 
         let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move {
-            let mut sync_interval =
-                interval_at(next_minute(), Duration::from_secs(LOCAL_SYNC_INTERVAL));
+            let mut sync_interval = interval_at(next_minute(), LOCAL_SYNC_INTERVAL);
+            let mut joinset = JoinSet::new();
 
             loop {
                 select! {
+                    // Spawns a flush+conversion task every `LOCAL_SYNC_INTERVAL` seconds
                    _ = sync_interval.tick() => {
-                        trace!("Flushing Arrows to disk...");
-                        PARSEABLE.flush_all_streams();
+                        PARSEABLE.streams.flush_and_convert(&mut joinset, false)
                    },
+                    // Joins and logs errors in spawned tasks
+                    Some(res) = joinset.join_next(), if !joinset.is_empty() => {
+                        match res {
+                            Ok(Ok(_)) => info!("Successfully converted arrow files to parquet."),
+                            Ok(Err(err)) => warn!("Failed to convert arrow files to parquet. {err:?}"),
+                            Err(err) => error!("Issue joining flush+conversion task: {err}"),
+                        }
+                    }
                    res = &mut inbox_rx => {match res{
                        Ok(_) => break,
                        Err(_) => {
@@ -278,7 +216,7 @@
             future.await;
         }
         Err(panic_error) => {
-            error!("Panic in local sync task: {:?}", panic_error);
+            error!("Panic in local sync task: {panic_error:?}");
         }
     }