From 137c3f3da20383f1512c476ff6ebf68ff660295a Mon Sep 17 00:00:00 2001 From: Logan Asher Jones Date: Mon, 26 Jul 2021 13:38:41 -0400 Subject: [PATCH 1/6] bpo-44733: Add maxtasksperchild to ProcessPoolExecutor --- Doc/library/concurrent.futures.rst | 7 ++- Lib/concurrent/futures/process.py | 61 ++++++++++++++++--- Lib/test/test_concurrent_futures.py | 8 +++ .../2021-07-26-13-33-37.bpo-44733.88LrP1.rst | 4 ++ 4 files changed, 72 insertions(+), 8 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2021-07-26-13-33-37.bpo-44733.88LrP1.rst diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index 897efc2f544426..ab58364260a241 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -231,7 +231,7 @@ that :class:`ProcessPoolExecutor` will not work in the interactive interpreter. Calling :class:`Executor` or :class:`Future` methods from a callable submitted to a :class:`ProcessPoolExecutor` will result in deadlock. -.. class:: ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=()) +.. class:: ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), maxtasksperchild=None) An :class:`Executor` subclass that executes calls asynchronously using a pool of at most *max_workers* processes. If *max_workers* is ``None`` or not @@ -252,6 +252,11 @@ to a :class:`ProcessPoolExecutor` will result in deadlock. pending jobs will raise a :exc:`~concurrent.futures.process.BrokenProcessPool`, as well as any attempt to submit more jobs to the pool. + *maxtasksperchild* is an optional argument that specifies the maximum + number of tasks a single process can execute before it will exit and be + replaced with a fresh worker process. The default *maxtasksperchild* is + ``None`` which means worker processes will live as long as the pool. + .. versionchanged:: 3.3 When one of the worker processes terminates abruptly, a :exc:`BrokenProcessPool` error is now raised. Previously, behaviour diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 9904db78c5b4ce..95345060ac4f8e 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -211,7 +211,7 @@ def _sendback_result(result_queue, work_id, result=None, exception=None): result_queue.put(_ResultItem(work_id, exception=exc)) -def _process_worker(call_queue, result_queue, initializer, initargs): +def _process_worker(call_queue, result_queue, initializer, initargs, maxtasksperchild=None): """Evaluates calls from call_queue and places the results in result_queue. This worker is run in a separate process. 
@@ -232,7 +232,8 @@ def _process_worker(call_queue, result_queue, initializer, initargs): # The parent will notice that the process stopped and # mark the pool broken return - while True: + completed_count = 0 + while maxtasksperchild is None or (maxtasksperchild > completed_count): call_item = call_queue.get(block=True) if call_item is None: # Wake up queue management thread @@ -246,11 +247,17 @@ def _process_worker(call_queue, result_queue, initializer, initargs): else: _sendback_result(result_queue, call_item.work_id, result=r) del r + finally: + completed_count += 1 # Liberate the resource as soon as possible, to avoid holding onto # open files or shared memory that is not needed anymore del call_item + # Reached the maximum number of tasks this process can work on + # notify the manager that this process is exiting cleanly + result_queue.put(os.getpid()) + class _ExecutorManagerThread(threading.Thread): """Manages the communication between this process and the worker processes. @@ -301,6 +308,19 @@ def weakref_cb(_, # A queue.Queue of work ids e.g. Queue([5, 6, ...]). self.work_ids_queue = executor._work_ids + # A multiprocessing context to use when respawning processes + self.mp_context = executor._mp_context + + # Initializer for respawned processes + self.initializer = executor._initializer + + # Initializer args for respawned processes + self.initargs = executor._initargs + + # Maximum number of tasks a worker process can execute before + # exiting safely + self.maxtasksperchild = executor._maxtasksperchild + # A dict mapping work ids to _WorkItems e.g. # {5: <_WorkItem...>, 6: <_WorkItem...>, ...} self.pending_work_items = executor._pending_work_items @@ -400,11 +420,14 @@ def process_result_item(self, result_item): if isinstance(result_item, int): # Clean shutdown of a worker using its PID # (avoids marking the executor broken) - assert self.is_shutting_down() p = self.processes.pop(result_item) p.join() - if not self.processes: - self.join_executor_internals() + if self.is_shutting_down(): + if not self.processes: + self.join_executor_internals() + return + else: + self._spawn_replacement_process() return else: # Received a _ResultItem so mark the future as completed. @@ -520,6 +543,17 @@ def get_n_children_alive(self): # This is an upper bound on the number of children alive. return sum(p.is_alive() for p in self.processes.values()) + def _spawn_replacement_process(self): + p = self.mp_context.Process( + target=_process_worker, + args=(self.call_queue, + self.result_queue, + self.initializer, + self.initargs, + self.maxtasksperchild)) + p.start() + self.processes[p.pid] = p + _system_limits_checked = False _system_limited = None @@ -578,7 +612,7 @@ class BrokenProcessPool(_base.BrokenExecutor): class ProcessPoolExecutor(_base.Executor): def __init__(self, max_workers=None, mp_context=None, - initializer=None, initargs=()): + initializer=None, initargs=(), maxtasksperchild=None): """Initializes a new ProcessPoolExecutor instance. Args: @@ -589,6 +623,11 @@ def __init__(self, max_workers=None, mp_context=None, object should provide SimpleQueue, Queue and Process. initializer: A callable used to initialize worker processes. initargs: A tuple of arguments to pass to the initializer. + maxtasksperchild: The maximum number of tasks a worker process can + complete before it will exit and be replaced with a fresh + worker process, to enable unused resources to be freed. The + default value is None, which means worker process will live + as long as the executor will live. 
""" _check_system_limits() @@ -616,6 +655,13 @@ def __init__(self, max_workers=None, mp_context=None, self._initializer = initializer self._initargs = initargs + if maxtasksperchild is not None: + if not isinstance(maxtasksperchild, int): + raise TypeError("maxtasksperchild must be an integer") + elif maxtasksperchild < 0: + raise ValueError("maxtasksperchild must be >= 1") + self._maxtasksperchild = maxtasksperchild + # Management thread self._executor_manager_thread = None @@ -678,7 +724,8 @@ def _adjust_process_count(self): args=(self._call_queue, self._result_queue, self._initializer, - self._initargs)) + self._initargs, + self._maxtasksperchild)) p.start() self._processes[p.pid] = p diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 84209ca2520b89..ef5ee1f09a9abb 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -1038,6 +1038,14 @@ def test_idle_process_reuse_multiple(self): self.assertLessEqual(len(executor._processes), 2) executor.shutdown() + def test_maxtasksperchild(self): + executor = self.executor_type(1, maxtasksperchild=1) + executor.submit(init, "init").result() + status = executor.submit(get_init_status).result() + self.assertEqual(status, "uninitialized") + executor.shutdown() + + create_executor_tests(ProcessPoolExecutorTest, executor_mixins=(ProcessPoolForkMixin, ProcessPoolForkserverMixin, diff --git a/Misc/NEWS.d/next/Library/2021-07-26-13-33-37.bpo-44733.88LrP1.rst b/Misc/NEWS.d/next/Library/2021-07-26-13-33-37.bpo-44733.88LrP1.rst new file mode 100644 index 00000000000000..e26afa489b70d5 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2021-07-26-13-33-37.bpo-44733.88LrP1.rst @@ -0,0 +1,4 @@ +Add `maxtasksperchild` to `ProcessPoolExecutor`. This allows users to +specify the maximum number of tasks a single process should execute before +the process needs to be restarted. Mirrors functionality (and shares name) +with `multiprocessing.Pool` From 2f7c9fb4210e95e98627f5dbcdf8c72cfbd9bc6f Mon Sep 17 00:00:00 2001 From: Logan Asher Jones Date: Mon, 26 Jul 2021 13:38:41 -0400 Subject: [PATCH 2/6] bpo-44733: Add max_tasks_per_child to ProcessPoolExecutor --- Doc/library/concurrent.futures.rst | 10 ++- Lib/concurrent/futures/process.py | 70 +++++++------------ Lib/test/test_concurrent_futures.py | 28 ++++++-- .../2021-07-26-13-33-37.bpo-44733.88LrP1.rst | 7 +- 4 files changed, 58 insertions(+), 57 deletions(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index ab58364260a241..b4213b451157e3 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -231,7 +231,7 @@ that :class:`ProcessPoolExecutor` will not work in the interactive interpreter. Calling :class:`Executor` or :class:`Future` methods from a callable submitted to a :class:`ProcessPoolExecutor` will result in deadlock. -.. class:: ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), maxtasksperchild=None) +.. class:: ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None) An :class:`Executor` subclass that executes calls asynchronously using a pool of at most *max_workers* processes. If *max_workers* is ``None`` or not @@ -252,9 +252,9 @@ to a :class:`ProcessPoolExecutor` will result in deadlock. pending jobs will raise a :exc:`~concurrent.futures.process.BrokenProcessPool`, as well as any attempt to submit more jobs to the pool. 
- *maxtasksperchild* is an optional argument that specifies the maximum + *max_tasks_per_child* is an optional argument that specifies the maximum number of tasks a single process can execute before it will exit and be - replaced with a fresh worker process. The default *maxtasksperchild* is + replaced with a fresh worker process. The default *max_tasks_per_child* is ``None`` which means worker processes will live as long as the pool. .. versionchanged:: 3.3 @@ -269,6 +269,10 @@ to a :class:`ProcessPoolExecutor` will result in deadlock. Added the *initializer* and *initargs* arguments. + .. versionchanged:: 3.11 + The *max_tasks_per_child* argument was added to allow users to + control the lifetime of workers in the pool. + .. _processpoolexecutor-example: diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 95345060ac4f8e..ca225b6f14dc89 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -211,7 +211,7 @@ def _sendback_result(result_queue, work_id, result=None, exception=None): result_queue.put(_ResultItem(work_id, exception=exc)) -def _process_worker(call_queue, result_queue, initializer, initargs, maxtasksperchild=None): +def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=None): """Evaluates calls from call_queue and places the results in result_queue. This worker is run in a separate process. @@ -233,7 +233,7 @@ def _process_worker(call_queue, result_queue, initializer, initargs, maxtasksper # mark the pool broken return completed_count = 0 - while maxtasksperchild is None or (maxtasksperchild > completed_count): + while max_tasks is None or (max_tasks > completed_count): call_item = call_queue.get(block=True) if call_item is None: # Wake up queue management thread @@ -308,18 +308,9 @@ def weakref_cb(_, # A queue.Queue of work ids e.g. Queue([5, 6, ...]). self.work_ids_queue = executor._work_ids - # A multiprocessing context to use when respawning processes - self.mp_context = executor._mp_context - - # Initializer for respawned processes - self.initializer = executor._initializer - - # Initializer args for respawned processes - self.initargs = executor._initargs - # Maximum number of tasks a worker process can execute before # exiting safely - self.maxtasksperchild = executor._maxtasksperchild + self.max_tasks_per_child = executor._max_tasks_per_child # A dict mapping work ids to _WorkItems e.g. # {5: <_WorkItem...>, 6: <_WorkItem...>, ...} @@ -422,13 +413,12 @@ def process_result_item(self, result_item): # (avoids marking the executor broken) p = self.processes.pop(result_item) p.join() - if self.is_shutting_down(): + if self.is_shutting_down() and not self.pending_work_items: if not self.processes: self.join_executor_internals() - return else: - self._spawn_replacement_process() - return + if executor := self.executor_reference(): + executor._add_process_to_pool() else: # Received a _ResultItem so mark the future as completed. work_item = self.pending_work_items.pop(result_item.work_id, None) @@ -543,17 +533,6 @@ def get_n_children_alive(self): # This is an upper bound on the number of children alive. 
return sum(p.is_alive() for p in self.processes.values()) - def _spawn_replacement_process(self): - p = self.mp_context.Process( - target=_process_worker, - args=(self.call_queue, - self.result_queue, - self.initializer, - self.initargs, - self.maxtasksperchild)) - p.start() - self.processes[p.pid] = p - _system_limits_checked = False _system_limited = None @@ -612,7 +591,7 @@ class BrokenProcessPool(_base.BrokenExecutor): class ProcessPoolExecutor(_base.Executor): def __init__(self, max_workers=None, mp_context=None, - initializer=None, initargs=(), maxtasksperchild=None): + initializer=None, initargs=(), *, max_tasks_per_child=None): """Initializes a new ProcessPoolExecutor instance. Args: @@ -623,7 +602,7 @@ def __init__(self, max_workers=None, mp_context=None, object should provide SimpleQueue, Queue and Process. initializer: A callable used to initialize worker processes. initargs: A tuple of arguments to pass to the initializer. - maxtasksperchild: The maximum number of tasks a worker process can + max_tasks_per_child: The maximum number of tasks a worker process can complete before it will exit and be replaced with a fresh worker process, to enable unused resources to be freed. The default value is None, which means worker process will live @@ -655,12 +634,12 @@ def __init__(self, max_workers=None, mp_context=None, self._initializer = initializer self._initargs = initargs - if maxtasksperchild is not None: - if not isinstance(maxtasksperchild, int): - raise TypeError("maxtasksperchild must be an integer") - elif maxtasksperchild < 0: - raise ValueError("maxtasksperchild must be >= 1") - self._maxtasksperchild = maxtasksperchild + if max_tasks_per_child is not None: + if not isinstance(max_tasks_per_child, int): + raise TypeError("max_tasks_per_child must be an integer") + elif max_tasks_per_child <= 0: + raise ValueError("max_tasks_per_child must be >= 1") + self._max_tasks_per_child = max_tasks_per_child # Management thread self._executor_manager_thread = None @@ -719,15 +698,18 @@ def _adjust_process_count(self): process_count = len(self._processes) if process_count < self._max_workers: - p = self._mp_context.Process( - target=_process_worker, - args=(self._call_queue, - self._result_queue, - self._initializer, - self._initargs, - self._maxtasksperchild)) - p.start() - self._processes[p.pid] = p + self._add_process_to_pool() + + def _add_process_to_pool(self): + p = self._mp_context.Process( + target=_process_worker, + args=(self._call_queue, + self._result_queue, + self._initializer, + self._initargs, + self._max_tasks_per_child)) + p.start() + self._processes[p.pid] = p def submit(self, fn, /, *args, **kwargs): with self._shutdown_lock: diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index ef5ee1f09a9abb..9cc2c647eae0e6 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -49,7 +49,6 @@ def create_future(state=PENDING, exception=None, result=None): INITIALIZER_STATUS = 'uninitialized' - def mul(x, y): return x * y @@ -1038,12 +1037,29 @@ def test_idle_process_reuse_multiple(self): self.assertLessEqual(len(executor._processes), 2) executor.shutdown() - def test_maxtasksperchild(self): - executor = self.executor_type(1, maxtasksperchild=1) - executor.submit(init, "init").result() - status = executor.submit(get_init_status).result() - self.assertEqual(status, "uninitialized") + def test_max_tasks_per_child(self): + executor = self.executor_type(1, max_tasks_per_child=3) + f1 = executor.submit(os.getpid) 
+ original_pid = f1.result() + + f2 = executor.submit(os.getpid) + self.assertEqual(f2.result(), original_pid) + f3 = executor.submit(os.getpid) + self.assertEqual(f3.result(), original_pid) + + f4 = executor.submit(os.getpid) + new_pid = f4.result() + self.assertNotEqual(original_pid, new_pid) + executor.shutdown() + + def test_max_tasks_early_shutdown(self): + executor = self.executor_type(3, max_tasks_per_child=1) + futures = [] + for i in range(6): + futures.append(executor.submit(mul, i, i)) executor.shutdown() + for i, future in enumerate(futures): + self.assertEqual(future.result(), mul(i, i)) create_executor_tests(ProcessPoolExecutorTest, diff --git a/Misc/NEWS.d/next/Library/2021-07-26-13-33-37.bpo-44733.88LrP1.rst b/Misc/NEWS.d/next/Library/2021-07-26-13-33-37.bpo-44733.88LrP1.rst index e26afa489b70d5..666b5f7d0a0351 100644 --- a/Misc/NEWS.d/next/Library/2021-07-26-13-33-37.bpo-44733.88LrP1.rst +++ b/Misc/NEWS.d/next/Library/2021-07-26-13-33-37.bpo-44733.88LrP1.rst @@ -1,4 +1,3 @@ -Add `maxtasksperchild` to `ProcessPoolExecutor`. This allows users to -specify the maximum number of tasks a single process should execute before -the process needs to be restarted. Mirrors functionality (and shares name) -with `multiprocessing.Pool` +Add ``max_tasks_per_child`` to :class:`concurrent.futures.ProcessPoolExecutor`. +This allows users to specify the maximum number of tasks a single process +should execute before the process needs to be restarted. From 948a855615c724034317071b3eb71972ef179460 Mon Sep 17 00:00:00 2001 From: Logan Asher Jones Date: Sat, 30 Oct 2021 14:43:38 -0400 Subject: [PATCH 3/6] bpo-44733: max_tasks_per_child result item changes --- Lib/concurrent/futures/process.py | 78 +++++++++++++++++-------------- 1 file changed, 42 insertions(+), 36 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index ca225b6f14dc89..5b9cc00097e362 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -141,10 +141,11 @@ def __init__(self, future, fn, args, kwargs): self.kwargs = kwargs class _ResultItem(object): - def __init__(self, work_id, exception=None, result=None): + def __init__(self, work_id, exception=None, result=None, pid=None): self.work_id = work_id self.exception = exception self.result = result + self.pid = pid class _CallItem(object): def __init__(self, work_id, fn, args, kwargs): @@ -201,14 +202,14 @@ def _process_chunk(fn, chunk): return [fn(*args) for args in chunk] -def _sendback_result(result_queue, work_id, result=None, exception=None): +def _sendback_result(result_queue, work_id, result=None, exception=None, pid=None): """Safely send back the given result or exception""" try: result_queue.put(_ResultItem(work_id, result=result, - exception=exception)) + exception=exception, pid=pid)) except BaseException as e: exc = _ExceptionWithTraceback(e, e.__traceback__) - result_queue.put(_ResultItem(work_id, exception=exc)) + result_queue.put(_ResultItem(work_id, exception=exc, pid=pid)) def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=None): @@ -232,31 +233,35 @@ def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=N # The parent will notice that the process stopped and # mark the pool broken return - completed_count = 0 - while max_tasks is None or (max_tasks > completed_count): + num_tasks = 0 + exit_pid = None + while True: call_item = call_queue.get(block=True) if call_item is None: # Wake up queue management thread 
result_queue.put(os.getpid()) return + + if max_tasks is not None: + num_tasks += 1 + if num_tasks >= max_tasks: + exit_pid = os.getpid() + try: r = call_item.fn(*call_item.args, **call_item.kwargs) except BaseException as e: exc = _ExceptionWithTraceback(e, e.__traceback__) - _sendback_result(result_queue, call_item.work_id, exception=exc) + _sendback_result(result_queue, call_item.work_id, exception=exc, pid=exit_pid) else: - _sendback_result(result_queue, call_item.work_id, result=r) + _sendback_result(result_queue, call_item.work_id, result=r, pid=exit_pid) del r - finally: - completed_count += 1 # Liberate the resource as soon as possible, to avoid holding onto # open files or shared memory that is not needed anymore del call_item - # Reached the maximum number of tasks this process can work on - # notify the manager that this process is exiting cleanly - result_queue.put(os.getpid()) + if exit_pid is not None: + return class _ExecutorManagerThread(threading.Thread): @@ -331,15 +336,21 @@ def run(self): return if result_item is not None: self.process_result_item(result_item) + + process_exited = result_item.pid is not None + if process_exited: + self.processes.pop(result_item.pid) + # Delete reference to result_item to avoid keeping references # while waiting on new results. del result_item - # attempt to increment idle process count - executor = self.executor_reference() - if executor is not None: - executor._idle_worker_semaphore.release() - del executor + if executor := self.executor_reference(): + if process_exited: + executor._adjust_process_count() + else: + executor._idle_worker_semaphore.release() + del executor if self.is_shutting_down(): self.flag_executor_shutting_down() @@ -411,14 +422,12 @@ def process_result_item(self, result_item): if isinstance(result_item, int): # Clean shutdown of a worker using its PID # (avoids marking the executor broken) + assert self.is_shutting_down() p = self.processes.pop(result_item) p.join() - if self.is_shutting_down() and not self.pending_work_items: - if not self.processes: - self.join_executor_internals() - else: - if executor := self.executor_reference(): - executor._add_process_to_pool() + if not self.processes: + self.join_executor_internals() + return else: # Received a _ResultItem so mark the future as completed. 
work_item = self.pending_work_items.pop(result_item.work_id, None) @@ -698,18 +707,15 @@ def _adjust_process_count(self): process_count = len(self._processes) if process_count < self._max_workers: - self._add_process_to_pool() - - def _add_process_to_pool(self): - p = self._mp_context.Process( - target=_process_worker, - args=(self._call_queue, - self._result_queue, - self._initializer, - self._initargs, - self._max_tasks_per_child)) - p.start() - self._processes[p.pid] = p + p = self._mp_context.Process( + target=_process_worker, + args=(self._call_queue, + self._result_queue, + self._initializer, + self._initargs, + self._max_tasks_per_child)) + p.start() + self._processes[p.pid] = p def submit(self, fn, /, *args, **kwargs): with self._shutdown_lock: From a6495ff313f66b7f555792fbb35f7f8930df5fb8 Mon Sep 17 00:00:00 2001 From: Logan Asher Jones Date: Sat, 30 Oct 2021 20:04:12 -0400 Subject: [PATCH 4/6] bpo-44733: Ensuring process is cleaned up --- Lib/concurrent/futures/process.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 5b9cc00097e362..e45511ffe8e155 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -339,7 +339,8 @@ def run(self): process_exited = result_item.pid is not None if process_exited: - self.processes.pop(result_item.pid) + p = self.processes.pop(result_item.pid) + p.join() # Delete reference to result_item to avoid keeping references # while waiting on new results. From e4cbc3ad60fef65634c1d1cdd981bce074603596 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Sat, 20 Nov 2021 16:39:28 +0100 Subject: [PATCH 5/6] * Process count adjustment must be serialized * Add test for number of worker processes --- Lib/concurrent/futures/process.py | 25 +++++++++++++++---------- Lib/test/test_concurrent_futures.py | 11 ++++++++++- 2 files changed, 25 insertions(+), 11 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index e45511ffe8e155..19e93a608b2769 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -141,11 +141,11 @@ def __init__(self, future, fn, args, kwargs): self.kwargs = kwargs class _ResultItem(object): - def __init__(self, work_id, exception=None, result=None, pid=None): + def __init__(self, work_id, exception=None, result=None, exit_pid=None): self.work_id = work_id self.exception = exception self.result = result - self.pid = pid + self.exit_pid = exit_pid class _CallItem(object): def __init__(self, work_id, fn, args, kwargs): @@ -202,14 +202,16 @@ def _process_chunk(fn, chunk): return [fn(*args) for args in chunk] -def _sendback_result(result_queue, work_id, result=None, exception=None, pid=None): +def _sendback_result(result_queue, work_id, result=None, exception=None, + exit_pid=None): """Safely send back the given result or exception""" try: result_queue.put(_ResultItem(work_id, result=result, - exception=exception, pid=pid)) + exception=exception, exit_pid=exit_pid)) except BaseException as e: exc = _ExceptionWithTraceback(e, e.__traceback__) - result_queue.put(_ResultItem(work_id, exception=exc, pid=pid)) + result_queue.put(_ResultItem(work_id, exception=exc, + exit_pid=exit_pid)) def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=None): @@ -251,9 +253,11 @@ def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=N r = call_item.fn(*call_item.args, **call_item.kwargs) except BaseException as e: 
exc = _ExceptionWithTraceback(e, e.__traceback__) - _sendback_result(result_queue, call_item.work_id, exception=exc, pid=exit_pid) + _sendback_result(result_queue, call_item.work_id, exception=exc, + exit_pid=exit_pid) else: - _sendback_result(result_queue, call_item.work_id, result=r, pid=exit_pid) + _sendback_result(result_queue, call_item.work_id, result=r, + exit_pid=exit_pid) del r # Liberate the resource as soon as possible, to avoid holding onto @@ -337,9 +341,9 @@ def run(self): if result_item is not None: self.process_result_item(result_item) - process_exited = result_item.pid is not None + process_exited = result_item.exit_pid is not None if process_exited: - p = self.processes.pop(result_item.pid) + p = self.processes.pop(result_item.exit_pid) p.join() # Delete reference to result_item to avoid keeping references @@ -348,7 +352,8 @@ def run(self): if executor := self.executor_reference(): if process_exited: - executor._adjust_process_count() + with self.shutdown_lock: + executor._adjust_process_count() else: executor._idle_worker_semaphore.release() del executor diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 9cc2c647eae0e6..fa452593c9bf44 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -1041,15 +1041,24 @@ def test_max_tasks_per_child(self): executor = self.executor_type(1, max_tasks_per_child=3) f1 = executor.submit(os.getpid) original_pid = f1.result() - + # The worker pid remains the same as the worker could be reused f2 = executor.submit(os.getpid) self.assertEqual(f2.result(), original_pid) + self.assertEqual(len(executor._processes), 1) f3 = executor.submit(os.getpid) self.assertEqual(f3.result(), original_pid) + # The worker reached end-of-life and is eventually reaped + t1 = time.monotonic() + while len(executor._processes) > 0: + self.assertLess(time.monotonic() - t1, 10.0) + time.sleep(0.1) + # A new worker is spawned, with a statistically different pid f4 = executor.submit(os.getpid) new_pid = f4.result() self.assertNotEqual(original_pid, new_pid) + self.assertEqual(len(executor._processes), 1) + executor.shutdown() def test_max_tasks_early_shutdown(self): From cfeefb8d96398428b65e0edc2ce508847237f173 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Sat, 20 Nov 2021 18:36:21 +0100 Subject: [PATCH 6/6] Remove flaky check --- Lib/test/test_concurrent_futures.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index fa452593c9bf44..bbb6aa1eef81f7 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -1047,13 +1047,9 @@ def test_max_tasks_per_child(self): self.assertEqual(len(executor._processes), 1) f3 = executor.submit(os.getpid) self.assertEqual(f3.result(), original_pid) - # The worker reached end-of-life and is eventually reaped - t1 = time.monotonic() - while len(executor._processes) > 0: - self.assertLess(time.monotonic() - t1, 10.0) - time.sleep(0.1) - # A new worker is spawned, with a statistically different pid + # A new worker is spawned, with a statistically different pid, + # while the previous was reaped. f4 = executor.submit(os.getpid) new_pid = f4.result() self.assertNotEqual(original_pid, new_pid)
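Taken together, the series leaves worker recycling driven entirely by the manager thread: a worker that reaches max_tasks_per_child tags its final _ResultItem with exit_pid and returns, and the manager joins the dead process and spawns a replacement under shutdown_lock. A minimal usage sketch of the final keyword-only API follows (it assumes Python 3.11, per the versionchanged entry added in patch 2; the helper name get_pid and the printed PID values are illustrative, not from the patches):

    import os
    from concurrent.futures import ProcessPoolExecutor

    def get_pid(_):
        # Defined at module top level so it stays picklable under the
        # spawn start method.
        return os.getpid()

    if __name__ == "__main__":
        # One worker, recycled after every two completed tasks.
        with ProcessPoolExecutor(max_workers=1, max_tasks_per_child=2) as pool:
            pids = [pool.submit(get_pid, i).result() for i in range(4)]
        print(pids)  # e.g. [41123, 41123, 41180, 41180]
        # Tasks 0-1 ran in the first worker, tasks 2-3 in its replacement.
        # PID inequality across workers is statistical, as the comments in
        # test_max_tasks_per_child above note.
        assert pids[0] == pids[1] and pids[2] == pids[3]

    # Argument validation mirrors the constructor checks added in patch 2:
    #   ProcessPoolExecutor(max_tasks_per_child=0)    -> ValueError
    #   ProcessPoolExecutor(max_tasks_per_child="1")  -> TypeError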