Skip to content

bpo-44733: Add maxtasksperchild to ProcessPoolExecutor #27373

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Nov 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion Doc/library/concurrent.futures.rst
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ that :class:`ProcessPoolExecutor` will not work in the interactive interpreter.
Calling :class:`Executor` or :class:`Future` methods from a callable submitted
to a :class:`ProcessPoolExecutor` will result in deadlock.

.. class:: ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())
.. class:: ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)

An :class:`Executor` subclass that executes calls asynchronously using a pool
of at most *max_workers* processes. If *max_workers* is ``None`` or not
Expand All @@ -252,6 +252,11 @@ to a :class:`ProcessPoolExecutor` will result in deadlock.
pending jobs will raise a :exc:`~concurrent.futures.process.BrokenProcessPool`,
as well as any attempt to submit more jobs to the pool.

*max_tasks_per_child* is an optional argument that specifies the maximum
number of tasks a single process can execute before it will exit and be
replaced with a fresh worker process. By default, *max_tasks_per_child* is
``None``, which means worker processes will live as long as the pool.

.. versionchanged:: 3.3
When one of the worker processes terminates abruptly, a
:exc:`BrokenProcessPool` error is now raised. Previously, behaviour
Expand All @@ -264,6 +269,10 @@ to a :class:`ProcessPoolExecutor` will result in deadlock.

Added the *initializer* and *initargs* arguments.

.. versionchanged:: 3.11
The *max_tasks_per_child* argument was added to allow users to
control the lifetime of workers in the pool.


.. _processpoolexecutor-example:

Expand Down
69 changes: 55 additions & 14 deletions Lib/concurrent/futures/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,11 @@ def __init__(self, future, fn, args, kwargs):
self.kwargs = kwargs

class _ResultItem(object):
def __init__(self, work_id, exception=None, result=None):
def __init__(self, work_id, exception=None, result=None, exit_pid=None):
self.work_id = work_id
self.exception = exception
self.result = result
self.exit_pid = exit_pid

class _CallItem(object):
def __init__(self, work_id, fn, args, kwargs):
Expand Down Expand Up @@ -201,17 +202,19 @@ def _process_chunk(fn, chunk):
return [fn(*args) for args in chunk]


def _sendback_result(result_queue, work_id, result=None, exception=None):
def _sendback_result(result_queue, work_id, result=None, exception=None,
exit_pid=None):
"""Safely send back the given result or exception"""
try:
result_queue.put(_ResultItem(work_id, result=result,
exception=exception))
exception=exception, exit_pid=exit_pid))
except BaseException as e:
exc = _ExceptionWithTraceback(e, e.__traceback__)
result_queue.put(_ResultItem(work_id, exception=exc))
result_queue.put(_ResultItem(work_id, exception=exc,
exit_pid=exit_pid))


def _process_worker(call_queue, result_queue, initializer, initargs):
def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=None):
"""Evaluates calls from call_queue and places the results in result_queue.

This worker is run in a separate process.
Expand All @@ -232,25 +235,38 @@ def _process_worker(call_queue, result_queue, initializer, initargs):
# The parent will notice that the process stopped and
# mark the pool broken
return
num_tasks = 0
exit_pid = None
while True:
call_item = call_queue.get(block=True)
if call_item is None:
# Wake up queue management thread
result_queue.put(os.getpid())
return

if max_tasks is not None:
num_tasks += 1
if num_tasks >= max_tasks:
exit_pid = os.getpid()

try:
r = call_item.fn(*call_item.args, **call_item.kwargs)
except BaseException as e:
exc = _ExceptionWithTraceback(e, e.__traceback__)
_sendback_result(result_queue, call_item.work_id, exception=exc)
_sendback_result(result_queue, call_item.work_id, exception=exc,
exit_pid=exit_pid)
else:
_sendback_result(result_queue, call_item.work_id, result=r)
_sendback_result(result_queue, call_item.work_id, result=r,
exit_pid=exit_pid)
del r

# Liberate the resource as soon as possible, to avoid holding onto
# open files or shared memory that is not needed anymore
del call_item

if exit_pid is not None:
return


class _ExecutorManagerThread(threading.Thread):
"""Manages the communication between this process and the worker processes.
Expand Down Expand Up @@ -301,6 +317,10 @@ def weakref_cb(_,
# A queue.Queue of work ids e.g. Queue([5, 6, ...]).
self.work_ids_queue = executor._work_ids

# Maximum number of tasks a worker process can execute before
# exiting safely
self.max_tasks_per_child = executor._max_tasks_per_child

# A dict mapping work ids to _WorkItems e.g.
# {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
self.pending_work_items = executor._pending_work_items
Expand All @@ -320,15 +340,23 @@ def run(self):
return
if result_item is not None:
self.process_result_item(result_item)

process_exited = result_item.exit_pid is not None
if process_exited:
p = self.processes.pop(result_item.exit_pid)
p.join()

# Delete reference to result_item to avoid keeping references
# while waiting on new results.
del result_item

# attempt to increment idle process count
executor = self.executor_reference()
if executor is not None:
executor._idle_worker_semaphore.release()
del executor
if executor := self.executor_reference():
if process_exited:
with self.shutdown_lock:
executor._adjust_process_count()
else:
executor._idle_worker_semaphore.release()
del executor

if self.is_shutting_down():
self.flag_executor_shutting_down()
Expand Down Expand Up @@ -578,7 +606,7 @@ class BrokenProcessPool(_base.BrokenExecutor):

class ProcessPoolExecutor(_base.Executor):
def __init__(self, max_workers=None, mp_context=None,
initializer=None, initargs=()):
initializer=None, initargs=(), *, max_tasks_per_child=None):
"""Initializes a new ProcessPoolExecutor instance.

Args:
Expand All @@ -589,6 +617,11 @@ def __init__(self, max_workers=None, mp_context=None,
object should provide SimpleQueue, Queue and Process.
initializer: A callable used to initialize worker processes.
initargs: A tuple of arguments to pass to the initializer.
max_tasks_per_child: The maximum number of tasks a worker process can
complete before it will exit and be replaced with a fresh
worker process, to enable unused resources to be freed. The
default value is None, which means worker processes will live
as long as the executor.
"""
_check_system_limits()

