bpo-39812: Remove daemon threads in concurrent.futures #19149


Merged (8 commits) on Mar 27, 2020.
5 changes: 5 additions & 0 deletions Doc/whatsnew/3.9.rst
@@ -195,6 +195,11 @@
which have not started running, instead of waiting for them to complete before
shutting down the executor.
(Contributed by Kyle Stanley in :issue:`39349`.)

Removed daemon threads from :class:`~concurrent.futures.ThreadPoolExecutor`
and :class:`~concurrent.futures.ProcessPoolExecutor`. This improves
compatibility with subinterpreters and predictability in their shutdown
processes. (Contributed by Kyle Stanley in :issue:`39812`.)

Comment on lines +198 to +199 (Contributor Author):
Using a tilde ~ shortens the Sphinx markup link to appear as just the last part, i.e. ThreadPoolExecutor and ProcessPoolExecutor. I typically use this when the classes are easily distinguishable based on their names alone, and the section already makes it clear what module it's in. This helps to make it a bit more succinct.
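As a sketch of the user-visible effect of this change (an editor's illustration, not part of the patch): an executor whose shutdown() is never called still finishes queued work at interpreter exit, because the exit hook now signals and joins the non-daemon workers.

```python
import time
from concurrent.futures import ThreadPoolExecutor

ex = ThreadPoolExecutor(max_workers=1)
f = ex.submit(lambda: time.sleep(0.1) or "done")
# No explicit ex.shutdown() here: at interpreter exit, the handler that
# concurrent.futures registers via threading._register_atexit() signals
# the (non-daemon) worker thread and joins it, so the task still completes.
print(f.result())
```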

curses
------

23 changes: 6 additions & 17 deletions Lib/concurrent/futures/process.py
Expand Up @@ -59,19 +59,6 @@
import sys
import traceback

# Workers are created as daemon threads and processes. This is done to allow the
# interpreter to exit when there are still idle processes in a
# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
# allowing workers to die with the interpreter has two undesirable properties:
# - The workers would still be running during interpreter shutdown,
# meaning that they would fail in unpredictable ways.
# - The workers could be killed while evaluating a work item, which could
# be bad if the callable being evaluated has external side-effects e.g.
# writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads/processes finish.
Comment on lines -62 to -74 (Contributor Author):

I believe this section is no longer relevant now that the workers are no longer daemon threads. Correct me if I'm mistaken.


_threads_wakeups = weakref.WeakKeyDictionary()
_global_shutdown = False
@@ -107,6 +94,12 @@ def _python_exit():
for t, _ in items:
t.join()

# Register for `_python_exit()` to be called just before joining all
# non-daemon threads. This is used instead of `atexit.register()` for
# compatibility with subinterpreters, which no longer support daemon threads.
# See bpo-39812 for context.
threading._register_atexit(_python_exit)

# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
@@ -306,9 +299,7 @@ def weakref_cb(_, thread_wakeup=self.thread_wakeup):
# {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
self.pending_work_items = executor._pending_work_items

# Set this thread to be daemonized
super().__init__()
self.daemon = True
Contributor (@laike9m):

I'm actually depending on this behavior and my code breaks in Python 3.9. Is there a way to set the threads to be daemon? The error I got:

Traceback (most recent call last):
  File "/Users/laike9m/.pyenv/versions/3.9.0/lib/python3.9/threading.py", line 950, in _bootstrap_inner
    self.run()
  File "/Users/laike9m/.pyenv/versions/3.9.0/lib/python3.9/threading.py", line 888, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/laike9m/.pyenv/versions/[email protected]/lib/python3.9/site-packages/grpc/_server.py", line 875, in _serve
    if not _process_event_and_continue(state, event):
  File "/Users/laike9m/.pyenv/versions/[email protected]/lib/python3.9/site-packages/grpc/_server.py", line 839, in _process_event_and_continue
    rpc_state, rpc_future = _handle_call(event, state.generic_handlers,
  File "/Users/laike9m/.pyenv/versions/[email protected]/lib/python3.9/site-packages/grpc/_server.py", line 749, in _handle_call
    return _handle_with_method_handler(rpc_event, method_handler,
  File "/Users/laike9m/.pyenv/versions/[email protected]/lib/python3.9/site-packages/grpc/_server.py", line 722, in _handle_with_method_handler
    return state, _handle_unary_stream(rpc_event, state,
  File "/Users/laike9m/.pyenv/versions/[email protected]/lib/python3.9/site-packages/grpc/_server.py", line 640, in _handle_unary_stream
    return thread_pool.submit(_stream_response_in_pool, rpc_event, state,
  File "/Users/laike9m/.pyenv/versions/3.9.0/lib/python3.9/concurrent/futures/thread.py", line 163, in submit
    raise RuntimeError('cannot schedule new futures after '
RuntimeError: cannot schedule new futures after interpreter shutdown

which I believe is relevant. @aeros

Contributor Author (@aeros, Oct 26, 2020):

@laike9m Hmm, does your code specifically rely on submitting new jobs after executor.shutdown() gets called? If so, would it be possible to move the executor.shutdown() to a later point (optimally, you should be done with using the executor before calling shutdown())?

That being said, there may be some reason to consider allowing users to opt-in to using daemon threads; particularly since the decision to prevent subinterpreters from using daemon threads was reverted (which was originally the main motivation for this PR, with the benefit of more predictable shutdown behavior).

Do you have any thoughts @pitrou?

Member:

@laike9m What do you mean with "break"? Does the process not exit cleanly?

Contributor (@laike9m, Oct 26, 2020):

Let me describe my use case.

I'm using gRPC which starts a server using ThreadPoolExecutor:
https://github.com/laike9m/Cyberbrain/blob/master/cyberbrain/rpc_server.py#L235

This server runs in another thread besides the main thread, and would wait using server.wait_for_termination() until users explicitly Ctrl-C. The main thread would run and finish its work; clients may or may not connect to the server before the main thread ends, and the requirement is for the server to respond even after the main thread has finished. In Python <=3.8 this works, but in 3.9 I got the above error.

I'm not 100% sure that the root cause is this change, but I scanned through all changes in concurrent.futures in 3.9, and this seems to be the most likely one.

Member:

I see, then I think you need to define a signal handler that will be called when Ctrl-C is pressed, and that will ask the server to stop gracefully.

Contributor:

emm, it's actually not about what happens when users press Ctrl+C, but that the server is not able to serve requests before that...

server starts listening --> main thread finishes ---> requests come ------> Ctrl + C
                                                    (problem is here)

Contributor:

I'll try to come up with a minimal example that can reproduce this error.
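One hedged guess at such a minimal reproducer (an editor's construction, not taken from this thread): a non-daemon helper thread that calls submit() after the main thread has already finished. Run as a standalone script under 3.9+, the late submit() hits the new shutdown check, because _python_exit() runs before the helper thread is joined.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=1)

def late_submit():
    time.sleep(0.5)  # let the main thread finish first
    try:
        pool.submit(print, "too late")
    except RuntimeError as exc:
        print("got:", exc)

threading.Thread(target=late_submit).start()
# The main thread ends here. threading._shutdown() calls _python_exit()
# (registered via _register_atexit) *before* joining the helper thread,
# so the executor is already marked shut down when submit() runs.
```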

Member:

Ah, then it's simpler: you should ask the server to finish and wait for it to finish before you let the main thread exit.

Contributor:

Sounds good, thanks.
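The graceful-shutdown advice above can be sketched generically (a stand-in using threading.Event rather than the real gRPC server API):

```python
import threading

stop = threading.Event()

def serve():
    # stand-in for server.wait_for_termination()
    stop.wait()
    print("server stopped cleanly")

server_thread = threading.Thread(target=serve)
server_thread.start()
# ... main thread does its work; requests may arrive meanwhile ...
stop.set()            # ask the server to finish...
server_thread.join()  # ...and wait for it before the main thread exits
```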


def run(self):
# Main loop for the executor manager thread.
@@ -732,5 +723,3 @@ def shutdown(self, wait=True, *, cancel_futures=False):
self._executor_manager_thread_wakeup = None

shutdown.__doc__ = _base.Executor.shutdown.__doc__

atexit.register(_python_exit)
Comment on lines -735 to -736 (Contributor Author):

This change is just to make the atexit registration be in the same location for both ProcessPoolExecutor and ThreadPoolExecutor (right under the definition for _python_exit()). It can be removed if needed, but I think it will make them easier to locate for those reading over the implementation details.

20 changes: 5 additions & 15 deletions Lib/concurrent/futures/thread.py
@@ -13,19 +13,6 @@
import weakref
import os

# Workers are created as daemon threads. This is done to allow the interpreter
# to exit when there are still idle threads in a ThreadPoolExecutor's thread
# pool (i.e. shutdown() was not called). However, allowing workers to die with
# the interpreter has two undesirable properties:
# - The workers would still be running during interpreter shutdown,
# meaning that they would fail in unpredictable ways.
# - The workers could be killed while evaluating a work item, which could
# be bad if the callable being evaluated has external side-effects e.g.
# writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads finish.

_threads_queues = weakref.WeakKeyDictionary()
_shutdown = False
@@ -43,7 +30,11 @@ def _python_exit():
for t, q in items:
t.join()

atexit.register(_python_exit)
# Register for `_python_exit()` to be called just before joining all
# non-daemon threads. This is used instead of `atexit.register()` for
# compatibility with subinterpreters, which no longer support daemon threads.
# See bpo-39812 for context.
threading._register_atexit(_python_exit)


class _WorkItem(object):
@@ -197,7 +188,6 @@ def weakref_cb(_, q=self._work_queue):
self._work_queue,
self._initializer,
self._initargs))
t.daemon = True
t.start()
self._threads.add(t)
_threads_queues[t] = self._work_queue
50 changes: 50 additions & 0 deletions Lib/test/test_threading.py
@@ -1397,5 +1397,55 @@ def test_interrupt_main_noerror(self):
signal.signal(signal.SIGINT, handler)


class AtexitTests(unittest.TestCase):

def test_atexit_output(self):
rc, out, err = assert_python_ok("-c", """if True:
import threading

def run_last():
print('parrot')

threading._register_atexit(run_last)
""")

self.assertFalse(err)
self.assertEqual(out.strip(), b'parrot')

def test_atexit_called_once(self):
rc, out, err = assert_python_ok("-c", """if True:
import threading
from unittest.mock import Mock

mock = Mock()
threading._register_atexit(mock)
mock.assert_not_called()
# force early shutdown to ensure it was called once
threading._shutdown()
mock.assert_called_once()
""")

self.assertFalse(err)

def test_atexit_after_shutdown(self):
# The only way to do this is by registering an atexit within
# an atexit, which is intended to raise an exception.
rc, out, err = assert_python_ok("-c", """if True:
import threading

def func():
pass

def run_last():
threading._register_atexit(func)

threading._register_atexit(run_last)
""")

self.assertTrue(err)
self.assertIn("RuntimeError: can't register atexit after shutdown",
err.decode())


if __name__ == "__main__":
unittest.main()
29 changes: 29 additions & 0 deletions Lib/threading.py
@@ -3,6 +3,7 @@
import os as _os
import sys as _sys
import _thread
import functools

from time import monotonic as _time
from _weakrefset import WeakSet
@@ -1346,6 +1347,27 @@ def enumerate():
with _active_limbo_lock:
return list(_active.values()) + list(_limbo.values())


_threading_atexits = []
_SHUTTING_DOWN = False

def _register_atexit(func, *arg, **kwargs):
    """CPython internal: register *func* to be called before joining threads.

    The registered *func* is called with its arguments just before all
    non-daemon threads are joined in `_shutdown()`. It provides a similar
    purpose to `atexit.register()`, but its functions are called prior to
    threading shutdown instead of interpreter shutdown.

    For similarity to atexit, the registered functions are called in reverse.
    """
    if _SHUTTING_DOWN:
        raise RuntimeError("can't register atexit after shutdown")

    call = functools.partial(func, *arg, **kwargs)
    _threading_atexits.append(call)

Contributor Author (@aeros, Mar 25, 2020):
I specified "CPython internal" instead of just "internal" to make it more clear that it's intended to be used across stdlib modules (as opposed to an internal helper function for threading).

At some point, I think this could graduate to the public API, but at the moment it's a bit too niche since subinterpreters have not yet officially made it into the stdlib. If such a utility is requested by users, I think it could be moved without much hassle though. That's partly why I included a detailed docstring.

Member:
This sounds good to me, thanks for the explanation.
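A quick sketch of the reverse-order semantics (an editor's example; `_register_atexit` is private API, so this is illustrative only):

```python
import threading

threading._register_atexit(print, "second (registered first)")
threading._register_atexit(print, "first (registered last)")
# At interpreter exit, _shutdown() invokes the registered calls in
# reverse registration order, before joining non-daemon threads.
```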


from _thread import stack_size

# Create the main thread object,
@@ -1367,6 +1389,8 @@ def _shutdown():
# _shutdown() was already called
return

global _SHUTTING_DOWN
_SHUTTING_DOWN = True
# Main thread
tlock = _main_thread._tstate_lock
# The main thread isn't finished yet, so its thread state lock can't have
@@ -1376,6 +1400,11 @@
tlock.release()
_main_thread._stop()

# Call registered threading atexit functions before threads are joined.
# Order is reversed, similar to atexit.
for atexit_call in reversed(_threading_atexits):
atexit_call()

# Join all non-deamon threads
while True:
with _shutdown_locks_lock:
@@ -0,0 +1,4 @@
Removed daemon threads from :mod:`concurrent.futures` by adding
an internal `threading._register_atexit()`, which calls registered functions
prior to joining all non-daemon threads. This allows for compatibility
with subinterpreters, which don't support daemon threads.