Skip to content

Commit e4cbc3a

Browse files
committed
* Process count adjustment must be serialized
* Add test for number of worker processes
1 parent 52ea6ff commit e4cbc3a

File tree

2 files changed

+25
-11
lines changed

2 files changed

+25
-11
lines changed

Lib/concurrent/futures/process.py

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -141,11 +141,11 @@ def __init__(self, future, fn, args, kwargs):
141141
self.kwargs = kwargs
142142

143143
class _ResultItem(object):
144-
def __init__(self, work_id, exception=None, result=None, pid=None):
144+
def __init__(self, work_id, exception=None, result=None, exit_pid=None):
145145
self.work_id = work_id
146146
self.exception = exception
147147
self.result = result
148-
self.pid = pid
148+
self.exit_pid = exit_pid
149149

150150
class _CallItem(object):
151151
def __init__(self, work_id, fn, args, kwargs):
@@ -202,14 +202,16 @@ def _process_chunk(fn, chunk):
202202
return [fn(*args) for args in chunk]
203203

204204

205-
def _sendback_result(result_queue, work_id, result=None, exception=None, pid=None):
205+
def _sendback_result(result_queue, work_id, result=None, exception=None,
206+
exit_pid=None):
206207
"""Safely send back the given result or exception"""
207208
try:
208209
result_queue.put(_ResultItem(work_id, result=result,
209-
exception=exception, pid=pid))
210+
exception=exception, exit_pid=exit_pid))
210211
except BaseException as e:
211212
exc = _ExceptionWithTraceback(e, e.__traceback__)
212-
result_queue.put(_ResultItem(work_id, exception=exc, pid=pid))
213+
result_queue.put(_ResultItem(work_id, exception=exc,
214+
exit_pid=exit_pid))
213215

214216

215217
def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=None):
@@ -251,9 +253,11 @@ def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=N
251253
r = call_item.fn(*call_item.args, **call_item.kwargs)
252254
except BaseException as e:
253255
exc = _ExceptionWithTraceback(e, e.__traceback__)
254-
_sendback_result(result_queue, call_item.work_id, exception=exc, pid=exit_pid)
256+
_sendback_result(result_queue, call_item.work_id, exception=exc,
257+
exit_pid=exit_pid)
255258
else:
256-
_sendback_result(result_queue, call_item.work_id, result=r, pid=exit_pid)
259+
_sendback_result(result_queue, call_item.work_id, result=r,
260+
exit_pid=exit_pid)
257261
del r
258262

259263
# Liberate the resource as soon as possible, to avoid holding onto
@@ -337,9 +341,9 @@ def run(self):
337341
if result_item is not None:
338342
self.process_result_item(result_item)
339343

340-
process_exited = result_item.pid is not None
344+
process_exited = result_item.exit_pid is not None
341345
if process_exited:
342-
p = self.processes.pop(result_item.pid)
346+
p = self.processes.pop(result_item.exit_pid)
343347
p.join()
344348

345349
# Delete reference to result_item to avoid keeping references
@@ -348,7 +352,8 @@ def run(self):
348352

349353
if executor := self.executor_reference():
350354
if process_exited:
351-
executor._adjust_process_count()
355+
with self.shutdown_lock:
356+
executor._adjust_process_count()
352357
else:
353358
executor._idle_worker_semaphore.release()
354359
del executor

Lib/test/test_concurrent_futures.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1041,15 +1041,24 @@ def test_max_tasks_per_child(self):
10411041
executor = self.executor_type(1, max_tasks_per_child=3)
10421042
f1 = executor.submit(os.getpid)
10431043
original_pid = f1.result()
1044-
1044+
# The worker pid remains the same as the worker could be reused
10451045
f2 = executor.submit(os.getpid)
10461046
self.assertEqual(f2.result(), original_pid)
1047+
self.assertEqual(len(executor._processes), 1)
10471048
f3 = executor.submit(os.getpid)
10481049
self.assertEqual(f3.result(), original_pid)
1050+
# The worker reached end-of-life and is eventually reaped
1051+
t1 = time.monotonic()
1052+
while len(executor._processes) > 0:
1053+
self.assertLess(time.monotonic() - t1, 10.0)
1054+
time.sleep(0.1)
10491055

1056+
# A new worker is spawned, with a statistically different pid
10501057
f4 = executor.submit(os.getpid)
10511058
new_pid = f4.result()
10521059
self.assertNotEqual(original_pid, new_pid)
1060+
self.assertEqual(len(executor._processes), 1)
1061+
10531062
executor.shutdown()
10541063

10551064
def test_max_tasks_early_shutdown(self):

0 commit comments

Comments
 (0)