Skip to content

Commit 78e11e4

Browse files
miss-islingtongpshead
authored andcommitted
[3.10] pythongh-90622: Do not spawn ProcessPool workers on demand via fork method. (pythonGH-91598) (pythonGH-92497) (python#92499)
Do not spawn ProcessPool workers on demand when they spawn via fork. This avoids potential deadlocks in the child processes due to forking from a multithreaded process.. (cherry picked from commit ebb37fc) Co-authored-by: Gregory P. Smith <[email protected]> (cherry picked from commit b795376) Co-authored-by: Gregory P. Smith <[email protected]> Co-authored-by: Gregory P. Smith <[email protected]>
1 parent 7ca126d commit 78e11e4

File tree

3 files changed

+48
-10
lines changed

3 files changed

+48
-10
lines changed

Lib/concurrent/futures/process.py

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -607,6 +607,10 @@ def __init__(self, max_workers=None, mp_context=None,
607607
mp_context = mp.get_context()
608608
self._mp_context = mp_context
609609

610+
# https://github.com/python/cpython/issues/90622
611+
self._safe_to_dynamically_spawn_children = (
612+
self._mp_context.get_start_method(allow_none=False) != "fork")
613+
610614
if initializer is not None and not callable(initializer):
611615
raise TypeError("initializer must be a callable")
612616
self._initializer = initializer
@@ -657,6 +661,8 @@ def __init__(self, max_workers=None, mp_context=None,
657661
def _start_executor_manager_thread(self):
658662
if self._executor_manager_thread is None:
659663
# Start the processes so that their sentinels are known.
664+
if not self._safe_to_dynamically_spawn_children: # ie, using fork.
665+
self._launch_processes()
660666
self._executor_manager_thread = _ExecutorManagerThread(self)
661667
self._executor_manager_thread.start()
662668
_threads_wakeups[self._executor_manager_thread] = \
@@ -669,14 +675,31 @@ def _adjust_process_count(self):
669675

670676
process_count = len(self._processes)
671677
if process_count < self._max_workers:
672-
p = self._mp_context.Process(
673-
target=_process_worker,
674-
args=(self._call_queue,
675-
self._result_queue,
676-
self._initializer,
677-
self._initargs))
678-
p.start()
679-
self._processes[p.pid] = p
678+
# Assertion disabled as this codepath is also used to replace a
679+
# worker that unexpectedly dies, even when using the 'fork' start
680+
# method. That means there is still a potential deadlock bug. If a
681+
# 'fork' mp_context worker dies, we'll be forking a new one when
682+
# we know a thread is running (self._executor_manager_thread).
683+
#assert self._safe_to_dynamically_spawn_children or not self._executor_manager_thread, 'https://github.com/python/cpython/issues/90622'
684+
self._spawn_process()
685+
686+
def _launch_processes(self):
687+
# https://github.com/python/cpython/issues/90622
688+
assert not self._executor_manager_thread, (
689+
'Processes cannot be fork()ed after the thread has started, '
690+
'deadlock in the child processes could result.')
691+
for _ in range(len(self._processes), self._max_workers):
692+
self._spawn_process()
693+
694+
def _spawn_process(self):
695+
p = self._mp_context.Process(
696+
target=_process_worker,
697+
args=(self._call_queue,
698+
self._result_queue,
699+
self._initializer,
700+
self._initargs))
701+
p.start()
702+
self._processes[p.pid] = p
680703

681704
def submit(self, fn, /, *args, **kwargs):
682705
with self._shutdown_lock:
@@ -697,7 +720,8 @@ def submit(self, fn, /, *args, **kwargs):
697720
# Wake up queue management thread
698721
self._executor_manager_thread_wakeup.wakeup()
699722

700-
self._adjust_process_count()
723+
if self._safe_to_dynamically_spawn_children:
724+
self._adjust_process_count()
701725
self._start_executor_manager_thread()
702726
return f
703727
submit.__doc__ = _base.Executor.submit.__doc__

Lib/test/test_concurrent_futures.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -483,10 +483,16 @@ def acquire_lock(lock):
483483
lock.acquire()
484484

485485
mp_context = self.get_context()
486+
if mp_context.get_start_method(allow_none=False) == "fork":
487+
# fork pre-spawns, not on demand.
488+
expected_num_processes = self.worker_count
489+
else:
490+
expected_num_processes = 3
491+
486492
sem = mp_context.Semaphore(0)
487493
for _ in range(3):
488494
self.executor.submit(acquire_lock, sem)
489-
self.assertEqual(len(self.executor._processes), 3)
495+
self.assertEqual(len(self.executor._processes), expected_num_processes)
490496
for _ in range(3):
491497
sem.release()
492498
processes = self.executor._processes
@@ -1007,6 +1013,8 @@ def test_saturation(self):
10071013
def test_idle_process_reuse_one(self):
10081014
executor = self.executor
10091015
assert executor._max_workers >= 4
1016+
if self.get_context().get_start_method(allow_none=False) == "fork":
1017+
raise unittest.SkipTest("Incompatible with the fork start method.")
10101018
executor.submit(mul, 21, 2).result()
10111019
executor.submit(mul, 6, 7).result()
10121020
executor.submit(mul, 3, 14).result()
@@ -1015,6 +1023,8 @@ def test_idle_process_reuse_one(self):
10151023
def test_idle_process_reuse_multiple(self):
10161024
executor = self.executor
10171025
assert executor._max_workers <= 5
1026+
if self.get_context().get_start_method(allow_none=False) == "fork":
1027+
raise unittest.SkipTest("Incompatible with the fork start method.")
10181028
executor.submit(mul, 12, 7).result()
10191029
executor.submit(mul, 33, 25)
10201030
executor.submit(mul, 25, 26).result()
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
Worker processes for :class:`concurrent.futures.ProcessPoolExecutor` are no
2+
longer spawned on demand (a feature added in 3.9) when the multiprocessing
3+
context start method is ``"fork"`` as that can lead to deadlocks in the
4+
child processes due to a fork happening while threads are running.

0 commit comments

Comments
 (0)