
Commit 29eb97c

fix: cleanup worker threads channel when project shutdown
1 parent 5124441 commit 29eb97c


6 files changed, +69 -35 lines changed


crates/napi/src/next_api/project.rs

Lines changed: 1 addition & 0 deletions
@@ -722,6 +722,7 @@ pub async fn project_shutdown(
 ) {
     project.turbopack_ctx.turbo_tasks().stop_and_wait().await;
     project_on_exit_internal(&project).await;
+    turbopack_node::worker_pool::shutdown();
 }

 #[napi(object)]

packages/next/src/build/swc/index.ts

Lines changed: 42 additions & 26 deletions
@@ -671,38 +671,54 @@ function bindingToApi(
   const loaderWorkers: Record<string, Array<Worker>> = {}

   const createOrScalePool = async () => {
-    let poolOptions = await binding.recvPoolRequest()
-    const { filename, maxConcurrency } = poolOptions
-    const workers = loaderWorkers[filename] || (loaderWorkers[filename] = [])
-    if (workers.length < maxConcurrency) {
-      for (let i = workers.length; i < maxConcurrency; i++) {
-        const worker = new Worker(filename, {
-          workerData: {
-            poolId: filename,
-            bindingPath,
-          },
-        })
-        worker.unref()
-        workers.push(worker)
+    while (true) {
+      try {
+        let poolOptions = await binding.recvPoolRequest()
+        const { filename, maxConcurrency } = poolOptions
+        const workers =
+          loaderWorkers[filename] || (loaderWorkers[filename] = [])
+        if (workers.length < maxConcurrency) {
+          for (let i = workers.length; i < maxConcurrency; i++) {
+            const worker = new Worker(filename, {
+              workerData: {
+                poolId: filename,
+                bindingPath,
+              },
+            })
+            worker.unref()
+            workers.push(worker)
+          }
+        } else if (workers.length > maxConcurrency) {
+          const workersToStop = workers.splice(
+            0,
+            workers.length - maxConcurrency
+          )
+          workersToStop.forEach((worker) => worker.terminate())
+        }
+      } catch (_) {
+        // rust channel closed, do nothing
+        return
       }
-    } else if (workers.length > maxConcurrency) {
-      const workersToStop = workers.splice(0, workers.length - maxConcurrency)
-      workersToStop.forEach((worker) => worker.terminate())
     }
-    createOrScalePool()
   }

   const waitingForWorkerTermination = async () => {
-    const { filename, workerId } = await binding.recvWorkerTermination()
-    const workers = loaderWorkers[filename]
-    const workerIdx = workers.findIndex(
-      (worker) => worker.threadId === workerId
-    )
-    if (workerIdx > -1) {
-      const worker = workers.splice(workerIdx, 1)
-      worker[0].terminate()
+    while (true) {
+      try {
+        const { filename, workerId } = await binding.recvWorkerTermination()
+        const workers = loaderWorkers[filename]
+        const workerIdx = workers.findIndex(
+          (worker) => worker.threadId === workerId
+        )
+        if (workerIdx > -1) {
+          const worker = workers.splice(workerIdx, 1)
+          worker[0].terminate()
+        }
+      } catch (_) {
+        // rust channel closed, do nothing
+        return
+      }
     }
-    waitingForWorkerTermination()
   }

   class ProjectImpl implements Project {
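
Both helpers previously re-armed themselves with a tail call (`createOrScalePool()` / `waitingForWorkerTermination()`), so the chain of pending native calls never had a way to end. The rewrite gives both the same shape: await the next event from the Rust side inside `while (true)` / `try`, and treat a rejection as the signal that the Rust channel was closed during shutdown. A minimal sketch of that shape as a standalone helper (the helper name and signature are illustrative, not part of the codebase):

// Illustrative only: the generic loop shape used by createOrScalePool and
// waitingForWorkerTermination above. `recv` stands in for a binding call such
// as binding.recvPoolRequest(); once the Rust channel behind it is closed,
// the returned promise rejects and the loop exits instead of re-arming.
async function drainUntilClosed<T>(
  recv: () => Promise<T>,
  handle: (event: T) => void
): Promise<void> {
  while (true) {
    try {
      handle(await recv())
    } catch (_) {
      // Rust channel closed during project shutdown: stop polling.
      return
    }
  }
}

With the old recursive form, a closed channel would presumably have surfaced as an unhandled rejection rather than a clean exit of the polling loop.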

turbopack/crates/turbopack-node/js/src/worker_threads/evaluate.ts

Lines changed: 3 additions & 3 deletions
@@ -121,6 +121,9 @@ export const run = async (
   }

   while (true) {
+    // need await a micro task, or else if some request rejected,
+    // the error will be propergated to schedule thread, then causing panic
+    await Promise.resolve()
     const msg_str = await binding.recvMessageInWorker(workerId)

     const msg = JSON.parse(msg_str) as
@@ -150,9 +153,6 @@
       requests.delete(msg.id)
       if (msg.error) {
         request.reject(new Error(msg.error))
-        // need await a micro task, or else if some request rejected,
-        // the error will be propergated to schedule thread, then causing panic
-        await Promise.resolve()
       } else {
         request.resolve(msg.data)
       }
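
Moving the `await Promise.resolve()` to the top of the loop is a scheduling fix: a promise rejection is only delivered to its handlers on the microtask queue, so yielding once before re-entering `binding.recvMessageInWorker` lets the rejected request's handler run inside the worker before it blocks in the native call again (per the comment, the error would otherwise be propagated to the schedule thread and cause a panic). A small self-contained sketch of that timing, not Turbopack code:

// Standalone sketch (not Turbopack code) of why a single microtask yield
// matters: a rejection handler attached with .catch() only runs from the
// microtask queue, so it has not run yet right after reject() is called,
// but it has run after one `await`.
async function rejectionTiming(): Promise<void> {
  let reject!: (err: Error) => void
  const pending = new Promise<never>((_, rej) => {
    reject = rej
  })

  let handled = false
  pending.catch(() => {
    handled = true
  })

  reject(new Error('loader task failed'))
  console.log(handled) // false: the .catch callback is still queued

  await Promise.resolve() // the yield the worker loop performs above
  console.log(handled) // true: the rejection has now been handled
}

rejectionTiming()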

turbopack/crates/turbopack-node/src/lib.rs

Lines changed: 1 addition & 1 deletion
@@ -23,7 +23,7 @@ mod process_pool;
 pub mod source_map;
 pub mod transforms;
 #[cfg(feature = "worker_pool")]
-mod worker_pool;
+pub mod worker_pool;

 #[turbo_tasks::function]
 async fn emit(

turbopack/crates/turbopack-node/src/worker_pool/mod.rs

Lines changed: 6 additions & 5 deletions
@@ -16,23 +16,24 @@ use crate::{
 };

 mod operation;
+pub use operation::shutdown;
 mod worker_thread;

 static OPERATION_TASK_ID: AtomicU32 = AtomicU32::new(1);

 #[turbo_tasks::value]
-pub struct WorkerThreadPool {
+pub(crate) struct WorkerThreadPool {
     cwd: PathBuf,
     entrypoint: PathBuf,
     env: FxHashMap<RcStr, RcStr>,
     concurrency: usize,
-    pub assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
-    pub assets_root: FileSystemPath,
-    pub project_dir: FileSystemPath,
+    pub(crate) assets_for_source_mapping: ResolvedVc<AssetsForSourceMapping>,
+    pub(crate) assets_root: FileSystemPath,
+    pub(crate) project_dir: FileSystemPath,
 }

 impl WorkerThreadPool {
-    pub fn create(
+    pub(crate) fn create(
         cwd: PathBuf,
         entrypoint: PathBuf,
         env: FxHashMap<RcStr, RcStr>,

turbopack/crates/turbopack-node/src/worker_pool/operation.rs

Lines changed: 16 additions & 0 deletions
@@ -27,6 +27,11 @@ impl<T: Send + Sync + 'static> MessageChannel<T> {
     pub(crate) async fn recv(&self) -> Result<T> {
         Ok(self.receiver.recv().await?)
     }
+
+    pub(crate) fn close(&self) {
+        self.sender.close();
+        self.receiver.close();
+    }
 }

 pub(crate) struct WorkerPoolOperation {
@@ -138,6 +143,13 @@
         .context("failed to recv pool request")
     }

+    pub(crate) fn shutdown(&self) {
+        // We need to close channels connected to schedule thread,
+        // or else, it will be forever waiting in schedule thread
+        self.pool_request_channel.close();
+        self.worker_termination_channel.close();
+    }
+
     pub(crate) async fn recv_worker_request(&self, pool_id: String) -> Result<u32> {
         let channel = self
             .worker_request_channel
@@ -217,6 +229,10 @@ pub async fn recv_task_message(task_id: u32) -> Result<String> {
     WORKER_POOL_OPERATION.recv_task_response(task_id).await
 }

+pub fn shutdown() {
+    WORKER_POOL_OPERATION.shutdown();
+}
+
 pub(crate) struct WorkerOperation {
     pub(crate) pool_id: String,
     pub(crate) task_id: u32,
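
`close()` and the two `shutdown()` wrappers are the Rust end of the contract the JavaScript loops rely on: closing both halves of the channel makes any pending or future `recv().await` return an error instead of waiting forever on the schedule thread, and that error is what reaches JavaScript as the rejected `recvPoolRequest()` / `recvWorkerTermination()` promise. A hypothetical TypeScript analog of that channel behavior, for illustration only (the real `MessageChannel` is Rust and its internals are not shown in this diff):

// Illustrative analog of a closable channel: recv() waits for the next value,
// and close() makes every pending and future recv() reject, which is the
// signal the loops in swc/index.ts treat as "project shut down".
class ClosableChannel<T> {
  private queue: T[] = []
  private waiters: Array<{
    resolve: (value: T) => void
    reject: (err: Error) => void
  }> = []
  private closed = false

  send(value: T): void {
    if (this.closed) throw new Error('channel closed')
    const waiter = this.waiters.shift()
    if (waiter) {
      waiter.resolve(value)
    } else {
      this.queue.push(value)
    }
  }

  recv(): Promise<T> {
    if (this.queue.length > 0) {
      return Promise.resolve(this.queue.shift() as T)
    }
    if (this.closed) {
      return Promise.reject(new Error('channel closed'))
    }
    return new Promise<T>((resolve, reject) => {
      this.waiters.push({ resolve, reject })
    })
  }

  close(): void {
    this.closed = true
    // Wake up everything still waiting, as WorkerPoolOperation::shutdown does,
    // so no receiver stays blocked after project shutdown.
    for (const waiter of this.waiters.splice(0)) {
      waiter.reject(new Error('channel closed'))
    }
  }
}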
