Skip to content

Commit 405b063

Browse files
elfstromblurb-it[bot]pitroucjw296tomMoral
authored
gh-105829: Fix concurrent.futures.ProcessPoolExecutor deadlock (#108513)
This fixes issue #105829. Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com> Co-authored-by: Antoine Pitrou <[email protected]> Co-authored-by: Chris Withers <[email protected]> Co-authored-by: Thomas Moreau <[email protected]>
1 parent e94a223 commit 405b063

File tree

3 files changed

+87
-4
lines changed

3 files changed

+87
-4
lines changed

Lib/concurrent/futures/process.py

+15-3
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,11 @@ def __init__(self):
7171
self._reader, self._writer = mp.Pipe(duplex=False)
7272

7373
def close(self):
74+
# Please note that we do not take the shutdown lock when
75+
# calling clear() (to avoid deadlocking) so this method can
76+
# only be called safely from the same thread as all calls to
77+
# clear() even if you hold the shutdown lock. Otherwise we
78+
# might try to read from the closed pipe.
7479
if not self._closed:
7580
self._closed = True
7681
self._writer.close()
@@ -426,8 +431,12 @@ def wait_result_broken_or_wakeup(self):
426431
elif wakeup_reader in ready:
427432
is_broken = False
428433

429-
with self.shutdown_lock:
430-
self.thread_wakeup.clear()
434+
# No need to hold the _shutdown_lock here because:
435+
# 1. we're the only thread to use the wakeup reader
436+
# 2. we're also the only thread to call thread_wakeup.close()
437+
# 3. we want to avoid a possible deadlock when both reader and writer
438+
# would block (gh-105829)
439+
self.thread_wakeup.clear()
431440

432441
return result_item, is_broken, cause
433442

@@ -717,7 +726,10 @@ def __init__(self, max_workers=None, mp_context=None,
717726
# as it could result in a deadlock if a worker process dies with the
718727
# _result_queue write lock still acquired.
719728
#
720-
# _shutdown_lock must be locked to access _ThreadWakeup.
729+
# _shutdown_lock must be locked to access _ThreadWakeup.close() and
730+
# .wakeup(). Care must also be taken to not call clear or close from
731+
# more than one thread since _ThreadWakeup.clear() is not protected by
732+
# the _shutdown_lock.
721733
self._executor_manager_thread_wakeup = _ThreadWakeup()
722734

723735
# Create communication channels for the executor

Lib/test/test_concurrent_futures/test_deadlock.py

+71-1
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
import contextlib
2+
import queue
3+
import signal
24
import sys
35
import time
46
import unittest
7+
import unittest.mock
58
from pickle import PicklingError
69
from concurrent import futures
7-
from concurrent.futures.process import BrokenProcessPool
10+
from concurrent.futures.process import BrokenProcessPool, _ThreadWakeup
811

912
from test import support
1013

@@ -241,6 +244,73 @@ def test_crash_big_data(self):
241244

242245
executor.shutdown(wait=True)
243246

247+
def test_gh105829_should_not_deadlock_if_wakeup_pipe_full(self):
248+
# Issue #105829: The _ExecutorManagerThread wakeup pipe could
249+
# fill up and block. See: https://github.com/python/cpython/issues/105829
250+
251+
# Lots of cargo culting while writing this test, apologies if
252+
# something is really stupid...
253+
254+
self.executor.shutdown(wait=True)
255+
256+
if not hasattr(signal, 'alarm'):
257+
raise unittest.SkipTest(
258+
"Tested platform does not support the alarm signal")
259+
260+
def timeout(_signum, _frame):
261+
import faulthandler
262+
faulthandler.dump_traceback()
263+
264+
raise RuntimeError("timed out while submitting jobs?")
265+
266+
thread_run = futures.process._ExecutorManagerThread.run
267+
def mock_run(self):
268+
# Delay thread startup so the wakeup pipe can fill up and block
269+
time.sleep(3)
270+
thread_run(self)
271+
272+
class MockWakeup(_ThreadWakeup):
273+
"""Mock wakeup object to force the wakeup to block"""
274+
def __init__(self):
275+
super().__init__()
276+
self._dummy_queue = queue.Queue(maxsize=1)
277+
278+
def wakeup(self):
279+
self._dummy_queue.put(None, block=True)
280+
super().wakeup()
281+
282+
def clear(self):
283+
try:
284+
while True:
285+
self._dummy_queue.get_nowait()
286+
except queue.Empty:
287+
super().clear()
288+
289+
with (unittest.mock.patch.object(futures.process._ExecutorManagerThread,
290+
'run', mock_run),
291+
unittest.mock.patch('concurrent.futures.process._ThreadWakeup',
292+
MockWakeup)):
293+
with self.executor_type(max_workers=2,
294+
mp_context=self.get_context()) as executor:
295+
self.executor = executor # Allow clean up in fail_on_deadlock
296+
297+
job_num = 100
298+
job_data = range(job_num)
299+
300+
# Need to use SIGALRM for timeout detection because
301+
# Executor.submit is not guarded by any timeout (both
302+
# self._work_ids.put(self._queue_count) and
303+
# self._executor_manager_thread_wakeup.wakeup() might
304+
# block, maybe more?). In this specific case it was
305+
# the wakeup call that deadlocked on a blocking pipe.
306+
old_handler = signal.signal(signal.SIGALRM, timeout)
307+
try:
308+
signal.alarm(int(self.TIMEOUT))
309+
self.assertEqual(job_num, len(list(executor.map(int, job_data))))
310+
finally:
311+
signal.alarm(0)
312+
signal.signal(signal.SIGALRM, old_handler)
313+
244314

245315
create_executor_tests(globals(), ExecutorDeadlockTest,
246316
executor_mixins=(ProcessPoolForkMixin,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix concurrent.futures.ProcessPoolExecutor deadlock that could occur when the executor manager thread's wakeup pipe filled up and blocked.

0 commit comments

Comments
 (0)