Skip to content

Commit fdc0e09

Browse files
bpo-44733: Add max_tasks_per_child to ProcessPoolExecutor (GH-27373)
Co-authored-by: Antoine Pitrou <[email protected]>
1 parent 123a352 commit fdc0e09

File tree

4 files changed

+98
-16
lines changed

4 files changed

+98
-16
lines changed

Doc/library/concurrent.futures.rst

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ that :class:`ProcessPoolExecutor` will not work in the interactive interpreter.
231231
Calling :class:`Executor` or :class:`Future` methods from a callable submitted
232232
to a :class:`ProcessPoolExecutor` will result in deadlock.
233233

234-
.. class:: ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())
234+
.. class:: ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)
235235

236236
An :class:`Executor` subclass that executes calls asynchronously using a pool
237237
of at most *max_workers* processes. If *max_workers* is ``None`` or not
@@ -252,6 +252,11 @@ to a :class:`ProcessPoolExecutor` will result in deadlock.
252252
pending jobs will raise a :exc:`~concurrent.futures.process.BrokenProcessPool`,
253253
as well as any attempt to submit more jobs to the pool.
254254

255+
*max_tasks_per_child* is an optional argument that specifies the maximum
256+
number of tasks a single process can execute before it will exit and be
257+
replaced with a fresh worker process. The default *max_tasks_per_child* is
258+
``None`` which means worker processes will live as long as the pool.
259+
255260
.. versionchanged:: 3.3
256261
When one of the worker processes terminates abruptly, a
257262
:exc:`BrokenProcessPool` error is now raised. Previously, behaviour
@@ -264,6 +269,10 @@ to a :class:`ProcessPoolExecutor` will result in deadlock.
264269

265270
Added the *initializer* and *initargs* arguments.
266271

272+
.. versionchanged:: 3.11
273+
The *max_tasks_per_child* argument was added to allow users to
274+
control the lifetime of workers in the pool.
275+
267276

268277
.. _processpoolexecutor-example:
269278

Lib/concurrent/futures/process.py

Lines changed: 55 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -141,10 +141,11 @@ def __init__(self, future, fn, args, kwargs):
141141
self.kwargs = kwargs
142142

143143
class _ResultItem(object):
144-
def __init__(self, work_id, exception=None, result=None):
144+
def __init__(self, work_id, exception=None, result=None, exit_pid=None):
145145
self.work_id = work_id
146146
self.exception = exception
147147
self.result = result
148+
self.exit_pid = exit_pid
148149

149150
class _CallItem(object):
150151
def __init__(self, work_id, fn, args, kwargs):
@@ -201,17 +202,19 @@ def _process_chunk(fn, chunk):
201202
return [fn(*args) for args in chunk]
202203

203204

204-
def _sendback_result(result_queue, work_id, result=None, exception=None):
205+
def _sendback_result(result_queue, work_id, result=None, exception=None,
206+
exit_pid=None):
205207
"""Safely send back the given result or exception"""
206208
try:
207209
result_queue.put(_ResultItem(work_id, result=result,
208-
exception=exception))
210+
exception=exception, exit_pid=exit_pid))
209211
except BaseException as e:
210212
exc = _ExceptionWithTraceback(e, e.__traceback__)
211-
result_queue.put(_ResultItem(work_id, exception=exc))
213+
result_queue.put(_ResultItem(work_id, exception=exc,
214+
exit_pid=exit_pid))
212215

213216

