From c9dad764e3bffe683a46925d8d5c33f7c50b8441 Mon Sep 17 00:00:00 2001
From: "Gregory P. Smith [Google LLC]"
Date: Mon, 24 Jan 2022 07:48:22 +0000
Subject: [PATCH 1/4] Revert "bpo-44733: Add max_tasks_per_child to ProcessPoolExecutor (GH-27373)"

This reverts commit fdc0e09c3316098b038996c428e88931f0a4fcdb.

The reverted implementation spawned new children dynamically rather than
up front, which leads to deadlocks due to mixing threads and fork().
See bpo-46464.
---
 Doc/library/concurrent.futures.rst  | 11 +----
 Lib/concurrent/futures/process.py   | 69 ++++++-----------------------
 Lib/test/test_concurrent_futures.py | 31 +------------
 3 files changed, 16 insertions(+), 95 deletions(-)

diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst
index 959280833997e5..465d437fcbe576 100644
--- a/Doc/library/concurrent.futures.rst
+++ b/Doc/library/concurrent.futures.rst
@@ -231,7 +231,7 @@ that :class:`ProcessPoolExecutor` will not work in the interactive interpreter.
 Calling :class:`Executor` or :class:`Future` methods from a callable submitted
 to a :class:`ProcessPoolExecutor` will result in deadlock.
 
-.. class:: ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)
+.. class:: ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())
 
    An :class:`Executor` subclass that executes calls asynchronously using a pool
    of at most *max_workers* processes. If *max_workers* is ``None`` or not
@@ -252,11 +252,6 @@ to a :class:`ProcessPoolExecutor` will result in deadlock.
    pending jobs will raise a :exc:`~concurrent.futures.process.BrokenProcessPool`,
    as well as any attempt to submit more jobs to the pool.
 
-   *max_tasks_per_child* is an optional argument that specifies the maximum
-   number of tasks a single process can execute before it will exit and be
-   replaced with a fresh worker process. The default *max_tasks_per_child* is
-   ``None`` which means worker processes will live as long as the pool.
-
    .. versionchanged:: 3.3
       When one of the worker processes terminates abruptly, a
      :exc:`BrokenProcessPool` error is now raised. Previously, behaviour
@@ -269,10 +264,6 @@ to a :class:`ProcessPoolExecutor` will result in deadlock.
 
       Added the *initializer* and *initargs* arguments.
 
-   .. versionchanged:: 3.11
-      The *max_tasks_per_child* argument was added to allow users to
-      control the lifetime of workers in the pool.
-
 .. _processpoolexecutor-example:

diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 695f7733305ed7..6354c4683bba8e 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -141,11 +141,10 @@ def __init__(self, future, fn, args, kwargs):
         self.kwargs = kwargs
 
 class _ResultItem(object):
-    def __init__(self, work_id, exception=None, result=None, exit_pid=None):
+    def __init__(self, work_id, exception=None, result=None):
         self.work_id = work_id
         self.exception = exception
         self.result = result
-        self.exit_pid = exit_pid
 
 class _CallItem(object):
     def __init__(self, work_id, fn, args, kwargs):
@@ -202,19 +201,17 @@ def _process_chunk(fn, chunk):
     return [fn(*args) for args in chunk]
 
 
-def _sendback_result(result_queue, work_id, result=None, exception=None,
-                     exit_pid=None):
+def _sendback_result(result_queue, work_id, result=None, exception=None):
     """Safely send back the given result or exception"""
     try:
         result_queue.put(_ResultItem(work_id, result=result,
-                                     exception=exception, exit_pid=exit_pid))
+                                     exception=exception))
     except BaseException as e:
         exc = _ExceptionWithTraceback(e, e.__traceback__)
-        result_queue.put(_ResultItem(work_id, exception=exc,
-                                     exit_pid=exit_pid))
+        result_queue.put(_ResultItem(work_id, exception=exc))
 
 
-def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=None):
+def _process_worker(call_queue, result_queue, initializer, initargs):
     """Evaluates calls from call_queue and places the results in result_queue.
 
     This worker is run in a separate process.
@@ -235,38 +232,25 @@ def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=None):
         # The parent will notice that the process stopped and
         # mark the pool broken
         return
-    num_tasks = 0
-    exit_pid = None
     while True:
         call_item = call_queue.get(block=True)
         if call_item is None:
             # Wake up queue management thread
             result_queue.put(os.getpid())
             return
-
-        if max_tasks is not None:
-            num_tasks += 1
-            if num_tasks >= max_tasks:
-                exit_pid = os.getpid()
-
         try:
             r = call_item.fn(*call_item.args, **call_item.kwargs)
         except BaseException as e:
             exc = _ExceptionWithTraceback(e, e.__traceback__)
-            _sendback_result(result_queue, call_item.work_id, exception=exc,
-                             exit_pid=exit_pid)
+            _sendback_result(result_queue, call_item.work_id, exception=exc)
         else:
-            _sendback_result(result_queue, call_item.work_id, result=r,
-                             exit_pid=exit_pid)
+            _sendback_result(result_queue, call_item.work_id, result=r)
             del r
 
         # Liberate the resource as soon as possible, to avoid holding onto
         # open files or shared memory that is not needed anymore
         del call_item
 
-        if exit_pid is not None:
-            return
-
 
 class _ExecutorManagerThread(threading.Thread):
     """Manages the communication between this process and the worker processes.
@@ -317,10 +301,6 @@ def weakref_cb(_,
         # A queue.Queue of work ids e.g. Queue([5, 6, ...]).
         self.work_ids_queue = executor._work_ids
 
-        # Maximum number of tasks a worker process can execute before
-        # exiting safely
-        self.max_tasks_per_child = executor._max_tasks_per_child
-
         # A dict mapping work ids to _WorkItems e.g.
         # {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
         self.pending_work_items = executor._pending_work_items
 
@@ -340,23 +320,15 @@ def run(self):
                 return
             if result_item is not None:
                 self.process_result_item(result_item)
-
-                process_exited = result_item.exit_pid is not None
-                if process_exited:
-                    p = self.processes.pop(result_item.exit_pid)
-                    p.join()
-
                 # Delete reference to result_item to avoid keeping references
                 # while waiting on new results.
                 del result_item
 
-                if executor := self.executor_reference():
-                    if process_exited:
-                        with self.shutdown_lock:
-                            executor._adjust_process_count()
-                    else:
-                        executor._idle_worker_semaphore.release()
-                    del executor
+                # attempt to increment idle process count
+                executor = self.executor_reference()
+                if executor is not None:
+                    executor._idle_worker_semaphore.release()
+                del executor
 
             if self.is_shutting_down():
                 self.flag_executor_shutting_down()
@@ -606,7 +578,7 @@ class BrokenProcessPool(_base.BrokenExecutor):
 
 class ProcessPoolExecutor(_base.Executor):
     def __init__(self, max_workers=None, mp_context=None,
-                 initializer=None, initargs=(), *, max_tasks_per_child=None):
+                 initializer=None, initargs=()):
         """Initializes a new ProcessPoolExecutor instance.
 
         Args:
@@ -617,11 +589,6 @@ def __init__(self, max_workers=None, mp_context=None,
                 object should provide SimpleQueue, Queue and Process.
             initializer: A callable used to initialize worker processes.
             initargs: A tuple of arguments to pass to the initializer.
-            max_tasks_per_child: The maximum number of tasks a worker process can
-                complete before it will exit and be replaced with a fresh
-                worker process, to enable unused resources to be freed. The
-                default value is None, which means worker process will live
-                as long as the executor will live.
         """
         _check_system_limits()
@@ -649,13 +616,6 @@ def __init__(self, max_workers=None, mp_context=None,
         self._initializer = initializer
         self._initargs = initargs
 
-        if max_tasks_per_child is not None:
-            if not isinstance(max_tasks_per_child, int):
-                raise TypeError("max_tasks_per_child must be an integer")
-            elif max_tasks_per_child <= 0:
-                raise ValueError("max_tasks_per_child must be >= 1")
-        self._max_tasks_per_child = max_tasks_per_child
-
         # Management thread
         self._executor_manager_thread = None
 
@@ -718,8 +678,7 @@ def _adjust_process_count(self):
                 args=(self._call_queue,
                       self._result_queue,
                       self._initializer,
-                      self._initargs,
-                      self._max_tasks_per_child))
+                      self._initargs))
             p.start()
             self._processes[p.pid] = p
 
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index 71c88a3cadd255..29e041deeca57f 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -49,6 +49,7 @@ def create_future(state=PENDING, exception=None, result=None):
 
 INITIALIZER_STATUS = 'uninitialized'
 
+
 def mul(x, y):
     return x * y
 
@@ -1045,36 +1046,6 @@ def test_idle_process_reuse_multiple(self):
         self.assertLessEqual(len(executor._processes), 2)
         executor.shutdown()
 
-    def test_max_tasks_per_child(self):
-        executor = self.executor_type(1, max_tasks_per_child=3)
-        f1 = executor.submit(os.getpid)
-        original_pid = f1.result()
-        # The worker pid remains the same as the worker could be reused
-        f2 = executor.submit(os.getpid)
-        self.assertEqual(f2.result(), original_pid)
-        self.assertEqual(len(executor._processes), 1)
-        f3 = executor.submit(os.getpid)
-        self.assertEqual(f3.result(), original_pid)
-
-        # A new worker is spawned, with a statistically different pid,
-        # while the previous was reaped.
-        f4 = executor.submit(os.getpid)
-        new_pid = f4.result()
-        self.assertNotEqual(original_pid, new_pid)
-        self.assertEqual(len(executor._processes), 1)
-
-        executor.shutdown()
-
-    def test_max_tasks_early_shutdown(self):
-        executor = self.executor_type(3, max_tasks_per_child=1)
-        futures = []
-        for i in range(6):
-            futures.append(executor.submit(mul, i, i))
-        executor.shutdown()
-        for i, future in enumerate(futures):
-            self.assertEqual(future.result(), mul(i, i))
-
-
 create_executor_tests(ProcessPoolExecutorTest,
                       executor_mixins=(ProcessPoolForkMixin,
                                        ProcessPoolForkserverMixin,

From c1a41e4579352396a3d9ba094b80e3838cd72168 Mon Sep 17 00:00:00 2001
From: "Gregory P. Smith [Google LLC]"
Date: Mon, 24 Jan 2022 07:53:56 +0000
Subject: [PATCH 2/4] NEWS entry about the bpo-44733 rollback.

---
 .../next/Library/2022-01-24-07-53-50.bpo-46464.Duuez0.rst | 3 +++
 1 file changed, 3 insertions(+)
 create mode 100644 Misc/NEWS.d/next/Library/2022-01-24-07-53-50.bpo-46464.Duuez0.rst

diff --git a/Misc/NEWS.d/next/Library/2022-01-24-07-53-50.bpo-46464.Duuez0.rst b/Misc/NEWS.d/next/Library/2022-01-24-07-53-50.bpo-46464.Duuez0.rst
new file mode 100644
index 00000000000000..cc7d1e73da5933
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2022-01-24-07-53-50.bpo-46464.Duuez0.rst
@@ -0,0 +1,3 @@
+Remove the ``max_tasks_per_child`` feature added in 3.11.0a3 from
+:class:`concurrent.futures.ProcessPoolExecutor` as the implementation relied
+on mixing threads+fork which can cause deadlocks.

From bbe32966c15fd240d434a9a268326e3e952f8674 Mon Sep 17 00:00:00 2001
From: "Gregory P. Smith [Google LLC]"
Date: Mon, 24 Jan 2022 08:33:01 +0000
Subject: [PATCH 3/4] Roll back 1ac6e379297cc1cf8acf6c1b011fccc7b3da2cbe

bpo-39207: Spawn workers on demand in ProcessPoolExecutor (GH-19453) was
implemented in a way that introduced child process deadlocks in some
environments due to mixing threading and fork() in the parent process.
---
 Doc/whatsnew/3.9.rst | 2 ++
 Lib/concurrent/futures/process.py | 23 ++++++++-----------
 .../2022-01-24-07-53-50.bpo-46464.Duuez0.rst | 10 +++++---
 3 files changed, 18 insertions(+), 17 deletions(-)

diff --git a/Doc/whatsnew/3.9.rst b/Doc/whatsnew/3.9.rst
index 0d514084d6cc14..e467ac1b32542a 100644
--- a/Doc/whatsnew/3.9.rst
+++ b/Doc/whatsnew/3.9.rst
@@ -404,6 +404,8 @@ Workers in :class:`~concurrent.futures.ProcessPoolExecutor` are now spawned on
 demand, only when there are no available idle workers to reuse. This optimizes
 startup overhead and reduces the amount of lost CPU time to idle workers.
 (Contributed by Kyle Stanley in :issue:`39207`.)
+Update: This was reverted in 3.9.11, 3.10.3, and 3.11 as the implementation
+could lead to deadlocks. See :issue:`46464`.
 
 curses
 ------
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 6354c4683bba8e..c050321f70751c 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -324,12 +324,6 @@ def run(self):
             # while waiting on new results.
             del result_item
 
-            # attempt to increment idle process count
-            executor = self.executor_reference()
-            if executor is not None:
-                executor._idle_worker_semaphore.release()
-            del executor
-
             if self.is_shutting_down():
                 self.flag_executor_shutting_down()
 
@@ -625,7 +619,6 @@ def __init__(self, max_workers=None, mp_context=None,
         # Shutdown is a two-step process.
         self._shutdown_thread = False
         self._shutdown_lock = threading.Lock()
-        self._idle_worker_semaphore = threading.Semaphore(0)
         self._broken = False
         self._queue_count = 0
         self._pending_work_items = {}
@@ -661,18 +654,21 @@ def __init__(self, max_workers=None, mp_context=None,
     def _start_executor_manager_thread(self):
         if self._executor_manager_thread is None:
             # Start the processes so that their sentinels are known.
+            self._adjust_process_count()
             self._executor_manager_thread = _ExecutorManagerThread(self)
             self._executor_manager_thread.start()
             _threads_wakeups[self._executor_manager_thread] = \
                 self._executor_manager_thread_wakeup
 
     def _adjust_process_count(self):
-        # if there's an idle process, we don't need to spawn a new one.
-        if self._idle_worker_semaphore.acquire(blocking=False):
-            return
-
-        process_count = len(self._processes)
-        if process_count < self._max_workers:
+        # To get rid of this condition, don't fork() from this process.
+        # This applies to _any_ thread existing in the process at all, but
+        # that is a long-standing issue. We at least make sure this library
+        # is not the cause of its own deadlocks.
+        assert not self._executor_manager_thread, (
+                'Processes cannot be fork()ed after the thread has started, '
+                'deadlock in the child processes could result; bpo-46464.')
+        for _ in range(len(self._processes), self._max_workers):
             p = self._mp_context.Process(
                 target=_process_worker,
                 args=(self._call_queue,
@@ -701,7 +697,6 @@ def submit(self, fn, /, *args, **kwargs):
             # Wake up queue management thread
             self._executor_manager_thread_wakeup.wakeup()
 
-            self._adjust_process_count()
             self._start_executor_manager_thread()
             return f
     submit.__doc__ = _base.Executor.submit.__doc__
 
diff --git a/Misc/NEWS.d/next/Library/2022-01-24-07-53-50.bpo-46464.Duuez0.rst b/Misc/NEWS.d/next/Library/2022-01-24-07-53-50.bpo-46464.Duuez0.rst
index cc7d1e73da5933..9d50c8d4842efa 100644
--- a/Misc/NEWS.d/next/Library/2022-01-24-07-53-50.bpo-46464.Duuez0.rst
+++ b/Misc/NEWS.d/next/Library/2022-01-24-07-53-50.bpo-46464.Duuez0.rst
@@ -1,3 +1,7 @@
-Remove the ``max_tasks_per_child`` feature added in 3.11.0a3 from
-:class:`concurrent.futures.ProcessPoolExecutor` as the implementation relied
-on mixing threads+fork which can cause deadlocks.
+Remove the ``max_tasks_per_child`` feature (added in 3.11.0a3, :issue:`44733`)
+from :class:`concurrent.futures.ProcessPoolExecutor` as the implementation
+relied on mixing threading and fork() which can cause deadlocks in child processes.
+
+Remove the 3.9 :issue:`39207` performance enhancement that spawned worker
+processes for :class:`concurrent.futures.ProcessPoolExecutor` on demand rather
+than up front, as the implementation could cause deadlocks in child processes.

From 55ad4e2365180abd5469e84c706bef3214b3681c Mon Sep 17 00:00:00 2001
From: "Gregory P. Smith [Google LLC]"
Date: Mon, 24 Jan 2022 08:48:34 +0000
Subject: [PATCH 4/4] Roll back the tests for 1ac6e379297cc1cf8acf6c1b011fccc7b3da2cbe as well.
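The hazard motivating this series is easy to reproduce without
concurrent.futures at all. A minimal POSIX-only sketch of the threads+fork
problem (an illustration, not code from these patches): a lock held by
another thread at fork() time is inherited by the child in the locked
state, with no thread left in the child to ever release it.

    import os
    import threading
    import time

    lock = threading.Lock()

    def hold_lock():
        with lock:
            time.sleep(2)  # keep the lock held while the parent forks

    threading.Thread(target=hold_lock).start()
    time.sleep(0.5)  # let the helper thread acquire the lock first

    pid = os.fork()  # mirrors multiprocessing's "fork" start method
    if pid == 0:
        # Child: the lock was copied in the held state and its owning
        # thread does not exist here, so a plain acquire() would hang
        # forever; the timeout makes the deadlock visible instead.
        print('child acquired lock:', lock.acquire(timeout=1))  # False
        os._exit(0)
    os.waitpid(pid, 0)

This is the failure mode the assertion added to _adjust_process_count() in
patch 3 guards against: no fork() may happen after the manager thread exists.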
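With on-demand spawning rolled back, the tests restored below assume every
worker is created up front. A minimal sketch of that expectation (a
standalone illustration, not part of the patch; ``_processes`` is a private
attribute, inspected here only to show the behavior):

    from concurrent.futures import ProcessPoolExecutor

    if __name__ == '__main__':
        with ProcessPoolExecutor(max_workers=4) as executor:
            executor.submit(pow, 2, 10).result()
            # All four workers are spawned before the first task runs,
            # so the full pool exists even after a single submit.
            print(len(executor._processes))  # expected: 4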
---
 Lib/test/test_concurrent_futures.py | 44 +++++--------------------
 1 file changed, 4 insertions(+), 40 deletions(-)

diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index 29e041deeca57f..7d408dd1f34caf 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -504,16 +504,10 @@ def _prime_executor(self):
         pass
 
     def test_processes_terminate(self):
-        def acquire_lock(lock):
-            lock.acquire()
-
-        mp_context = get_context()
-        sem = mp_context.Semaphore(0)
-        for _ in range(3):
-            self.executor.submit(acquire_lock, sem)
-        self.assertEqual(len(self.executor._processes), 3)
-        for _ in range(3):
-            sem.release()
+        self.executor.submit(mul, 21, 2)
+        self.executor.submit(mul, 6, 7)
+        self.executor.submit(mul, 3, 14)
+        self.assertEqual(len(self.executor._processes), 5)
         processes = self.executor._processes
         self.executor.shutdown()
 
@@ -1015,36 +1009,6 @@ def test_ressources_gced_in_workers(self):
         mgr.shutdown()
         mgr.join()
 
-    def test_saturation(self):
-        executor = self.executor_type(4)
-        mp_context = get_context()
-        sem = mp_context.Semaphore(0)
-        job_count = 15 * executor._max_workers
-        try:
-            for _ in range(job_count):
-                executor.submit(sem.acquire)
-            self.assertEqual(len(executor._processes), executor._max_workers)
-            for _ in range(job_count):
-                sem.release()
-        finally:
-            executor.shutdown()
-
-    def test_idle_process_reuse_one(self):
-        executor = self.executor_type(4)
-        executor.submit(mul, 21, 2).result()
-        executor.submit(mul, 6, 7).result()
-        executor.submit(mul, 3, 14).result()
-        self.assertEqual(len(executor._processes), 1)
-        executor.shutdown()
-
-    def test_idle_process_reuse_multiple(self):
-        executor = self.executor_type(4)
-        executor.submit(mul, 12, 7).result()
-        executor.submit(mul, 33, 25)
-        executor.submit(mul, 25, 26).result()
-        executor.submit(mul, 18, 29)
-        self.assertLessEqual(len(executor._processes), 2)
-        executor.shutdown()
 
 create_executor_tests(ProcessPoolExecutorTest,
                       executor_mixins=(ProcessPoolForkMixin,