Expand Down Expand Up @@ -616,6 +649,13 @@ def __init__(self, max_workers=None, mp_context=None,
self._initializer = initializer
self._initargs = initargs

if max_tasks_per_child is not None:
if not isinstance(max_tasks_per_child, int):
raise TypeError("max_tasks_per_child must be an integer")
elif max_tasks_per_child <= 0:
raise ValueError("max_tasks_per_child must be >= 1")
self._max_tasks_per_child = max_tasks_per_child

# Management thread
self._executor_manager_thread = None

Expand Down Expand Up @@ -678,7 +718,8 @@ def _adjust_process_count(self):
args=(self._call_queue,
self._result_queue,
self._initializer,
self._initargs))
self._initargs,
self._max_tasks_per_child))
p.start()
self._processes[p.pid] = p

Expand Down
31 changes: 30 additions & 1 deletion Lib/test/test_concurrent_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ def create_future(state=PENDING, exception=None, result=None):

INITIALIZER_STATUS = 'uninitialized'


def mul(x, y):
# Return the product of x and y; submitted to executors as a simple task.
return x * y

Expand Down Expand Up @@ -1038,6 +1037,36 @@ def test_idle_process_reuse_multiple(self):
self.assertLessEqual(len(executor._processes), 2)
executor.shutdown()

def test_max_tasks_per_child(self):
# Check that a worker is retired and replaced once it has executed
# max_tasks_per_child tasks: a single worker limited to three tasks.
executor = self.executor_type(1, max_tasks_per_child=3)
f1 = executor.submit(os.getpid)
original_pid = f1.result()
# The worker pid stays the same for the second and third tasks because
# the worker has not yet reached its task limit and is reused.
f2 = executor.submit(os.getpid)
self.assertEqual(f2.result(), original_pid)
self.assertEqual(len(executor._processes), 1)
f3 = executor.submit(os.getpid)
self.assertEqual(f3.result(), original_pid)

# The third task exhausted the limit, so a new worker is spawned for the
# fourth task, with a statistically different pid, while the previous
# worker was reaped.
f4 = executor.submit(os.getpid)
new_pid = f4.result()
self.assertNotEqual(original_pid, new_pid)
# The retired worker must have been removed from the bookkeeping dict.
self.assertEqual(len(executor._processes), 1)

executor.shutdown()

def test_max_tasks_early_shutdown(self):
# With max_tasks_per_child=1 every worker exits after a single task, so
# six tasks on three workers require replacement workers. shutdown() is
# called right after submission, before any result is collected; every
# future must still resolve to the correct product.
executor = self.executor_type(3, max_tasks_per_child=1)
futures = []
for i in range(6):
futures.append(executor.submit(mul, i, i))
executor.shutdown()
for i, future in enumerate(futures):
self.assertEqual(future.result(), mul(i, i))


create_executor_tests(ProcessPoolExecutorTest,
executor_mixins=(ProcessPoolForkMixin,
ProcessPoolForkserverMixin,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Add ``max_tasks_per_child`` to :class:`concurrent.futures.ProcessPoolExecutor`.
This allows users to specify the maximum number of tasks a single process
should execute before the process needs to be restarted.