214-
def _process_worker(call_queue, result_queue, initializer, initargs):
217+
def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=None):
215218
"""Evaluates calls from call_queue and places the results in result_queue.
216219
217220
This worker is run in a separate process.
@@ -232,25 +235,38 @@ def _process_worker(call_queue, result_queue, initializer, initargs):
232235
# The parent will notice that the process stopped and
233236
# mark the pool broken
234237
return
238+
num_tasks = 0
239+
exit_pid = None
235240
while True:
236241
call_item = call_queue.get(block=True)
237242
if call_item is None:
238243
# Wake up queue management thread
239244
result_queue.put(os.getpid())
240245
return
246+
247+
if max_tasks is not None:
248+
num_tasks += 1
249+
if num_tasks >= max_tasks:
250+
exit_pid = os.getpid()
251+
241252
try:
242253
r = call_item.fn(*call_item.args, **call_item.kwargs)
243254
except BaseException as e:
244255
exc = _ExceptionWithTraceback(e, e.__traceback__)
245-
_sendback_result(result_queue, call_item.work_id, exception=exc)
256+
_sendback_result(result_queue, call_item.work_id, exception=exc,
257+
exit_pid=exit_pid)
246258
else:
247-
_sendback_result(result_queue, call_item.work_id, result=r)
259+
_sendback_result(result_queue, call_item.work_id, result=r,
260+
exit_pid=exit_pid)
248261
del r
249262

250263
# Liberate the resource as soon as possible, to avoid holding onto
251264
# open files or shared memory that is not needed anymore
252265
del call_item
253266

267+
if exit_pid is not None:
268+
return
269+
254270

255271
class _ExecutorManagerThread(threading.Thread):
256272
"""Manages the communication between this process and the worker processes.
@@ -301,6 +317,10 @@ def weakref_cb(_,
301317
# A queue.Queue of work ids e.g. Queue([5, 6, ...]).
302318
self.work_ids_queue = executor._work_ids
303319

320+
# Maximum number of tasks a worker process can execute before
321+
# exiting safely
322+
self.max_tasks_per_child = executor._max_tasks_per_child
323+
304324
# A dict mapping work ids to _WorkItems e.g.
305325
# {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
306326
self.pending_work_items = executor._pending_work_items
@@ -320,15 +340,23 @@ def run(self):
320340
return
321341
if result_item is not None:
322342
self.process_result_item(result_item)
343+
344+
process_exited = result_item.exit_pid is not None
345+
if process_exited:
346+
p = self.processes.pop(result_item.exit_pid)
347+
p.join()
348+
323349
# Delete reference to result_item to avoid keeping references
324350
# while waiting on new results.
325351
del result_item
326352

327-
# attempt to increment idle process count
328-
executor = self.executor_reference()
329-
if executor is not None:
330-
executor._idle_worker_semaphore.release()
331-
del executor
353+
if executor := self.executor_reference():
354+
if process_exited:
355+
with self.shutdown_lock:
356+
executor._adjust_process_count()
357+
else:
358+
executor._idle_worker_semaphore.release()
359+
del executor
332360

333361
if self.is_shutting_down():
334362
self.flag_executor_shutting_down()
@@ -578,7 +606,7 @@ class BrokenProcessPool(_base.BrokenExecutor):
578606

579607
class ProcessPoolExecutor(_base.Executor):
580608
def __init__(self, max_workers=None, mp_context=None,
581-
initializer=None, initargs=()):
609+
initializer=None, initargs=(), *, max_tasks_per_child=None):
582610
"""Initializes a new ProcessPoolExecutor instance.
583611
584612
Args:
@@ -589,6 +617,11 @@ def __init__(self, max_workers=None, mp_context=None,
589617
object should provide SimpleQueue, Queue and Process.
590618
initializer: A callable used to initialize worker processes.
591619
initargs: A tuple of arguments to pass to the initializer.
620+
max_tasks_per_child: The maximum number of tasks a worker process can
621+
complete before it will exit and be replaced with a fresh
622+
worker process, to enable unused resources to be freed. The
623+
default value is None, which means worker process will live
624+
as long as the executor will live.
592625
"""
593626
_check_system_limits()
594627

@@ -616,6 +649,13 @@ def __init__(self, max_workers=None, mp_context=None,
616649
self._initializer = initializer
617650
self._initargs = initargs
618651

652+
if max_tasks_per_child is not None:
653+
if not isinstance(max_tasks_per_child, int):
654+
raise TypeError("max_tasks_per_child must be an integer")
655+
elif max_tasks_per_child <= 0:
656+
raise ValueError("max_tasks_per_child must be >= 1")
657+
self._max_tasks_per_child = max_tasks_per_child
658+
619659
# Management thread
620660
self._executor_manager_thread = None
621661

@@ -678,7 +718,8 @@ def _adjust_process_count(self):
678718
args=(self._call_queue,
679719
self._result_queue,
680720
self._initializer,
681-
self._initargs))
721+
self._initargs,
722+
self._max_tasks_per_child))
682723
p.start()
683724
self._processes[p.pid] = p
684725

Lib/test/test_concurrent_futures.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ def create_future(state=PENDING, exception=None, result=None):
4949

5050
INITIALIZER_STATUS = 'uninitialized'
5151

52-
5352
def mul(x, y):
5453
return x * y
5554

@@ -1038,6 +1037,36 @@ def test_idle_process_reuse_multiple(self):
10381037
self.assertLessEqual(len(executor._processes), 2)
10391038
executor.shutdown()
10401039

1040+
def test_max_tasks_per_child(self):
1041+
executor = self.executor_type(1, max_tasks_per_child=3)
1042+
f1 = executor.submit(os.getpid)
1043+
original_pid = f1.result()
1044+
# The worker pid remains the same as the worker could be reused
1045+
f2 = executor.submit(os.getpid)
1046+
self.assertEqual(f2.result(), original_pid)
1047+
self.assertEqual(len(executor._processes), 1)
1048+
f3 = executor.submit(os.getpid)
1049+
self.assertEqual(f3.result(), original_pid)
1050+
1051+
# A new worker is spawned, with a statistically different pid,
1052+
# while the previous was reaped.
1053+
f4 = executor.submit(os.getpid)
1054+
new_pid = f4.result()
1055+
self.assertNotEqual(original_pid, new_pid)
1056+
self.assertEqual(len(executor._processes), 1)
1057+
1058+
executor.shutdown()
1059+
1060+
def test_max_tasks_early_shutdown(self):
1061+
executor = self.executor_type(3, max_tasks_per_child=1)
1062+
futures = []
1063+
for i in range(6):
1064+
futures.append(executor.submit(mul, i, i))
1065+
executor.shutdown()
1066+
for i, future in enumerate(futures):
1067+
self.assertEqual(future.result(), mul(i, i))
1068+
1069+
10411070
create_executor_tests(ProcessPoolExecutorTest,
10421071
executor_mixins=(ProcessPoolForkMixin,
10431072
ProcessPoolForkserverMixin,
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Add ``max_tasks_per_child`` to :class:`concurrent.futures.ProcessPoolExecutor`.
2+
This allows users to specify the maximum number of tasks a single process
3+
should execute before the process needs to be restarted.

0 commit comments

Comments
 (0)