diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst
index 959280833997e5..465d437fcbe576 100644
--- a/Doc/library/concurrent.futures.rst
+++ b/Doc/library/concurrent.futures.rst
@@ -231,7 +231,7 @@ that :class:`ProcessPoolExecutor` will not work in the interactive interpreter.
 Calling :class:`Executor` or :class:`Future` methods from a callable submitted
 to a :class:`ProcessPoolExecutor` will result in deadlock.
 
-.. class:: ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)
+.. class:: ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())
 
    An :class:`Executor` subclass that executes calls asynchronously using a pool
    of at most *max_workers* processes. If *max_workers* is ``None`` or not
@@ -252,11 +252,6 @@ to a :class:`ProcessPoolExecutor` will result in deadlock.
    pending jobs will raise a :exc:`~concurrent.futures.process.BrokenProcessPool`,
    as well as any attempt to submit more jobs to the pool.
 
-   *max_tasks_per_child* is an optional argument that specifies the maximum
-   number of tasks a single process can execute before it will exit and be
-   replaced with a fresh worker process. The default *max_tasks_per_child* is
-   ``None`` which means worker processes will live as long as the pool.
-
    .. versionchanged:: 3.3
       When one of the worker processes terminates abruptly, a
      :exc:`BrokenProcessPool` error is now raised. Previously, behaviour
@@ -269,10 +264,6 @@ to a :class:`ProcessPoolExecutor` will result in deadlock.
 
      Added the *initializer* and *initargs* arguments.
 
-   .. versionchanged:: 3.11
-      The *max_tasks_per_child* argument was added to allow users to
-      control the lifetime of workers in the pool.
-
 
 .. _processpoolexecutor-example:
 
diff --git a/Doc/whatsnew/3.9.rst b/Doc/whatsnew/3.9.rst
index 0d514084d6cc14..e467ac1b32542a 100644
--- a/Doc/whatsnew/3.9.rst
+++ b/Doc/whatsnew/3.9.rst
@@ -404,6 +404,8 @@ Workers in :class:`~concurrent.futures.ProcessPoolExecutor` are now spawned on
 demand, only when there are no available idle workers to reuse. This optimizes
 startup overhead and reduces the amount of lost CPU time to idle workers.
 (Contributed by Kyle Stanley in :issue:`39207`.)
+Update: This was reverted in 3.9.11, 3.10.3, and 3.11 as the implementation
+could lead to deadlocks. See :issue:`46464`.
 
 curses
 ------
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 695f7733305ed7..c050321f70751c 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -141,11 +141,10 @@ def __init__(self, future, fn, args, kwargs):
         self.kwargs = kwargs
 
 class _ResultItem(object):
-    def __init__(self, work_id, exception=None, result=None, exit_pid=None):
+    def __init__(self, work_id, exception=None, result=None):
         self.work_id = work_id
         self.exception = exception
         self.result = result
-        self.exit_pid = exit_pid
 
 class _CallItem(object):
     def __init__(self, work_id, fn, args, kwargs):
@@ -202,19 +201,17 @@ def _process_chunk(fn, chunk):
     return [fn(*args) for args in chunk]
 
 
-def _sendback_result(result_queue, work_id, result=None, exception=None,
-                     exit_pid=None):
+def _sendback_result(result_queue, work_id, result=None, exception=None):
     """Safely send back the given result or exception"""
     try:
         result_queue.put(_ResultItem(work_id, result=result,
-                                     exception=exception, exit_pid=exit_pid))
+                                     exception=exception))
     except BaseException as e:
         exc = _ExceptionWithTraceback(e, e.__traceback__)
-        result_queue.put(_ResultItem(work_id, exception=exc,
-                                     exit_pid=exit_pid))
+        result_queue.put(_ResultItem(work_id, exception=exc))
 
 
-def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=None):
+def _process_worker(call_queue, result_queue, initializer, initargs):
     """Evaluates calls from call_queue and places the results in result_queue.
 
     This worker is run in a separate process.
@@ -235,38 +232,25 @@ def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=N
             # The parent will notice that the process stopped and
             # mark the pool broken
             return
-    num_tasks = 0
-    exit_pid = None
     while True:
         call_item = call_queue.get(block=True)
         if call_item is None:
             # Wake up queue management thread
             result_queue.put(os.getpid())
             return
-
-        if max_tasks is not None:
-            num_tasks += 1
-            if num_tasks >= max_tasks:
-                exit_pid = os.getpid()
-
         try:
             r = call_item.fn(*call_item.args, **call_item.kwargs)
         except BaseException as e:
             exc = _ExceptionWithTraceback(e, e.__traceback__)
-            _sendback_result(result_queue, call_item.work_id, exception=exc,
-                             exit_pid=exit_pid)
+            _sendback_result(result_queue, call_item.work_id, exception=exc)
         else:
-            _sendback_result(result_queue, call_item.work_id, result=r,
-                             exit_pid=exit_pid)
+            _sendback_result(result_queue, call_item.work_id, result=r)
             del r
 
         # Liberate the resource as soon as possible, to avoid holding onto
         # open files or shared memory that is not needed anymore
         del call_item
 
-        if exit_pid is not None:
-            return
-
 
 class _ExecutorManagerThread(threading.Thread):
     """Manages the communication between this process and the worker processes.
@@ -317,10 +301,6 @@ def weakref_cb(_,
         # A queue.Queue of work ids e.g. Queue([5, 6, ...]).
         self.work_ids_queue = executor._work_ids
 
-        # Maximum number of tasks a worker process can execute before
-        # exiting safely
-        self.max_tasks_per_child = executor._max_tasks_per_child
-
         # A dict mapping work ids to _WorkItems e.g.
         # {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
         self.pending_work_items = executor._pending_work_items
@@ -340,24 +320,10 @@ def run(self):
                 return
             if result_item is not None:
                 self.process_result_item(result_item)
-
-                process_exited = result_item.exit_pid is not None
-                if process_exited:
-                    p = self.processes.pop(result_item.exit_pid)
-                    p.join()
-
                 # Delete reference to result_item to avoid keeping references
                 # while waiting on new results.
                 del result_item
 
-                if executor := self.executor_reference():
-                    if process_exited:
-                        with self.shutdown_lock:
-                            executor._adjust_process_count()
-                    else:
-                        executor._idle_worker_semaphore.release()
-                    del executor
-
             if self.is_shutting_down():
                 self.flag_executor_shutting_down()
 
@@ -606,7 +572,7 @@ class BrokenProcessPool(_base.BrokenExecutor):
 
 class ProcessPoolExecutor(_base.Executor):
     def __init__(self, max_workers=None, mp_context=None,
-                 initializer=None, initargs=(), *, max_tasks_per_child=None):
+                 initializer=None, initargs=()):
         """Initializes a new ProcessPoolExecutor instance.
 
         Args:
@@ -617,11 +583,6 @@ def __init__(self, max_workers=None, mp_context=None,
                 object should provide SimpleQueue, Queue and Process.
             initializer: A callable used to initialize worker processes.
             initargs: A tuple of arguments to pass to the initializer.
-            max_tasks_per_child: The maximum number of tasks a worker process can
-                complete before it will exit and be replaced with a fresh
-                worker process, to enable unused resources to be freed. The
-                default value is None, which means worker process will live
-                as long as the executor will live.
         """
         _check_system_limits()
 
@@ -649,13 +610,6 @@ def __init__(self, max_workers=None, mp_context=None,
         self._initializer = initializer
         self._initargs = initargs
 
-        if max_tasks_per_child is not None:
-            if not isinstance(max_tasks_per_child, int):
-                raise TypeError("max_tasks_per_child must be an integer")
-            elif max_tasks_per_child <= 0:
-                raise ValueError("max_tasks_per_child must be >= 1")
-        self._max_tasks_per_child = max_tasks_per_child
-
         # Management thread
         self._executor_manager_thread = None
 
@@ -665,7 +619,6 @@ def __init__(self, max_workers=None, mp_context=None,
         # Shutdown is a two-step process.
         self._shutdown_thread = False
         self._shutdown_lock = threading.Lock()
-        self._idle_worker_semaphore = threading.Semaphore(0)
         self._broken = False
         self._queue_count = 0
         self._pending_work_items = {}
@@ -701,25 +654,27 @@ def __init__(self, max_workers=None, mp_context=None,
     def _start_executor_manager_thread(self):
         if self._executor_manager_thread is None:
             # Start the processes so that their sentinels are known.
+            self._adjust_process_count()
             self._executor_manager_thread = _ExecutorManagerThread(self)
             self._executor_manager_thread.start()
             _threads_wakeups[self._executor_manager_thread] = \
                 self._executor_manager_thread_wakeup
 
     def _adjust_process_count(self):
-        # if there's an idle process, we don't need to spawn a new one.
-        if self._idle_worker_semaphore.acquire(blocking=False):
-            return
-
-        process_count = len(self._processes)
-        if process_count < self._max_workers:
+        # To get rid of this condition don't fork() from this process.
+        # This applies to _any_ thread existing in the process at all, but
+        # that is a long standing issue. We at least make sure this library
+        # is not the cause of its own deadlocks.
+        assert not self._executor_manager_thread, (
+                'Processes cannot be fork()ed after the thread has started, '
+                'deadlock in the child processes could result; bpo-46464.')
+        for _ in range(len(self._processes), self._max_workers):
             p = self._mp_context.Process(
                 target=_process_worker,
                 args=(self._call_queue,
                       self._result_queue,
                       self._initializer,
-                      self._initargs,
-                      self._max_tasks_per_child))
+                      self._initargs))
             p.start()
             self._processes[p.pid] = p
 
@@ -742,7 +697,6 @@ def submit(self, fn, /, *args, **kwargs):
             # Wake up queue management thread
             self._executor_manager_thread_wakeup.wakeup()
 
-            self._adjust_process_count()
             self._start_executor_manager_thread()
             return f
     submit.__doc__ = _base.Executor.submit.__doc__
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index 71c88a3cadd255..7d408dd1f34caf 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -49,6 +49,7 @@ def create_future(state=PENDING, exception=None, result=None):
 
 INITIALIZER_STATUS = 'uninitialized'
 
+
 def mul(x, y):
     return x * y
 
@@ -503,16 +504,10 @@ def _prime_executor(self):
         pass
 
     def test_processes_terminate(self):
-        def acquire_lock(lock):
-            lock.acquire()
-
-        mp_context = get_context()
-        sem = mp_context.Semaphore(0)
-        for _ in range(3):
-            self.executor.submit(acquire_lock, sem)
-        self.assertEqual(len(self.executor._processes), 3)
-        for _ in range(3):
-            sem.release()
+        self.executor.submit(mul, 21, 2)
+        self.executor.submit(mul, 6, 7)
+        self.executor.submit(mul, 3, 14)
+        self.assertEqual(len(self.executor._processes), 5)
 
         processes = self.executor._processes
         self.executor.shutdown()
@@ -1014,66 +1009,6 @@ def test_ressources_gced_in_workers(self):
         mgr.shutdown()
         mgr.join()
 
-    def test_saturation(self):
-        executor = self.executor_type(4)
-        mp_context = get_context()
-        sem = mp_context.Semaphore(0)
-        job_count = 15 * executor._max_workers
-        try:
-            for _ in range(job_count):
-                executor.submit(sem.acquire)
-            self.assertEqual(len(executor._processes), executor._max_workers)
-            for _ in range(job_count):
-                sem.release()
-        finally:
-            executor.shutdown()
-
-    def test_idle_process_reuse_one(self):
-        executor = self.executor_type(4)
-        executor.submit(mul, 21, 2).result()
-        executor.submit(mul, 6, 7).result()
-        executor.submit(mul, 3, 14).result()
-        self.assertEqual(len(executor._processes), 1)
-        executor.shutdown()
-
-    def test_idle_process_reuse_multiple(self):
-        executor = self.executor_type(4)
-        executor.submit(mul, 12, 7).result()
-        executor.submit(mul, 33, 25)
-        executor.submit(mul, 25, 26).result()
-        executor.submit(mul, 18, 29)
-        self.assertLessEqual(len(executor._processes), 2)
-        executor.shutdown()
-
-    def test_max_tasks_per_child(self):
-        executor = self.executor_type(1, max_tasks_per_child=3)
-        f1 = executor.submit(os.getpid)
-        original_pid = f1.result()
-        # The worker pid remains the same as the worker could be reused
-        f2 = executor.submit(os.getpid)
-        self.assertEqual(f2.result(), original_pid)
-        self.assertEqual(len(executor._processes), 1)
-        f3 = executor.submit(os.getpid)
-        self.assertEqual(f3.result(), original_pid)
-
-        # A new worker is spawned, with a statistically different pid,
-        # while the previous was reaped.
-        f4 = executor.submit(os.getpid)
-        new_pid = f4.result()
-        self.assertNotEqual(original_pid, new_pid)
-        self.assertEqual(len(executor._processes), 1)
-
-        executor.shutdown()
-
-    def test_max_tasks_early_shutdown(self):
-        executor = self.executor_type(3, max_tasks_per_child=1)
-        futures = []
-        for i in range(6):
-            futures.append(executor.submit(mul, i, i))
-        executor.shutdown()
-        for i, future in enumerate(futures):
-            self.assertEqual(future.result(), mul(i, i))
-
 
 create_executor_tests(ProcessPoolExecutorTest,
                       executor_mixins=(ProcessPoolForkMixin,
diff --git a/Misc/NEWS.d/next/Library/2022-01-24-07-53-50.bpo-46464.Duuez0.rst b/Misc/NEWS.d/next/Library/2022-01-24-07-53-50.bpo-46464.Duuez0.rst
new file mode 100644
index 00000000000000..9d50c8d4842efa
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2022-01-24-07-53-50.bpo-46464.Duuez0.rst
@@ -0,0 +1,7 @@
+Remove the 3.11.0a3 :issue:`44733` added ``max_tasks_per_child`` feature from
+:class:`concurrent.futures.ProcessPoolExecutor`, as the implementation relied on
+mixing threading and fork(), which can cause deadlocks in child processes.
+
+Remove the 3.9 :issue:`39207` performance enhancement that spawned worker
+processes for :class:`concurrent.futures.ProcessPoolExecutor` on demand rather
+than up front, as the implementation could cause deadlocks in the child process.
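
Editor's note on the failure mode this revert targets (not part of the patch): after this change, callers that passed ``max_tasks_per_child`` get a ``TypeError``, and all worker processes are again created up front, before the executor manager thread exists. The underlying hazard is the classic threads-plus-``fork()`` problem: a lock that some thread holds at the instant of ``fork()`` is copied into the child in its locked state, and the thread that owns it does not exist in the child to release it. The sketch below reproduces that class of deadlock directly with ``os.fork()`` and a ``threading.Lock`` rather than through the executor; the helper names are illustrative only, and the timeout exists purely so the demo terminates instead of hanging::

    import os
    import threading
    import time

    lock = threading.Lock()

    def hold_lock():
        # Keep the lock held while the main thread forks.
        with lock:
            time.sleep(2)

    if __name__ == "__main__" and hasattr(os, "fork"):  # POSIX-only demo
        threading.Thread(target=hold_lock, daemon=True).start()
        time.sleep(0.5)   # make it very likely the lock is held at fork time
        pid = os.fork()
        if pid == 0:
            # The child inherited the lock in its locked state, but not the
            # thread that owns it, so nothing in the child will ever release
            # it. An unbounded acquire() here would hang forever -- the
            # bpo-46464 class of deadlock. The timeout lets the demo finish.
            acquired = lock.acquire(timeout=5)
            os._exit(0 if acquired else 1)
        _, status = os.waitpid(pid, 0)
        print("child could acquire the inherited lock:",
              os.waitstatus_to_exitcode(status) == 0)  # expected: False

This is why ``_adjust_process_count()`` now asserts that the manager thread has not been started yet: with the ``fork`` start method, spawning workers after that thread exists could hand each child a copy of internal locks in an unreleasable state.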