-
-
Notifications
You must be signed in to change notification settings - Fork 31.9k
bpo-39995: Fix concurrent.futures _ThreadWakeup #19760
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -90,6 +90,7 @@ def _python_exit(): | |
_global_shutdown = True | ||
items = list(_threads_wakeups.items()) | ||
for _, thread_wakeup in items: | ||
# call not protected by ProcessPoolExecutor._shutdown_lock | ||
thread_wakeup.wakeup() | ||
for t, _ in items: | ||
t.join() | ||
|
@@ -157,8 +158,10 @@ def __init__(self, work_id, fn, args, kwargs): | |
|
||
class _SafeQueue(Queue): | ||
"""Safe Queue set exception to the future object linked to a job""" | ||
def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup): | ||
def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock, | ||
thread_wakeup): | ||
self.pending_work_items = pending_work_items | ||
self.shutdown_lock = shutdown_lock | ||
self.thread_wakeup = thread_wakeup | ||
super().__init__(max_size, ctx=ctx) | ||
|
||
|
@@ -167,7 +170,8 @@ def _on_queue_feeder_error(self, e, obj): | |
tb = traceback.format_exception(type(e), e, e.__traceback__) | ||
e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb))) | ||
work_item = self.pending_work_items.pop(obj.work_id, None) | ||
self.thread_wakeup.wakeup() | ||
with self.shutdown_lock: | ||
self.thread_wakeup.wakeup() | ||
# work_item can be None if another process terminated. In this | ||
# case, the executor_manager_thread fails all work_items | ||
# with BrokenProcessPool | ||
|
@@ -268,17 +272,21 @@ def __init__(self, executor): | |
# A _ThreadWakeup to allow waking up the queue_manager_thread from the | ||
# main Thread and avoid deadlocks caused by permanently locked queues. | ||
self.thread_wakeup = executor._executor_manager_thread_wakeup | ||
self.shutdown_lock = executor._shutdown_lock | ||
|
||
# A weakref.ref to the ProcessPoolExecutor that owns this thread. Used | ||
# to determine if the ProcessPoolExecutor has been garbage collected | ||
# and that the manager can exit. | ||
# When the executor gets garbage collected, the weakref callback | ||
# will wake up the queue management thread so that it can terminate | ||
# if there is no pending work item. | ||
def weakref_cb(_, thread_wakeup=self.thread_wakeup): | ||
def weakref_cb(_, | ||
thread_wakeup=self.thread_wakeup, | ||
shutdown_lock=self.shutdown_lock): | ||
mp.util.debug('Executor collected: triggering callback for' | ||
' QueueManager wakeup') | ||
thread_wakeup.wakeup() | ||
with shutdown_lock: | ||
thread_wakeup.wakeup() | ||
|
||
self.executor_reference = weakref.ref(executor, weakref_cb) | ||
|
||
|
@@ -363,6 +371,7 @@ def wait_result_broken_or_wakeup(self): | |
# submitted, from the executor being shutdown/gc-ed, or from the | ||
# shutdown of the python interpreter. | ||
result_reader = self.result_queue._reader | ||
assert not self.thread_wakeup._closed | ||
wakeup_reader = self.thread_wakeup._reader | ||
readers = [result_reader, wakeup_reader] | ||
worker_sentinels = [p.sentinel for p in self.processes.values()] | ||
|
@@ -380,7 +389,9 @@ def wait_result_broken_or_wakeup(self): | |
|
||
elif wakeup_reader in ready: | ||
is_broken = False | ||
self.thread_wakeup.clear() | ||
|
||
with self.shutdown_lock: | ||
self.thread_wakeup.clear() | ||
|
||
return result_item, is_broken, cause | ||
|
||
|
@@ -500,7 +511,8 @@ def join_executor_internals(self): | |
# Release the queue's resources as soon as possible. | ||
self.call_queue.close() | ||
self.call_queue.join_thread() | ||
self.thread_wakeup.close() | ||
with self.shutdown_lock: | ||
self.thread_wakeup.close() | ||
# If .join() is not called on the created processes then | ||
# some ctx.Queue methods may deadlock on Mac OS X. | ||
for p in self.processes.values(): | ||
|
@@ -619,6 +631,8 @@ def __init__(self, max_workers=None, mp_context=None, | |
# _result_queue to send wakeup signals to the executor_manager_thread | ||
# as it could result in a deadlock if a worker process dies with the | ||
# _result_queue write lock still acquired. | ||
# | ||
# _shutdown_lock must be locked to access _ThreadWakeup. | ||
self._executor_manager_thread_wakeup = _ThreadWakeup() | ||
|
||
# Create communication channels for the executor | ||
|
@@ -629,6 +643,7 @@ def __init__(self, max_workers=None, mp_context=None, | |
self._call_queue = _SafeQueue( | ||
max_size=queue_size, ctx=self._mp_context, | ||
pending_work_items=self._pending_work_items, | ||
shutdown_lock=self._shutdown_lock, | ||
thread_wakeup=self._executor_manager_thread_wakeup) | ||
# Killed worker processes can produce spurious "broken pipe" | ||
# tracebacks in the queue's own worker thread. But we detect killed | ||
|
@@ -718,12 +733,12 @@ def shutdown(self, wait=True, *, cancel_futures=False): | |
with self._shutdown_lock: | ||
self._cancel_pending_futures = cancel_futures | ||
self._shutdown_thread = True | ||
if self._executor_manager_thread_wakeup is not None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is the same as both are set to |
||
# Wake up queue management thread | ||
self._executor_manager_thread_wakeup.wakeup() | ||
|
||
if self._executor_manager_thread: | ||
# Wake up queue management thread | ||
self._executor_manager_thread_wakeup.wakeup() | ||
if wait: | ||
self._executor_manager_thread.join() | ||
if self._executor_manager_thread is not None and wait: | ||
self._executor_manager_thread.join() | ||
# To reduce the risk of opening too many files, remove references to | ||
# objects that use file descriptors. | ||
self._executor_manager_thread = None | ||
|
@@ -732,8 +747,6 @@ def shutdown(self, wait=True, *, cancel_futures=False): | |
self._result_queue.close() | ||
self._result_queue = None | ||
self._processes = None | ||
|
||
if self._executor_manager_thread_wakeup: | ||
self._executor_manager_thread_wakeup = None | ||
self._executor_manager_thread_wakeup = None | ||
|
||
shutdown.__doc__ = _base.Executor.shutdown.__doc__ |
2 changes: 2 additions & 0 deletions
2
Misc/NEWS.d/next/Library/2020-04-28-18-25-27.bpo-39995.WmA3Gk.rst
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
Fix a race condition in concurrent.futures._ThreadWakeup: access to | ||
_ThreadWakeup is now protected with the shutdown lock. |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this lock is necessary as the goal is to protect against
_ThreadWakeup.close
which should only be called in the same thread.