diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs index bb71b0983..68747c0b2 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -339,9 +339,10 @@ impl IngestServer { migration::run_migration(&CONFIG).await?; - let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync(); + let (localsync_handler, mut localsync_outbox, localsync_inbox) = + sync::run_local_sync().await; let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = - sync::object_store_sync(); + sync::object_store_sync().await; tokio::spawn(airplane::server()); @@ -354,8 +355,12 @@ impl IngestServer { // actix server finished .. stop other threads and stop the server remote_sync_inbox.send(()).unwrap_or(()); localsync_inbox.send(()).unwrap_or(()); - localsync_handler.join().unwrap_or(()); - remote_sync_handler.join().unwrap_or(()); + if let Err(e) = localsync_handler.await { + log::error!("Error joining remote_sync_handler: {:?}", e); + } + if let Err(e) = remote_sync_handler.await { + log::error!("Error joining remote_sync_handler: {:?}", e); + } return e }, _ = &mut localsync_outbox => { @@ -365,8 +370,10 @@ impl IngestServer { }, _ = &mut remote_sync_outbox => { // remote_sync failed, this is recoverable by just starting remote_sync thread again - remote_sync_handler.join().unwrap_or(()); - (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync(); + if let Err(e) = remote_sync_handler.await { + log::error!("Error joining remote_sync_handler: {:?}", e); + } + (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await; } }; diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index 166524c21..73a44faf1 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -105,8 +105,8 @@ impl ParseableServer for QueryServer { fn validate(&self) -> anyhow::Result<()> { if CONFIG.get_storage_mode_string() == "Local drive" { return Err(anyhow::anyhow!( - "Query Server cannot be started in local storage mode. Please start the server in a supported storage mode.", - )); + "Query Server cannot be started in local storage mode. Please start the server in a supported storage mode.", + )); } Ok(()) @@ -188,13 +188,13 @@ impl QueryServer { if matches!(init_cluster_metrics_schedular(), Ok(())) { log::info!("Cluster metrics scheduler started successfully"); } - if let Some(hot_tier_manager) = HotTierManager::global() { hot_tier_manager.download_from_s3()?; }; - let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync(); + let (localsync_handler, mut localsync_outbox, localsync_inbox) = + sync::run_local_sync().await; let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = - sync::object_store_sync(); + sync::object_store_sync().await; tokio::spawn(airplane::server()); let app = self.start(prometheus, CONFIG.parseable.openid.clone()); @@ -206,8 +206,12 @@ impl QueryServer { // actix server finished .. stop other threads and stop the server remote_sync_inbox.send(()).unwrap_or(()); localsync_inbox.send(()).unwrap_or(()); - localsync_handler.join().unwrap_or(()); - remote_sync_handler.join().unwrap_or(()); + if let Err(e) = localsync_handler.await { + log::error!("Error joining localsync_handler: {:?}", e); + } + if let Err(e) = remote_sync_handler.await { + log::error!("Error joining remote_sync_handler: {:?}", e); + } return e }, _ = &mut localsync_outbox => { @@ -217,8 +221,10 @@ impl QueryServer { }, _ = &mut remote_sync_outbox => { // remote_sync failed, this is recoverable by just starting remote_sync thread again - remote_sync_handler.join().unwrap_or(()); - (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync(); + if let Err(e) = remote_sync_handler.await { + log::error!("Error joining remote_sync_handler: {:?}", e); + } + (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await; } }; diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs index 213d279a4..e17080e31 100644 --- a/server/src/handlers/http/modal/server.rs +++ b/server/src/handlers/http/modal/server.rs @@ -533,9 +533,10 @@ impl Server { storage::retention::load_retention_from_global(); - let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync(); + let (localsync_handler, mut localsync_outbox, localsync_inbox) = + sync::run_local_sync().await; let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = - sync::object_store_sync(); + sync::object_store_sync().await; if CONFIG.parseable.send_analytics { analytics::init_analytics_scheduler()?; @@ -553,8 +554,12 @@ impl Server { // actix server finished .. stop other threads and stop the server remote_sync_inbox.send(()).unwrap_or(()); localsync_inbox.send(()).unwrap_or(()); - localsync_handler.join().unwrap_or(()); - remote_sync_handler.join().unwrap_or(()); + if let Err(e) = localsync_handler.await { + log::error!("Error joining remote_sync_handler: {:?}", e); + } + if let Err(e) = remote_sync_handler.await { + log::error!("Error joining remote_sync_handler: {:?}", e); + } return e }, _ = &mut localsync_outbox => { @@ -564,8 +569,10 @@ impl Server { }, _ = &mut remote_sync_outbox => { // remote_sync failed, this is recoverable by just starting remote_sync thread again - remote_sync_handler.join().unwrap_or(()); - (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync(); + if let Err(e) = remote_sync_handler.await { + log::error!("Error joining remote_sync_handler: {:?}", e); + } + (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await; } }; } diff --git a/server/src/storage/retention.rs b/server/src/storage/retention.rs index 56c0efa0c..c65c94c88 100644 --- a/server/src/storage/retention.rs +++ b/server/src/storage/retention.rs @@ -19,7 +19,6 @@ use std::hash::Hash; use std::num::NonZeroU32; use std::sync::Mutex; -use std::thread; use std::time::Duration; use clokwerk::AsyncScheduler; @@ -27,28 +26,21 @@ use clokwerk::Job; use clokwerk::TimeUnits; use derive_more::Display; use once_cell::sync::Lazy; +use tokio::task::JoinHandle; use crate::metadata::STREAM_INFO; -type SchedulerHandle = thread::JoinHandle<()>; +type SchedulerHandle = JoinHandle<()>; static SCHEDULER_HANDLER: Lazy>> = Lazy::new(|| Mutex::new(None)); -fn async_runtime() -> tokio::runtime::Runtime { - tokio::runtime::Builder::new_current_thread() - .thread_name("retention-task-thread") - .enable_all() - .build() - .unwrap() -} - pub fn load_retention_from_global() { log::info!("loading retention for all streams"); init_scheduler(); } pub fn init_scheduler() { - log::info!("Setting up schedular"); + log::info!("Setting up scheduler"); let mut scheduler = AsyncScheduler::new(); let func = move || async { //get retention every day at 12 am @@ -57,22 +49,17 @@ pub fn init_scheduler() { match retention { Ok(config) => { - if config.is_none() { - continue; - } - for Task { action, days, .. } in config.unwrap().tasks.into_iter() { - match action { - Action::Delete => { - let stream = stream.to_string(); - thread::spawn(move || { - let rt = tokio::runtime::Runtime::new().unwrap(); - rt.block_on(async { - // Run the asynchronous delete action + if let Some(config) = config { + for Task { action, days, .. } in config.tasks.into_iter() { + match action { + Action::Delete => { + let stream = stream.to_string(); + tokio::spawn(async move { action::delete(stream.clone(), u32::from(days)).await; }); - }); - } - }; + } + }; + } } } Err(err) => { @@ -83,21 +70,17 @@ pub fn init_scheduler() { }; // Execute once on startup - thread::spawn(move || { - let rt = async_runtime(); - rt.block_on(func()); + tokio::spawn(async move { + func().await; }); scheduler.every(1.day()).at("00:00").run(func); - let scheduler_handler = thread::spawn(|| { - let rt = async_runtime(); - rt.block_on(async move { - loop { - tokio::time::sleep(Duration::from_secs(10)).await; - scheduler.run_pending().await; - } - }); + let scheduler_handler = tokio::spawn(async move { + loop { + tokio::time::sleep(Duration::from_secs(10)).await; + scheduler.run_pending().await; + } }); *SCHEDULER_HANDLER.lock().unwrap() = Some(scheduler_handler); diff --git a/server/src/sync.rs b/server/src/sync.rs index b44dcde13..446f53b7e 100644 --- a/server/src/sync.rs +++ b/server/src/sync.rs @@ -16,97 +16,120 @@ * */ -use clokwerk::{AsyncScheduler, Job, Scheduler, TimeUnits}; -use thread_priority::{ThreadBuilder, ThreadPriority}; +use clokwerk::{AsyncScheduler, Job, TimeUnits}; +use std::panic::AssertUnwindSafe; use tokio::sync::oneshot; -use tokio::sync::oneshot::error::TryRecvError; - -use std::panic::{catch_unwind, AssertUnwindSafe}; -use std::thread::{self, JoinHandle}; -use std::time::Duration; +use tokio::task; +use tokio::time::{interval, sleep, Duration}; use crate::option::CONFIG; use crate::{storage, STORAGE_UPLOAD_INTERVAL}; -pub fn object_store_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) { +pub async fn object_store_sync() -> ( + task::JoinHandle<()>, + oneshot::Receiver<()>, + oneshot::Sender<()>, +) { let (outbox_tx, outbox_rx) = oneshot::channel::<()>(); let (inbox_tx, inbox_rx) = oneshot::channel::<()>(); - let mut inbox_rx = AssertUnwindSafe(inbox_rx); - let handle = thread::spawn(move || { - let res = catch_unwind(move || { - let rt = actix_web::rt::System::new(); - rt.block_on(async { - let mut scheduler = AsyncScheduler::new(); - scheduler - .every(STORAGE_UPLOAD_INTERVAL.seconds()) - // Extra time interval is added so that this schedular does not race with local sync. - .plus(5u32.seconds()) - .run(|| async { - if let Err(e) = CONFIG.storage().get_object_store().sync().await { - log::warn!("failed to sync local data with object store. {:?}", e); - } - }); - - loop { - tokio::time::sleep(Duration::from_secs(1)).await; - scheduler.run_pending().await; - match AssertUnwindSafe(|| inbox_rx.try_recv())() { - Ok(_) => break, - Err(TryRecvError::Empty) => continue, - Err(TryRecvError::Closed) => { - // should be unreachable but breaking anyways - break; - } + + let handle = task::spawn(async move { + let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move { + let mut scheduler = AsyncScheduler::new(); + scheduler + .every(STORAGE_UPLOAD_INTERVAL.seconds()) + .plus(5u32.seconds()) + .run(|| async { + if let Err(e) = CONFIG.storage().get_object_store().sync().await { + log::warn!("failed to sync local data with object store. {:?}", e); + } + }); + + let mut inbox_rx = AssertUnwindSafe(inbox_rx); + let mut check_interval = interval(Duration::from_secs(1)); + + loop { + check_interval.tick().await; + scheduler.run_pending().await; + + match inbox_rx.try_recv() { + Ok(_) => break, + Err(tokio::sync::oneshot::error::TryRecvError::Empty) => continue, + Err(tokio::sync::oneshot::error::TryRecvError::Closed) => { + log::warn!("Inbox channel closed unexpectedly"); + break; } } - }) - }); + } + })); - if res.is_err() { - outbox_tx.send(()).unwrap(); + match result { + Ok(future) => { + future.await; + } + Err(panic_error) => { + log::error!("Panic in object store sync task: {:?}", panic_error); + let _ = outbox_tx.send(()); + } } + + log::info!("Object store sync task ended"); }); (handle, outbox_rx, inbox_tx) } -pub fn run_local_sync() -> (JoinHandle<()>, oneshot::Receiver<()>, oneshot::Sender<()>) { +pub async fn run_local_sync() -> ( + task::JoinHandle<()>, + oneshot::Receiver<()>, + oneshot::Sender<()>, +) { let (outbox_tx, outbox_rx) = oneshot::channel::<()>(); let (inbox_tx, inbox_rx) = oneshot::channel::<()>(); - let mut inbox_rx = AssertUnwindSafe(inbox_rx); - - let handle = ThreadBuilder::default() - .name("local-sync") - .priority(ThreadPriority::Max) - .spawn(move |priority_result| { - if priority_result.is_err() { - log::warn!("Max priority cannot be set for sync thread. Make sure that user/program is allowed to set thread priority.") - } - let res = catch_unwind(move || { - let mut scheduler = Scheduler::new(); - scheduler - .every((storage::LOCAL_SYNC_INTERVAL as u32).seconds()) - .run(move || crate::event::STREAM_WRITERS.unset_all()); - - loop { - thread::sleep(Duration::from_millis(50)); - scheduler.run_pending(); - match AssertUnwindSafe(|| inbox_rx.try_recv())() { - Ok(_) => break, - Err(TryRecvError::Empty) => continue, - Err(TryRecvError::Closed) => { - // should be unreachable but breaking anyways - break; - } + + let handle = task::spawn(async move { + log::info!("Local sync task started"); + let mut inbox_rx = inbox_rx; + + let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move { + let mut scheduler = AsyncScheduler::new(); + scheduler + .every((storage::LOCAL_SYNC_INTERVAL as u32).seconds()) + .run(|| async { + crate::event::STREAM_WRITERS.unset_all(); + }); + + loop { + // Sleep for 50ms + sleep(Duration::from_millis(50)).await; + + // Run any pending scheduled tasks + scheduler.run_pending().await; + + // Check inbox + match inbox_rx.try_recv() { + Ok(_) => break, + Err(tokio::sync::oneshot::error::TryRecvError::Empty) => continue, + Err(tokio::sync::oneshot::error::TryRecvError::Closed) => { + log::warn!("Inbox channel closed unexpectedly"); + break; } } - }); + } + })); - if res.is_err() { - outbox_tx.send(()).unwrap(); + match result { + Ok(future) => { + future.await; + } + Err(panic_error) => { + log::error!("Panic in local sync task: {:?}", panic_error); } - }) - .unwrap(); + } + + let _ = outbox_tx.send(()); + log::info!("Local sync task ended"); + }); (handle, outbox_rx, inbox_tx) }