Skip to content

Commit fef8f6d

Browse files
fix: switched to tokio tasks (#862)
This PR updates the architecture to use tokio tasks instead of threads. This will allow better resource utilisation and performance (see https://docs.rs/tokio/latest/tokio/task/index.html). Rewrote object_store_sync() & run_local_sync() in the server's sync.rs to use tokio tasks while keeping the same function signature, and updated the invoking code to match. Rewrote init_scheduler() in the server's storage/retention.rs to use tokio tasks while keeping the same function signature, and updated the invoking code to match. --------- Signed-off-by: Vishal <[email protected]> Co-authored-by: Nitish Tiwari <[email protected]>
1 parent 5f524b5 commit fef8f6d

File tree

5 files changed

+152
-126
lines changed

5 files changed

+152
-126
lines changed

server/src/handlers/http/modal/ingest_server.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -339,9 +339,10 @@ impl IngestServer {
339339

340340
migration::run_migration(&CONFIG).await?;
341341

342-
let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync();
342+
let (localsync_handler, mut localsync_outbox, localsync_inbox) =
343+
sync::run_local_sync().await;
343344
let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
344-
sync::object_store_sync();
345+
sync::object_store_sync().await;
345346

346347
tokio::spawn(airplane::server());
347348

@@ -354,8 +355,12 @@ impl IngestServer {
354355
// actix server finished .. stop the other tasks and stop the server
355356
remote_sync_inbox.send(()).unwrap_or(());
356357
localsync_inbox.send(()).unwrap_or(());
357-
localsync_handler.join().unwrap_or(());
358-
remote_sync_handler.join().unwrap_or(());
358+
if let Err(e) = localsync_handler.await {
359+
log::error!("Error joining localsync_handler: {:?}", e);
360+
}
361+
if let Err(e) = remote_sync_handler.await {
362+
log::error!("Error joining remote_sync_handler: {:?}", e);
363+
}
359364
return e
360365
},
361366
_ = &mut localsync_outbox => {
@@ -365,8 +370,10 @@ impl IngestServer {
365370
},
366371
_ = &mut remote_sync_outbox => {
367372
// remote_sync failed, this is recoverable by just starting the remote_sync task again
368-
remote_sync_handler.join().unwrap_or(());
369-
(remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync();
373+
if let Err(e) = remote_sync_handler.await {
374+
log::error!("Error joining remote_sync_handler: {:?}", e);
375+
}
376+
(remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await;
370377
}
371378

372379
};

server/src/handlers/http/modal/query_server.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,8 @@ impl ParseableServer for QueryServer {
105105
fn validate(&self) -> anyhow::Result<()> {
106106
if CONFIG.get_storage_mode_string() == "Local drive" {
107107
return Err(anyhow::anyhow!(
108-
"Query Server cannot be started in local storage mode. Please start the server in a supported storage mode.",
109-
));
108+
"Query Server cannot be started in local storage mode. Please start the server in a supported storage mode.",
109+
));
110110
}
111111

112112
Ok(())
@@ -188,13 +188,13 @@ impl QueryServer {
188188
if matches!(init_cluster_metrics_schedular(), Ok(())) {
189189
log::info!("Cluster metrics scheduler started successfully");
190190
}
191-
192191
if let Some(hot_tier_manager) = HotTierManager::global() {
193192
hot_tier_manager.download_from_s3()?;
194193
};
195-
let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync();
194+
let (localsync_handler, mut localsync_outbox, localsync_inbox) =
195+
sync::run_local_sync().await;
196196
let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
197-
sync::object_store_sync();
197+
sync::object_store_sync().await;
198198

199199
tokio::spawn(airplane::server());
200200
let app = self.start(prometheus, CONFIG.parseable.openid.clone());
@@ -206,8 +206,12 @@ impl QueryServer {
206206
// actix server finished .. stop the other tasks and stop the server
207207
remote_sync_inbox.send(()).unwrap_or(());
208208
localsync_inbox.send(()).unwrap_or(());
209-
localsync_handler.join().unwrap_or(());
210-
remote_sync_handler.join().unwrap_or(());
209+
if let Err(e) = localsync_handler.await {
210+
log::error!("Error joining localsync_handler: {:?}", e);
211+
}
212+
if let Err(e) = remote_sync_handler.await {
213+
log::error!("Error joining remote_sync_handler: {:?}", e);
214+
}
211215
return e
212216
},
213217
_ = &mut localsync_outbox => {
@@ -217,8 +221,10 @@ impl QueryServer {
217221
},
218222
_ = &mut remote_sync_outbox => {
219223
// remote_sync failed, this is recoverable by just starting the remote_sync task again
220-
remote_sync_handler.join().unwrap_or(());
221-
(remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync();
224+
if let Err(e) = remote_sync_handler.await {
225+
log::error!("Error joining remote_sync_handler: {:?}", e);
226+
}
227+
(remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await;
222228
}
223229

224230
};

server/src/handlers/http/modal/server.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -533,9 +533,10 @@ impl Server {
533533

534534
storage::retention::load_retention_from_global();
535535

536-
let (localsync_handler, mut localsync_outbox, localsync_inbox) = sync::run_local_sync();
536+
let (localsync_handler, mut localsync_outbox, localsync_inbox) =
537+
sync::run_local_sync().await;
537538
let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
538-
sync::object_store_sync();
539+
sync::object_store_sync().await;
539540

540541
if CONFIG.parseable.send_analytics {
541542
analytics::init_analytics_scheduler()?;
@@ -553,8 +554,12 @@ impl Server {
553554
// actix server finished .. stop the other tasks and stop the server
554555
remote_sync_inbox.send(()).unwrap_or(());
555556
localsync_inbox.send(()).unwrap_or(());
556-
localsync_handler.join().unwrap_or(());
557-
remote_sync_handler.join().unwrap_or(());
557+
if let Err(e) = localsync_handler.await {
558+
log::error!("Error joining localsync_handler: {:?}", e);
559+
}
560+
if let Err(e) = remote_sync_handler.await {
561+
log::error!("Error joining remote_sync_handler: {:?}", e);
562+
}
558563
return e
559564
},
560565
_ = &mut localsync_outbox => {
@@ -564,8 +569,10 @@ impl Server {
564569
},
565570
_ = &mut remote_sync_outbox => {
566571
// remote_sync failed, this is recoverable by just starting the remote_sync task again
567-
remote_sync_handler.join().unwrap_or(());
568-
(remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync();
572+
if let Err(e) = remote_sync_handler.await {
573+
log::error!("Error joining remote_sync_handler: {:?}", e);
574+
}
575+
(remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await;
569576
}
570577
};
571578
}

server/src/storage/retention.rs

Lines changed: 19 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -19,36 +19,28 @@
1919
use std::hash::Hash;
2020
use std::num::NonZeroU32;
2121
use std::sync::Mutex;
22-
use std::thread;
2322
use std::time::Duration;
2423

2524
use clokwerk::AsyncScheduler;
2625
use clokwerk::Job;
2726
use clokwerk::TimeUnits;
2827
use derive_more::Display;
2928
use once_cell::sync::Lazy;
29+
use tokio::task::JoinHandle;
3030

3131
use crate::metadata::STREAM_INFO;
3232

33-
type SchedulerHandle = thread::JoinHandle<()>;
33+
type SchedulerHandle = JoinHandle<()>;
3434

3535
static SCHEDULER_HANDLER: Lazy<Mutex<Option<SchedulerHandle>>> = Lazy::new(|| Mutex::new(None));
3636

37-
fn async_runtime() -> tokio::runtime::Runtime {
38-
tokio::runtime::Builder::new_current_thread()
39-
.thread_name("retention-task-thread")
40-
.enable_all()
41-
.build()
42-
.unwrap()
43-
}
44-
4537
pub fn load_retention_from_global() {
4638
log::info!("loading retention for all streams");
4739
init_scheduler();
4840
}
4941

5042
pub fn init_scheduler() {
51-
log::info!("Setting up schedular");
43+
log::info!("Setting up scheduler");
5244
let mut scheduler = AsyncScheduler::new();
5345
let func = move || async {
5446
//get retention every day at 12 am
@@ -57,22 +49,17 @@ pub fn init_scheduler() {
5749

5850
match retention {
5951
Ok(config) => {
60-
if config.is_none() {
61-
continue;
62-
}
63-
for Task { action, days, .. } in config.unwrap().tasks.into_iter() {
64-
match action {
65-
Action::Delete => {
66-
let stream = stream.to_string();
67-
thread::spawn(move || {
68-
let rt = tokio::runtime::Runtime::new().unwrap();
69-
rt.block_on(async {
70-
// Run the asynchronous delete action
52+
if let Some(config) = config {
53+
for Task { action, days, .. } in config.tasks.into_iter() {
54+
match action {
55+
Action::Delete => {
56+
let stream = stream.to_string();
57+
tokio::spawn(async move {
7158
action::delete(stream.clone(), u32::from(days)).await;
7259
});
73-
});
74-
}
75-
};
60+
}
61+
};
62+
}
7663
}
7764
}
7865
Err(err) => {
@@ -83,21 +70,17 @@ pub fn init_scheduler() {
8370
};
8471

8572
// Execute once on startup
86-
thread::spawn(move || {
87-
let rt = async_runtime();
88-
rt.block_on(func());
73+
tokio::spawn(async move {
74+
func().await;
8975
});
9076

9177
scheduler.every(1.day()).at("00:00").run(func);
9278

93-
let scheduler_handler = thread::spawn(|| {
94-
let rt = async_runtime();
95-
rt.block_on(async move {
96-
loop {
97-
tokio::time::sleep(Duration::from_secs(10)).await;
98-
scheduler.run_pending().await;
99-
}
100-
});
79+
let scheduler_handler = tokio::spawn(async move {
80+
loop {
81+
tokio::time::sleep(Duration::from_secs(10)).await;
82+
scheduler.run_pending().await;
83+
}
10184
});
10285

10386
*SCHEDULER_HANDLER.lock().unwrap() = Some(scheduler_handler);

0 commit comments

Comments
 (0)