gh-90622: Prevent concurrent.futures.process deadlock due to fork() after spawning a thread. #30847

Closed · wants to merge 4 commits
11 changes: 1 addition & 10 deletions Doc/library/concurrent.futures.rst
@@ -231,7 +231,7 @@ that :class:`ProcessPoolExecutor` will not work in the interactive interpreter.
Calling :class:`Executor` or :class:`Future` methods from a callable submitted
to a :class:`ProcessPoolExecutor` will result in deadlock.

.. class:: ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)
.. class:: ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())

An :class:`Executor` subclass that executes calls asynchronously using a pool
of at most *max_workers* processes. If *max_workers* is ``None`` or not
@@ -252,11 +252,6 @@ to a :class:`ProcessPoolExecutor` will result in deadlock.
pending jobs will raise a :exc:`~concurrent.futures.process.BrokenProcessPool`,
as well as any attempt to submit more jobs to the pool.

*max_tasks_per_child* is an optional argument that specifies the maximum
number of tasks a single process can execute before it will exit and be
replaced with a fresh worker process. The default *max_tasks_per_child* is
``None`` which means worker processes will live as long as the pool.

.. versionchanged:: 3.3
When one of the worker processes terminates abruptly, a
:exc:`BrokenProcessPool` error is now raised. Previously, behaviour
@@ -269,10 +264,6 @@ to a :class:`ProcessPoolExecutor` will result in deadlock.

Added the *initializer* and *initargs* arguments.

.. versionchanged:: 3.11
The *max_tasks_per_child* argument was added to allow users to
control the lifetime of workers in the pool.


.. _processpoolexecutor-example:

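For context on the documentation removed above: *max_tasks_per_child* told the pool to retire each worker after a fixed number of tasks. The following is a minimal usage sketch, not part of this patch, assuming an interpreter that still ships the parameter (it is exactly the keyword this change deletes); the explicit "spawn" context is an added precaution rather than something the removed text required:

    import multiprocessing as mp
    import os
    from concurrent.futures import ProcessPoolExecutor

    def worker_pid(_):
        return os.getpid()

    if __name__ == "__main__":
        # Sketch only: max_tasks_per_child is the keyword this change removes,
        # so this runs only on interpreters that still provide it.
        ctx = mp.get_context("spawn")
        with ProcessPoolExecutor(max_workers=1, max_tasks_per_child=2,
                                 mp_context=ctx) as pool:
            pids = [pool.submit(worker_pid, i).result() for i in range(4)]
        # The single worker exits after every 2 tasks and is replaced, so the
        # first two results share one pid and the last two share another.
        print(pids)
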
2 changes: 2 additions & 0 deletions Doc/whatsnew/3.9.rst
@@ -404,6 +404,8 @@ Workers in :class:`~concurrent.futures.ProcessPoolExecutor` are now spawned on
demand, only when there are no available idle workers to reuse. This optimizes
startup overhead and reduces the amount of lost CPU time to idle workers.
(Contributed by Kyle Stanley in :issue:`39207`.)
Update: This was reverted in 3.9.11, 3.10.3, and 3.11 as the implementation
could lead to deadlocks. See :issue:`46464`.

curses
------
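The reverted on-demand behaviour described above is observable from user code. A rough sketch of the difference follows; it peeks at the private ``_processes`` mapping purely for illustration and is not part of this patch:

    from concurrent.futures import ProcessPoolExecutor

    if __name__ == "__main__":
        with ProcessPoolExecutor(max_workers=4) as pool:
            pool.submit(pow, 2, 10).result()
            # _processes is a private attribute, inspected only to show the
            # difference: with on-demand spawning a single quick task leaves
            # one worker alive; with up-front spawning all four exist already.
            print(len(pool._processes))
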
82 changes: 18 additions & 64 deletions Lib/concurrent/futures/process.py
@@ -141,11 +141,10 @@ def __init__(self, future, fn, args, kwargs):
self.kwargs = kwargs

class _ResultItem(object):
def __init__(self, work_id, exception=None, result=None, exit_pid=None):
def __init__(self, work_id, exception=None, result=None):
self.work_id = work_id
self.exception = exception
self.result = result
self.exit_pid = exit_pid

class _CallItem(object):
def __init__(self, work_id, fn, args, kwargs):
@@ -202,19 +201,17 @@ def _process_chunk(fn, chunk):
return [fn(*args) for args in chunk]


def _sendback_result(result_queue, work_id, result=None, exception=None,
exit_pid=None):
def _sendback_result(result_queue, work_id, result=None, exception=None):
"""Safely send back the given result or exception"""
try:
result_queue.put(_ResultItem(work_id, result=result,
exception=exception, exit_pid=exit_pid))
exception=exception))
except BaseException as e:
exc = _ExceptionWithTraceback(e, e.__traceback__)
result_queue.put(_ResultItem(work_id, exception=exc,
exit_pid=exit_pid))
result_queue.put(_ResultItem(work_id, exception=exc))


def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=None):
def _process_worker(call_queue, result_queue, initializer, initargs):
"""Evaluates calls from call_queue and places the results in result_queue.

This worker is run in a separate process.
@@ -235,38 +232,25 @@ def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=None):
# The parent will notice that the process stopped and
# mark the pool broken
return
num_tasks = 0
exit_pid = None
while True:
call_item = call_queue.get(block=True)
if call_item is None:
# Wake up queue management thread
result_queue.put(os.getpid())
return

if max_tasks is not None:
num_tasks += 1
if num_tasks >= max_tasks:
exit_pid = os.getpid()

try:
r = call_item.fn(*call_item.args, **call_item.kwargs)
except BaseException as e:
exc = _ExceptionWithTraceback(e, e.__traceback__)
_sendback_result(result_queue, call_item.work_id, exception=exc,
exit_pid=exit_pid)
_sendback_result(result_queue, call_item.work_id, exception=exc)
else:
_sendback_result(result_queue, call_item.work_id, result=r,
exit_pid=exit_pid)
_sendback_result(result_queue, call_item.work_id, result=r)
del r

# Liberate the resource as soon as possible, to avoid holding onto
# open files or shared memory that is not needed anymore
del call_item

if exit_pid is not None:
return


class _ExecutorManagerThread(threading.Thread):
"""Manages the communication between this process and the worker processes.
@@ -317,10 +301,6 @@ def weakref_cb(_,
# A queue.Queue of work ids e.g. Queue([5, 6, ...]).
self.work_ids_queue = executor._work_ids

# Maximum number of tasks a worker process can execute before
# exiting safely
self.max_tasks_per_child = executor._max_tasks_per_child

# A dict mapping work ids to _WorkItems e.g.
# {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
self.pending_work_items = executor._pending_work_items
@@ -340,24 +320,10 @@ def run(self):
return
if result_item is not None:
self.process_result_item(result_item)

process_exited = result_item.exit_pid is not None
if process_exited:
p = self.processes.pop(result_item.exit_pid)
p.join()

# Delete reference to result_item to avoid keeping references
# while waiting on new results.
del result_item

if executor := self.executor_reference():
if process_exited:
with self.shutdown_lock:
executor._adjust_process_count()
else:
executor._idle_worker_semaphore.release()
del executor

if self.is_shutting_down():
self.flag_executor_shutting_down()

@@ -606,7 +572,7 @@ class BrokenProcessPool(_base.BrokenExecutor):

class ProcessPoolExecutor(_base.Executor):
def __init__(self, max_workers=None, mp_context=None,
initializer=None, initargs=(), *, max_tasks_per_child=None):
initializer=None, initargs=()):
"""Initializes a new ProcessPoolExecutor instance.

Args:
@@ -617,11 +583,6 @@ def __init__(self, max_workers=None, mp_context=None,
object should provide SimpleQueue, Queue and Process.
initializer: A callable used to initialize worker processes.
initargs: A tuple of arguments to pass to the initializer.
max_tasks_per_child: The maximum number of tasks a worker process can
complete before it will exit and be replaced with a fresh
worker process, to enable unused resources to be freed. The
default value is None, which means worker process will live
as long as the executor will live.
"""
_check_system_limits()

@@ -649,13 +610,6 @@ def __init__(self, max_workers=None, mp_context=None,
self._initializer = initializer
self._initargs = initargs

if max_tasks_per_child is not None:
if not isinstance(max_tasks_per_child, int):
raise TypeError("max_tasks_per_child must be an integer")
elif max_tasks_per_child <= 0:
raise ValueError("max_tasks_per_child must be >= 1")
self._max_tasks_per_child = max_tasks_per_child

# Management thread
self._executor_manager_thread = None

@@ -665,7 +619,6 @@ def __init__(self, max_workers=None, mp_context=None,
# Shutdown is a two-step process.
self._shutdown_thread = False
self._shutdown_lock = threading.Lock()
self._idle_worker_semaphore = threading.Semaphore(0)
self._broken = False
self._queue_count = 0
self._pending_work_items = {}
@@ -701,25 +654,27 @@ def __init__(self, max_workers=None, mp_context=None,
def _start_executor_manager_thread(self):
if self._executor_manager_thread is None:
# Start the processes so that their sentinels are known.
self._adjust_process_count()
self._executor_manager_thread = _ExecutorManagerThread(self)
self._executor_manager_thread.start()
_threads_wakeups[self._executor_manager_thread] = \
self._executor_manager_thread_wakeup

def _adjust_process_count(self):
# if there's an idle process, we don't need to spawn a new one.
if self._idle_worker_semaphore.acquire(blocking=False):
return

process_count = len(self._processes)
if process_count < self._max_workers:
# To get rid of this condition don't fork() from this process.
# This applies to _any_ thread existing in the process at all, but
# that is a long standing issue. We at least make sure this library
# is not the cause of its own deadlocks.
assert not self._executor_manager_thread, (
'Processes cannot be fork()ed after the thread has started, '
'deadlock in the child processes could result; bpo-46464.')
for _ in range(len(self._processes), self._max_workers):
p = self._mp_context.Process(
target=_process_worker,
args=(self._call_queue,
self._result_queue,
self._initializer,
self._initargs,
self._max_tasks_per_child))
self._initargs))
p.start()
self._processes[p.pid] = p

@@ -742,7 +697,6 @@ def submit(self, fn, /, *args, **kwargs):
# Wake up queue management thread
self._executor_manager_thread_wakeup.wakeup()

self._adjust_process_count()
self._start_executor_manager_thread()
return f
submit.__doc__ = _base.Executor.submit.__doc__
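The assertion added to ``_adjust_process_count`` captures the hazard this change is about: once any thread is running, fork() can copy a lock into the child in the held state with no thread left to release it. Below is a minimal, POSIX-only sketch of that failure mode; it is not part of the patch and it hangs by design:

    import os
    import threading
    import time

    lock = threading.Lock()

    def hold_lock():
        with lock:
            time.sleep(10)          # keep the lock held while the parent forks

    if __name__ == "__main__" and hasattr(os, "fork"):
        threading.Thread(target=hold_lock, daemon=True).start()
        time.sleep(0.5)             # let the helper thread acquire the lock

        pid = os.fork()
        if pid == 0:
            # Child: fork() copies only the calling thread, but the lock is
            # copied in the "held" state, so this acquire() never returns and
            # the child is deadlocked.
            lock.acquire()
            os._exit(0)
        else:
            os.waitpid(pid, 0)      # the parent then waits on the child forever

This is why the patch spawns every worker before the executor manager thread starts; callers who need to avoid fork() entirely can also pass a "spawn" or "forkserver" *mp_context*.
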
75 changes: 5 additions & 70 deletions Lib/test/test_concurrent_futures.py
@@ -49,6 +49,7 @@ def create_future(state=PENDING, exception=None, result=None):

INITIALIZER_STATUS = 'uninitialized'


def mul(x, y):
return x * y

@@ -503,16 +504,10 @@ def _prime_executor(self):
pass

def test_processes_terminate(self):
def acquire_lock(lock):
lock.acquire()

mp_context = get_context()
sem = mp_context.Semaphore(0)
for _ in range(3):
self.executor.submit(acquire_lock, sem)
self.assertEqual(len(self.executor._processes), 3)
for _ in range(3):
sem.release()
self.executor.submit(mul, 21, 2)
self.executor.submit(mul, 6, 7)
self.executor.submit(mul, 3, 14)
self.assertEqual(len(self.executor._processes), 5)
processes = self.executor._processes
self.executor.shutdown()

@@ -1014,66 +1009,6 @@ def test_ressources_gced_in_workers(self):
mgr.shutdown()
mgr.join()

def test_saturation(self):
executor = self.executor_type(4)
mp_context = get_context()
sem = mp_context.Semaphore(0)
job_count = 15 * executor._max_workers
try:
for _ in range(job_count):
executor.submit(sem.acquire)
self.assertEqual(len(executor._processes), executor._max_workers)
for _ in range(job_count):
sem.release()
finally:
executor.shutdown()

def test_idle_process_reuse_one(self):
executor = self.executor_type(4)
executor.submit(mul, 21, 2).result()
executor.submit(mul, 6, 7).result()
executor.submit(mul, 3, 14).result()
self.assertEqual(len(executor._processes), 1)
executor.shutdown()

def test_idle_process_reuse_multiple(self):
executor = self.executor_type(4)
executor.submit(mul, 12, 7).result()
executor.submit(mul, 33, 25)
executor.submit(mul, 25, 26).result()
executor.submit(mul, 18, 29)
self.assertLessEqual(len(executor._processes), 2)
executor.shutdown()

def test_max_tasks_per_child(self):
executor = self.executor_type(1, max_tasks_per_child=3)
f1 = executor.submit(os.getpid)
original_pid = f1.result()
# The worker pid remains the same as the worker could be reused
f2 = executor.submit(os.getpid)
self.assertEqual(f2.result(), original_pid)
self.assertEqual(len(executor._processes), 1)
f3 = executor.submit(os.getpid)
self.assertEqual(f3.result(), original_pid)

# A new worker is spawned, with a statistically different pid,
# while the previous was reaped.
f4 = executor.submit(os.getpid)
new_pid = f4.result()
self.assertNotEqual(original_pid, new_pid)
self.assertEqual(len(executor._processes), 1)

executor.shutdown()

def test_max_tasks_early_shutdown(self):
executor = self.executor_type(3, max_tasks_per_child=1)
futures = []
for i in range(6):
futures.append(executor.submit(mul, i, i))
executor.shutdown()
for i, future in enumerate(futures):
self.assertEqual(future.result(), mul(i, i))


create_executor_tests(ProcessPoolExecutorTest,
executor_mixins=(ProcessPoolForkMixin,
7 changes: 7 additions & 0 deletions (new NEWS entry; file path not shown)
@@ -0,0 +1,7 @@
Remove the ``max_tasks_per_child`` feature (added in 3.11.0a3 by :issue:`44733`)
from :class:`concurrent.futures.ProcessPoolExecutor`, as the implementation
relied on mixing threading and fork(), which can cause deadlocks in child processes.

Remove the 3.9 :issue:`39207` performance enhancement that spawned worker
processes for :class:`concurrent.futures.ProcessPoolExecutor` on demand rather
than up front, as the implementation could cause deadlocks in the child process.
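
As a usage-level footnote rather than part of the patch: callers who want to stay clear of fork()-related hazards on any interpreter version can select a non-fork start method explicitly. A minimal sketch:

    import multiprocessing as mp
    from concurrent.futures import ProcessPoolExecutor

    def square(x):
        return x * x

    if __name__ == "__main__":
        # "spawn" starts each worker as a fresh interpreter instead of forking
        # a possibly multi-threaded parent, sidestepping the deadlock class
        # described in this change.
        ctx = mp.get_context("spawn")
        with ProcessPoolExecutor(max_workers=2, mp_context=ctx) as pool:
            print(list(pool.map(square, range(5))))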