From e7e895f2396ca8d34207dde804b232843801e759 Mon Sep 17 00:00:00 2001 From: "Gregory P. Smith [Google LLC]" Date: Fri, 15 Apr 2022 22:20:45 -0700 Subject: [PATCH 1/3] gh-90622: Do not spawn ProcessPool workers on demand when they spawn via fork. This avoids potential deadlocks in the child processes due to forking from a multithreaded process. --- Lib/concurrent/futures/process.py | 39 ++++++++++++++----- Lib/test/test_concurrent_futures.py | 9 ++++- ...2-04-15-22-07-36.gh-issue-90622.0C6l8h.rst | 4 ++ 3 files changed, 40 insertions(+), 12 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2022-04-15-22-07-36.gh-issue-90622.0C6l8h.rst diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 695f7733305ed7..e33f7b7ee5fcdf 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -644,6 +644,10 @@ def __init__(self, max_workers=None, mp_context=None, mp_context = mp.get_context() self._mp_context = mp_context + # https://github.com/python/cpython/issues/90622 + self._safe_to_dynamically_spawn_children = ( + self._mp_context.get_start_method(allow_none=False) != "fork") + if initializer is not None and not callable(initializer): raise TypeError("initializer must be a callable") self._initializer = initializer @@ -701,6 +705,8 @@ def __init__(self, max_workers=None, mp_context=None, def _start_executor_manager_thread(self): if self._executor_manager_thread is None: # Start the processes so that their sentinels are known. + if not self._safe_to_dynamically_spawn_children: # ie, using fork. + self.__launch_processes() self._executor_manager_thread = _ExecutorManagerThread(self) self._executor_manager_thread.start() _threads_wakeups[self._executor_manager_thread] = \ @@ -713,15 +719,27 @@ def _adjust_process_count(self): process_count = len(self._processes) if process_count < self._max_workers: - p = self._mp_context.Process( - target=_process_worker, - args=(self._call_queue, - self._result_queue, - self._initializer, - self._initargs, - self._max_tasks_per_child)) - p.start() - self._processes[p.pid] = p + assert self._safe_to_dynamically_spawn_children or not self._executor_manager_thread, 'https://github.com/python/cpython/issues/90622' + self.__spawn_process() + + def __launch_processes(self): + # https://github.com/python/cpython/issues/90622 + assert not self._executor_manager_thread, ( + 'Processes cannot be fork()ed after the thread has started, ' + 'deadlock in the child processes could result.') + for _ in range(len(self._processes), self._max_workers): + self.__spawn_process() + + def __spawn_process(self): + p = self._mp_context.Process( + target=_process_worker, + args=(self._call_queue, + self._result_queue, + self._initializer, + self._initargs, + self._max_tasks_per_child)) + p.start() + self._processes[p.pid] = p def submit(self, fn, /, *args, **kwargs): with self._shutdown_lock: @@ -742,7 +760,8 @@ def submit(self, fn, /, *args, **kwargs): # Wake up queue management thread self._executor_manager_thread_wakeup.wakeup() - self._adjust_process_count() + if self._safe_to_dynamically_spawn_children: + self._adjust_process_count() self._start_executor_manager_thread() return f submit.__doc__ = _base.Executor.submit.__doc__ diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 8adba36a387ad0..5961d712775ba5 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -512,11 +512,16 @@ def test_processes_terminate(self): def acquire_lock(lock): lock.acquire() - mp_context = get_context() + mp_context = self.get_context() + if mp_context.get_start_method(allow_none=False) == "fork": + # fork pre-spawns, not on demand. + expected_num_processes = self.worker_count + else: + expected_num_processes = 3 sem = mp_context.Semaphore(0) for _ in range(3): self.executor.submit(acquire_lock, sem) - self.assertEqual(len(self.executor._processes), 3) + self.assertEqual(len(self.executor._processes), expected_num_processes) for _ in range(3): sem.release() processes = self.executor._processes diff --git a/Misc/NEWS.d/next/Library/2022-04-15-22-07-36.gh-issue-90622.0C6l8h.rst b/Misc/NEWS.d/next/Library/2022-04-15-22-07-36.gh-issue-90622.0C6l8h.rst new file mode 100644 index 00000000000000..5db0a1bbe721df --- /dev/null +++ b/Misc/NEWS.d/next/Library/2022-04-15-22-07-36.gh-issue-90622.0C6l8h.rst @@ -0,0 +1,4 @@ +Worker processes for :class:`concurrent.futures.ProcessPoolExecutor` are no +longer spawned on demand (a feature added in 3.9) when the multiprocessing +context start method is ``"fork"`` as that can lead to deadlocks in the +child processes due to a fork happening while threads are running. From 1f63d596deeee950b25c1531d97f542a1a228c3b Mon Sep 17 00:00:00 2001 From: "Gregory P. Smith [Google LLC]" Date: Fri, 6 May 2022 06:50:13 +0000 Subject: [PATCH 2/3] Use single leading _s, disable assertion. Adds a comment describing why the lack of assertion means we still have a potential bug. --- Lib/concurrent/futures/process.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index e33f7b7ee5fcdf..ea1957493047cc 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -706,7 +706,7 @@ def _start_executor_manager_thread(self): if self._executor_manager_thread is None: # Start the processes so that their sentinels are known. if not self._safe_to_dynamically_spawn_children: # ie, using fork. - self.__launch_processes() + self._launch_processes() self._executor_manager_thread = _ExecutorManagerThread(self) self._executor_manager_thread.start() _threads_wakeups[self._executor_manager_thread] = \ @@ -719,18 +719,23 @@ def _adjust_process_count(self): process_count = len(self._processes) if process_count < self._max_workers: - assert self._safe_to_dynamically_spawn_children or not self._executor_manager_thread, 'https://github.com/python/cpython/issues/90622' - self.__spawn_process() - - def __launch_processes(self): + # Assertion disabled as this codepath is also used to replace a + # worker that unexpectedly dies, even when using the 'fork' start + # method. That means there is still a potential deadlock bug. If a + # 'fork' mp_context worker dies, we'll be forking a new one when + # we know a thread is running (self._executor_manager_thread). + #assert self._safe_to_dynamically_spawn_children or not self._executor_manager_thread, 'https://github.com/python/cpython/issues/90622' + self._spawn_process() + + def _launch_processes(self): # https://github.com/python/cpython/issues/90622 assert not self._executor_manager_thread, ( 'Processes cannot be fork()ed after the thread has started, ' 'deadlock in the child processes could result.') for _ in range(len(self._processes), self._max_workers): - self.__spawn_process() + self._spawn_process() - def __spawn_process(self): + def _spawn_process(self): p = self._mp_context.Process( target=_process_worker, args=(self._call_queue, From 3f1789a11bcb574bb703f8252f8053d0e05892c5 Mon Sep 17 00:00:00 2001 From: "Gregory P. Smith [Google LLC]" Date: Fri, 6 May 2022 07:54:13 +0000 Subject: [PATCH 3/3] Skip test_idle_process tests on "fork" --- Lib/test/test_concurrent_futures.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index a46facb1f8ee46..6f3b4609232bbb 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -1027,6 +1027,8 @@ def test_saturation(self): def test_idle_process_reuse_one(self): executor = self.executor assert executor._max_workers >= 4 + if self.get_context().get_start_method(allow_none=False) == "fork": + raise unittest.SkipTest("Incompatible with the fork start method.") executor.submit(mul, 21, 2).result() executor.submit(mul, 6, 7).result() executor.submit(mul, 3, 14).result() @@ -1035,6 +1037,8 @@ def test_idle_process_reuse_one(self): def test_idle_process_reuse_multiple(self): executor = self.executor assert executor._max_workers <= 5 + if self.get_context().get_start_method(allow_none=False) == "fork": + raise unittest.SkipTest("Incompatible with the fork start method.") executor.submit(mul, 12, 7).result() executor.submit(mul, 33, 25) executor.submit(mul, 25, 26).result()