Skip to content

Commit 948a855

Browse files
bpo-44733: max_tasks_per_child result item changes
1 parent 2f7c9fb commit 948a855

File tree

1 file changed

+42
-36
lines changed

1 file changed

+42
-36
lines changed

Lib/concurrent/futures/process.py

Lines changed: 42 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -141,10 +141,11 @@ def __init__(self, future, fn, args, kwargs):
141141
self.kwargs = kwargs
142142

143143
class _ResultItem(object):
144-
def __init__(self, work_id, exception=None, result=None):
144+
def __init__(self, work_id, exception=None, result=None, pid=None):
145145
self.work_id = work_id
146146
self.exception = exception
147147
self.result = result
148+
self.pid = pid
148149

149150
class _CallItem(object):
150151
def __init__(self, work_id, fn, args, kwargs):
@@ -201,14 +202,14 @@ def _process_chunk(fn, chunk):
201202
return [fn(*args) for args in chunk]
202203

203204

204-
def _sendback_result(result_queue, work_id, result=None, exception=None):
205+
def _sendback_result(result_queue, work_id, result=None, exception=None, pid=None):
205206
"""Safely send back the given result or exception"""
206207
try:
207208
result_queue.put(_ResultItem(work_id, result=result,
208-
exception=exception))
209+
exception=exception, pid=pid))
209210
except BaseException as e:
210211
exc = _ExceptionWithTraceback(e, e.__traceback__)
211-
result_queue.put(_ResultItem(work_id, exception=exc))
212+
result_queue.put(_ResultItem(work_id, exception=exc, pid=pid))
212213

213214

214215
def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=None):
@@ -232,31 +233,35 @@ def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=N
232233
# The parent will notice that the process stopped and
233234
# mark the pool broken
234235
return
235-
completed_count = 0
236-
while max_tasks is None or (max_tasks > completed_count):
236+
num_tasks = 0
237+
exit_pid = None
238+
while True:
237239
call_item = call_queue.get(block=True)
238240
if call_item is None:
239241
# Wake up queue management thread
240242
result_queue.put(os.getpid())
241243
return
244+
245+
if max_tasks is not None:
246+
num_tasks += 1
247+
if num_tasks >= max_tasks:
248+
exit_pid = os.getpid()
249+
242250
try:
243251
r = call_item.fn(*call_item.args, **call_item.kwargs)
244252
except BaseException as e:
245253
exc = _ExceptionWithTraceback(e, e.__traceback__)
246-
_sendback_result(result_queue, call_item.work_id, exception=exc)
254+
_sendback_result(result_queue, call_item.work_id, exception=exc, pid=exit_pid)
247255
else:
248-
_sendback_result(result_queue, call_item.work_id, result=r)
256+
_sendback_result(result_queue, call_item.work_id, result=r, pid=exit_pid)
249257
del r
250-
finally:
251-
completed_count += 1
252258

253259
# Liberate the resource as soon as possible, to avoid holding onto
254260
# open files or shared memory that is not needed anymore
255261
del call_item
256262

257-
# Reached the maximum number of tasks this process can work on
258-
# notify the manager that this process is exiting cleanly
259-
result_queue.put(os.getpid())
263+
if exit_pid is not None:
264+
return
260265

261266

262267
class _ExecutorManagerThread(threading.Thread):
@@ -331,15 +336,21 @@ def run(self):
331336
return
332337
if result_item is not None:
333338
self.process_result_item(result_item)
339+
340+
process_exited = result_item.pid is not None
341+
if process_exited:
342+
self.processes.pop(result_item.pid)
343+
334344
# Delete reference to result_item to avoid keeping references
335345
# while waiting on new results.
336346
del result_item
337347

338-
# attempt to increment idle process count
339-
executor = self.executor_reference()
340-
if executor is not None:
341-
executor._idle_worker_semaphore.release()
342-
del executor
348+
if executor := self.executor_reference():
349+
if process_exited:
350+
executor._adjust_process_count()
351+
else:
352+
executor._idle_worker_semaphore.release()
353+
del executor
343354

344355
if self.is_shutting_down():
345356
self.flag_executor_shutting_down()
@@ -411,14 +422,12 @@ def process_result_item(self, result_item):
411422
if isinstance(result_item, int):
412423
# Clean shutdown of a worker using its PID
413424
# (avoids marking the executor broken)
425+
assert self.is_shutting_down()
414426
p = self.processes.pop(result_item)
415427
p.join()
416-
if self.is_shutting_down() and not self.pending_work_items:
417-
if not self.processes:
418-
self.join_executor_internals()
419-
else:
420-
if executor := self.executor_reference():
421-
executor._add_process_to_pool()
428+
if not self.processes:
429+
self.join_executor_internals()
430+
return
422431
else:
423432
# Received a _ResultItem so mark the future as completed.
424433
work_item = self.pending_work_items.pop(result_item.work_id, None)
@@ -698,18 +707,15 @@ def _adjust_process_count(self):
698707

699708
process_count = len(self._processes)
700709
if process_count < self._max_workers:
701-
self._add_process_to_pool()
702-
703-
def _add_process_to_pool(self):
704-
p = self._mp_context.Process(
705-
target=_process_worker,
706-
args=(self._call_queue,
707-
self._result_queue,
708-
self._initializer,
709-
self._initargs,
710-
self._max_tasks_per_child))
711-
p.start()
712-
self._processes[p.pid] = p
710+
p = self._mp_context.Process(
711+
target=_process_worker,
712+
args=(self._call_queue,
713+
self._result_queue,
714+
self._initializer,
715+
self._initargs,
716+
self._max_tasks_per_child))
717+
p.start()
718+
self._processes[p.pid] = p
713719

714720
def submit(self, fn, /, *args, **kwargs):
715721
with self._shutdown_lock:

0 commit comments

Comments
 (0)