Skip to content

Commit a4dfe8e

Browse files
authored
bpo-39995: Fix concurrent.futures _ThreadWakeup (GH-19760)
Fix a race condition in concurrent.futures._ThreadWakeup: access to _ThreadWakeup is now protected with the shutdown lock.
1 parent 7036477 commit a4dfe8e

File tree

2 files changed

+29
-14
lines changed

2 files changed

+29
-14
lines changed

Lib/concurrent/futures/process.py

+27-14
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ def _python_exit():
9090
_global_shutdown = True
9191
items = list(_threads_wakeups.items())
9292
for _, thread_wakeup in items:
93+
# call not protected by ProcessPoolExecutor._shutdown_lock
9394
thread_wakeup.wakeup()
9495
for t, _ in items:
9596
t.join()
@@ -157,8 +158,10 @@ def __init__(self, work_id, fn, args, kwargs):
157158

158159
class _SafeQueue(Queue):
159160
"""Safe Queue set exception to the future object linked to a job"""
160-
def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup):
161+
def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock,
162+
thread_wakeup):
161163
self.pending_work_items = pending_work_items
164+
self.shutdown_lock = shutdown_lock
162165
self.thread_wakeup = thread_wakeup
163166
super().__init__(max_size, ctx=ctx)
164167

@@ -167,7 +170,8 @@ def _on_queue_feeder_error(self, e, obj):
167170
tb = traceback.format_exception(type(e), e, e.__traceback__)
168171
e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
169172
work_item = self.pending_work_items.pop(obj.work_id, None)
170-
self.thread_wakeup.wakeup()
173+
with self.shutdown_lock:
174+
self.thread_wakeup.wakeup()
171175
# work_item can be None if another process terminated. In this
172176
# case, the executor_manager_thread fails all work_items
173177
# with BrokenProcessPool
@@ -268,17 +272,21 @@ def __init__(self, executor):
268272
# A _ThreadWakeup to allow waking up the queue_manager_thread from the
269273
# main Thread and avoid deadlocks caused by permanently locked queues.
270274
self.thread_wakeup = executor._executor_manager_thread_wakeup
275+
self.shutdown_lock = executor._shutdown_lock
271276

272277
# A weakref.ref to the ProcessPoolExecutor that owns this thread. Used
273278
# to determine if the ProcessPoolExecutor has been garbage collected
274279
# and that the manager can exit.
275280
# When the executor gets garbage collected, the weakref callback
276281
# will wake up the queue management thread so that it can terminate
277282
# if there is no pending work item.
278-
def weakref_cb(_, thread_wakeup=self.thread_wakeup):
283+
def weakref_cb(_,
284+
thread_wakeup=self.thread_wakeup,
285+
shutdown_lock=self.shutdown_lock):
279286
mp.util.debug('Executor collected: triggering callback for'
280287
' QueueManager wakeup')
281-
thread_wakeup.wakeup()
288+
with shutdown_lock:
289+
thread_wakeup.wakeup()
282290

283291
self.executor_reference = weakref.ref(executor, weakref_cb)
284292

@@ -363,6 +371,7 @@ def wait_result_broken_or_wakeup(self):
363371
# submitted, from the executor being shutdown/gc-ed, or from the
364372
# shutdown of the python interpreter.
365373
result_reader = self.result_queue._reader
374+
assert not self.thread_wakeup._closed
366375
wakeup_reader = self.thread_wakeup._reader
367376
readers = [result_reader, wakeup_reader]
368377
worker_sentinels = [p.sentinel for p in self.processes.values()]
@@ -380,7 +389,9 @@ def wait_result_broken_or_wakeup(self):
380389

381390
elif wakeup_reader in ready:
382391
is_broken = False
383-
self.thread_wakeup.clear()
392+
393+
with self.shutdown_lock:
394+
self.thread_wakeup.clear()
384395

385396
return result_item, is_broken, cause
386397

@@ -500,7 +511,8 @@ def join_executor_internals(self):
500511
# Release the queue's resources as soon as possible.
501512
self.call_queue.close()
502513
self.call_queue.join_thread()
503-
self.thread_wakeup.close()
514+
with self.shutdown_lock:
515+
self.thread_wakeup.close()
504516
# If .join() is not called on the created processes then
505517
# some ctx.Queue methods may deadlock on Mac OS X.
506518
for p in self.processes.values():
@@ -619,6 +631,8 @@ def __init__(self, max_workers=None, mp_context=None,
619631
# _result_queue to send wakeup signals to the executor_manager_thread
620632
# as it could result in a deadlock if a worker process dies with the
621633
# _result_queue write lock still acquired.
634+
#
635+
# _shutdown_lock must be locked to access _ThreadWakeup.
622636
self._executor_manager_thread_wakeup = _ThreadWakeup()
623637

624638
# Create communication channels for the executor
@@ -629,6 +643,7 @@ def __init__(self, max_workers=None, mp_context=None,
629643
self._call_queue = _SafeQueue(
630644
max_size=queue_size, ctx=self._mp_context,
631645
pending_work_items=self._pending_work_items,
646+
shutdown_lock=self._shutdown_lock,
632647
thread_wakeup=self._executor_manager_thread_wakeup)
633648
# Killed worker processes can produce spurious "broken pipe"
634649
# tracebacks in the queue's own worker thread. But we detect killed
@@ -718,12 +733,12 @@ def shutdown(self, wait=True, *, cancel_futures=False):
718733
with self._shutdown_lock:
719734
self._cancel_pending_futures = cancel_futures
720735
self._shutdown_thread = True
736+
if self._executor_manager_thread_wakeup is not None:
737+
# Wake up queue management thread
738+
self._executor_manager_thread_wakeup.wakeup()
721739

722-
if self._executor_manager_thread:
723-
# Wake up queue management thread
724-
self._executor_manager_thread_wakeup.wakeup()
725-
if wait:
726-
self._executor_manager_thread.join()
740+
if self._executor_manager_thread is not None and wait:
741+
self._executor_manager_thread.join()
727742
# To reduce the risk of opening too many files, remove references to
728743
# objects that use file descriptors.
729744
self._executor_manager_thread = None
@@ -732,8 +747,6 @@ def shutdown(self, wait=True, *, cancel_futures=False):
732747
self._result_queue.close()
733748
self._result_queue = None
734749
self._processes = None
735-
736-
if self._executor_manager_thread_wakeup:
737-
self._executor_manager_thread_wakeup = None
750+
self._executor_manager_thread_wakeup = None
738751

739752
shutdown.__doc__ = _base.Executor.shutdown.__doc__
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Fix a race condition in concurrent.futures._ThreadWakeup: access to
2+
_ThreadWakeup is now protected with the shutdown lock.

0 commit comments

Comments
 (0)