diff --git a/Doc/library/asyncio-eventloop.rst b/Doc/library/asyncio-eventloop.rst index 72f484fd1cbe77..7a9c4f59fe306a 100644 --- a/Doc/library/asyncio-eventloop.rst +++ b/Doc/library/asyncio-eventloop.rst @@ -1051,6 +1051,10 @@ Watching file descriptors See also :ref:`Platform Support ` section for some limitations of these methods. +.. versionchanged:: next + + Added support for these methods to :class:`ProactorEventLoop`. + Working with socket objects directly ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/Doc/library/asyncio-platforms.rst b/Doc/library/asyncio-platforms.rst index a2a3114ad6e4c5..497bd96cfdd21c 100644 --- a/Doc/library/asyncio-platforms.rst +++ b/Doc/library/asyncio-platforms.rst @@ -16,7 +16,7 @@ due to the platforms' underlying architecture and capabilities. All Platforms ============= -* :meth:`loop.add_reader` and :meth:`loop.add_writer` +* :meth:`~asyncio.loop.add_reader` and :meth:`~asyncio.loop.add_writer` cannot be used to monitor file I/O. @@ -59,8 +59,9 @@ All event loops on Windows do not support the following methods: :class:`ProactorEventLoop` has the following limitations: -* The :meth:`loop.add_reader` and :meth:`loop.add_writer` - methods are not supported. +* :meth:`loop.add_reader` and :meth:`loop.add_writer` only accept + socket handles (for example, pipe file descriptors are not supported). + When called, :func:`select.select` is run in an additional thread. The resolution of the monotonic clock on Windows is usually around 15.6 milliseconds. The best resolution is 0.5 milliseconds. The resolution depends on the @@ -68,6 +69,10 @@ hardware (availability of `HPET `_) and on the Windows configuration. +.. versionadded:: next + + Support for :meth:`loop.add_reader`, :meth:`loop.add_writer` added to :class:`ProactorEventLoop`. + .. _asyncio-windows-subprocess: diff --git a/Lib/asyncio/_selector_thread.py b/Lib/asyncio/_selector_thread.py new file mode 100644 index 00000000000000..edade54898cf89 --- /dev/null +++ b/Lib/asyncio/_selector_thread.py @@ -0,0 +1,323 @@ +# Contains code from https://github.com/tornadoweb/tornado/tree/v6.5.2 +# SPDX-License-Identifier: PSF-2.0 AND Apache-2.0 +# SPDX-FileCopyrightText: Copyright (c) 2025 The Tornado Authors + +""" +Compatibility for [add|remove]_[reader|writer] where unavailable (Proactor). + +Runs select in a background thread. +_Only_ `select.select` is called in the background thread. + +Callbacks are all handled back in the event loop's thread, +as scheduled by `loop.call_soon_threadsafe`. + +Adapted from Tornado 6.5.2 +""" + +import asyncio +import atexit +import contextvars +import errno +import functools +import select +import socket +import threading +import typing + +from typing import ( + Any, + Callable, + Protocol, +) + +from . import events + + +class _HasFileno(Protocol): + def fileno(self) -> int: + pass + + +_FileDescriptorLike = int | _HasFileno + +# Collection of selector thread event loops to shut down on exit. +_selector_loops: set["SelectorThread"] = set() + + +def _atexit_callback() -> None: + for loop in _selector_loops: + with loop._select_cond: + loop._closing_selector = True + loop._select_cond.notify() + try: + loop._waker_w.send(b"a") + except BlockingIOError: + pass + _selector_loops.clear() + + +# use internal _register_atexit to avoid need for daemon threads +# I can't find a public API for equivalent functionality +# to run something prior to thread join during process teardown +threading._register_atexit(_atexit_callback) + + +class SelectorThread: + """Define ``add_reader`` methods to be called in a background select thread. + + Instances of this class start a second thread to run a selector. + This thread is completely hidden from the user; + all callbacks are run on the wrapped event loop's thread + via :meth:`loop.call_soon_threadsafe`. + """ + + _closed = False + + def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None: + self._main_thread_ctx = contextvars.copy_context() + + self._real_loop = real_loop + + self._select_cond = threading.Condition() + self._select_args: tuple[list[_FileDescriptorLike], list[_FileDescriptorLike]] | None = None + self._closing_selector = False + self._thread: threading.Thread | None = None + self._thread_manager_handle = self._thread_manager() + + # When the loop starts, start the thread. Not too soon because we can't + # clean up if we get to this point but the event loop is closed without + # starting. + self._real_loop.call_soon( + lambda: self._real_loop.create_task(self._thread_manager_handle.__anext__()), + context=self._main_thread_ctx, + ) + + self._readers: dict[int, tuple[_FileDescriptorLike, Callable]] = {} + self._writers: dict[int, tuple[_FileDescriptorLike, Callable]] = {} + + # Writing to _waker_w will wake up the selector thread, which + # watches for _waker_r to be readable. + self._waker_r, self._waker_w = socket.socketpair() + self._waker_r.setblocking(False) + self._waker_w.setblocking(False) + _selector_loops.add(self) + self.add_reader(self._waker_r, self._consume_waker) + + def close(self) -> None: + if self._closed: + return + with self._select_cond: + self._closing_selector = True + self._select_cond.notify() + self._wake_selector() + if self._thread is not None: + self._thread.join() + _selector_loops.discard(self) + self.remove_reader(self._waker_r) + self._waker_r.close() + self._waker_w.close() + self._closed = True + + async def _thread_manager(self) -> typing.AsyncGenerator[None, None]: + # Create a thread to run the select system call. We manage this thread + # manually so we can trigger a clean shutdown from an atexit hook. Note + # that due to the order of operations at shutdown, + # we rely on private `threading._register_atexit` + # to wake the thread before joining to avoid hangs. + # See https://github.com/python/cpython/issues/86128 for more info + self._thread = threading.Thread( + name="asyncio selector", + target=self._run_select, + ) + self._thread.start() + self._start_select() + try: + # The presence of this yield statement means that this coroutine + # is actually an asynchronous generator, which has a special + # shutdown protocol. We wait at this yield point until the + # event loop's shutdown_asyncgens method is called, at which point + # we will get a GeneratorExit exception and can shut down the + # selector thread. + yield + except GeneratorExit: + self.close() + raise + + def _wake_selector(self) -> None: + """Wake the selector thread from another thread.""" + if self._closed: + return + try: + self._waker_w.send(b"a") + except BlockingIOError: + pass + + def _consume_waker(self) -> None: + """Consume messages sent via _wake_selector.""" + try: + self._waker_r.recv(1024) + except BlockingIOError: + pass + + def _start_select(self) -> None: + """Start select waiting for events. + + Called from the event loop thread, + schedules select to be called in the background thread. + """ + # Capture reader and writer sets here in the event loop + # thread to avoid any problems with concurrent + # modification while the select loop uses them. + with self._select_cond: + assert self._select_args is None + self._select_args = (list(self._readers.keys()), list(self._writers.keys())) + self._select_cond.notify() + + def _run_select(self) -> None: + """The main function of the select thread. + + Runs `select.select()` until `_closing_selector` attribute is set (typically by `close()`). + Schedules handling of `select.select` output on the main thread + via `loop.call_soon_threadsafe()`. + """ + while not self._closing_selector: + with self._select_cond: + while self._select_args is None and not self._closing_selector: + self._select_cond.wait() + if self._closing_selector: + return + assert self._select_args is not None + to_read, to_write = self._select_args + self._select_args = None + + # We use the simpler interface of the select module instead of + # the more stateful interface in the selectors module because + # this class is only intended for use on windows, where + # select.select is the only option. The selector interface + # does not have well-documented thread-safety semantics that + # we can rely on so ensuring proper synchronization would be + # tricky. + try: + # On windows, selecting on a socket for write will not + # return the socket when there is an error (but selecting + # for reads works). Also select for errors when selecting + # for writes, and merge the results. + # + # This pattern is also used in + # https://github.com/python/cpython/blob/v3.8.0/Lib/selectors.py#L312-L317 + rs, ws, xs = select.select(to_read, to_write, to_write) + ws = ws + xs + except OSError as e: + # After remove_reader or remove_writer is called, the file + # descriptor may subsequently be closed on the event loop + # thread. It's possible that this select thread hasn't + # gotten into the select system call by the time that + # happens in which case (at least on macOS), select may + # raise a "bad file descriptor" error. If we get that + # error, check and see if we're also being woken up by + # polling the waker alone. If we are, just return to the + # event loop and we'll get the updated set of file + # descriptors on the next iteration. Otherwise, raise the + # original error. + if e.errno == getattr(errno, "WSAENOTSOCK", errno.EBADF): + rs, _, _ = select.select([self._waker_r.fileno()], [], [], 0) + if rs: + ws = [] + else: + raise + else: + raise + + # if close has already started, don't schedule callbacks, + # which could cause a race + with self._select_cond: + if self._closing_selector: + return + self._real_loop.call_soon_threadsafe( + self._handle_select, rs, ws, context=self._main_thread_ctx + ) + + def _handle_select( + self, rs: list[_FileDescriptorLike], ws: list[_FileDescriptorLike] + ) -> None: + """Handle the result of select.select. + + This method is called on the event loop thread via `call_soon_threadsafe`. + """ + for r in rs: + self._handle_event(r, self._readers) + for w in ws: + self._handle_event(w, self._writers) + self._start_select() + + def _handle_event( + self, + fd: _FileDescriptorLike, + cb_map: dict[int, tuple[_FileDescriptorLike, Callable]], + ) -> None: + """Handle one callback event. + + This method is called on the event loop thread via `call_soon_threadsafe` (from `_handle_select`), + so exception handler wrappers, etc. are applied. + """ + try: + fileobj, handle = cb_map[fd] + except KeyError: + return + if not handle.cancelled(): + handle._run() + + def _split_fd(self, fd: _FileDescriptorLike) -> tuple[int, _FileDescriptorLike]: + """Return fd, file object + + Keeps a handle on the fileobject given, + but always registers integer FD. + """ + fileno = fd + if not isinstance(fileno, int): + try: + fileno = int(fileno.fileno()) + except (AttributeError, TypeError, ValueError): + # This code matches selectors._fileobj_to_fd function. + raise ValueError(f"Invalid file object: {fd!r}") from None + return fileno, fd + + def add_reader( + self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any + ) -> None: + fd, fileobj = self._split_fd(fd) + if fd in self._readers: + _, handle = self._readers[fd] + handle.cancel() + self._readers[fd] = (fileobj, events.Handle(callback, args, self._real_loop)) + self._wake_selector() + + def add_writer( + self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any + ) -> None: + fd, fileobj = self._split_fd(fd) + if fd in self._writers: + _, handle = self._writers[fd] + handle.cancel() + self._writers[fd] = (fileobj, events.Handle(callback, args, self._real_loop)) + self._wake_selector() + + def remove_reader(self, fd: _FileDescriptorLike) -> bool: + fd, _ = self._split_fd(fd) + try: + _, handle = self._readers.pop(fd) + except KeyError: + return False + handle.cancel() + self._wake_selector() + return True + + def remove_writer(self, fd: _FileDescriptorLike) -> bool: + fd, _ = self._split_fd(fd) + try: + _, handle = self._writers.pop(fd) + except KeyError: + return False + handle.cancel() + self._wake_selector() + return True diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index f404273c3ae5c1..be786cd6a259d3 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -23,6 +23,7 @@ from . import transports from . import trsock from .log import logger +from ._selector_thread import SelectorThread as _SelectorThread def _set_socket_extra(transport, sock): @@ -633,6 +634,7 @@ def __init__(self, proactor): logger.debug('Using proactor: %s', proactor.__class__.__name__) self._proactor = proactor self._selector = proactor # convenient alias + self._selector_thread = None self._self_reading_future = None self._accept_futures = {} # socket file descriptor => Future proactor.set_loop(self) @@ -641,6 +643,17 @@ def __init__(self, proactor): # wakeup fd can only be installed to a file descriptor from the main thread signal.set_wakeup_fd(self._csock.fileno()) + def _get_selector_thread(self): + """Return the SelectorThread. + + Creates the thread it on first request, + so no thread is created until/unless + the first call to `add_reader` and friends. + """ + if self._selector_thread is None: + self._selector_thread = _SelectorThread(self) + return self._selector_thread + def _make_socket_transport(self, sock, protocol, waiter=None, extra=None, server=None): return _ProactorSocketTransport(self, sock, protocol, waiter, @@ -692,6 +705,9 @@ def close(self): # Call these methods before closing the event loop (before calling # BaseEventLoop.close), because they can schedule callbacks with # call_soon(), which is forbidden when the event loop is closed. + if self._selector_thread is not None: + self._selector_thread.close() + self._selector_thread = None self._stop_accept_futures() self._close_self_pipe() self._proactor.close() @@ -701,6 +717,18 @@ def close(self): # Close the event loop super().close() + def add_reader(self, fd, callback, *args): + return self._get_selector_thread().add_reader(fd, callback, *args) + + def remove_reader(self, fd): + return self._get_selector_thread().remove_reader(fd) + + def add_writer(self, fd, callback, *args): + return self._get_selector_thread().add_writer(fd, callback, *args) + + def remove_writer(self, fd): + return self._get_selector_thread().remove_writer(fd) + async def sock_recv(self, sock, n): return await self._proactor.recv(sock, n) diff --git a/Lib/test/test_asyncio/test_selector_thread.py b/Lib/test/test_asyncio/test_selector_thread.py new file mode 100644 index 00000000000000..7721e216436e0d --- /dev/null +++ b/Lib/test/test_asyncio/test_selector_thread.py @@ -0,0 +1,128 @@ +import asyncio +import select +import socket +import time +import unittest +from asyncio._selector_thread import SelectorThread +from test import support +from unittest import mock + + +def tearDownModule(): + asyncio.events._set_event_loop_policy(None) + + +class SelectorThreadTest(unittest.IsolatedAsyncioTestCase): + async def asyncSetUp(self): + self._sockets = [] + self.selector_thread = SelectorThread(asyncio.get_running_loop()) + + def socketpair(self): + pair = socket.socketpair() + self._sockets.extend(pair) + return pair + + async def asyncTearDown(self): + self.selector_thread.close() + for s in self._sockets: + s.close() + + async def test_slow_reader(self): + a, b = self.socketpair() + first_recv = asyncio.Future() + + def recv(): + msg = b.recv(100) + if not first_recv.done(): + first_recv.set_result(msg) + + mock_recv = mock.MagicMock(wraps=recv) + # make sure select is only called once when + # event loop thread is slow to consume events + a.sendall(b"msg") + with mock.patch("select.select", wraps=select.select) as mock_select: + self.selector_thread.add_reader(b, mock_recv) + # ready event, but main event loop is blocked for some time + time.sleep(0.1) + recvd = await asyncio.wait_for(first_recv, timeout=support.SHORT_TIMEOUT) + self.assertEqual(recvd, b"msg") + # make sure recv wasn't scheduled more than once + self.assertEqual(mock_recv.call_count, 1) + # 1 for add_reader + # 1 for finishing reader callback + # up to 2 more for wake FD calls if CI is slow + # this would be thousands if select is busy-looping while the main thread blocks + self.assertLessEqual(mock_select.call_count, 5) + + async def test_reader_error(self): + # test error handling in callbacks doesn't break handling + a, b = self.socketpair() + a.sendall(b"to_b") + + selector_thread = self.selector_thread + + # make sure it's called a few times, + # and errors don't prevent rescheduling + n_failures = 5 + counter = 0 + bad_recv_done = asyncio.Future() + + def bad_recv(sock): + # fail the first n_failures calls, then succeed + nonlocal counter + counter += 1 + if counter > n_failures: + bad_recv_done.set_result(None) + sock.recv(10) + return + raise Exception("Testing reader error") + + recv_callback = mock.MagicMock(wraps=bad_recv) + + exception_handler = mock.MagicMock() + asyncio.get_running_loop().set_exception_handler(exception_handler) + + selector_thread.add_reader(b, recv_callback, b) + + # make sure start_select is called + # even when recv callback errors, + with mock.patch.object( + selector_thread, "_start_select", wraps=selector_thread._start_select + ) as start_select: + await asyncio.wait_for(bad_recv_done, timeout=support.SHORT_TIMEOUT) + + # make sure recv is called N + 1 times, + # exception N times, + # start_select at least that many + self.assertEqual(recv_callback.call_count, n_failures + 1) + self.assertEqual(exception_handler.call_count, n_failures) + self.assertGreaterEqual(start_select.call_count, n_failures) + + async def test_read_write(self): + a, b = self.socketpair() + read_future = asyncio.Future() + sent = b"asdf" + loop = asyncio.get_running_loop() + selector_thread = self.selector_thread + + def write(): + a.sendall(sent) + loop.remove_writer(a) + + def read(): + msg = b.recv(100) + read_future.set_result(msg) + + selector_thread.add_reader(b, read) + self.assertIn(b.fileno(), selector_thread._readers) + selector_thread.add_writer(a, write) + self.assertIn(a.fileno(), selector_thread._writers) + msg = await asyncio.wait_for(read_future, timeout=10) + + selector_thread.remove_writer(a) + self.assertNotIn(a.fileno(), selector_thread._writers) + selector_thread.remove_reader(b) + self.assertNotIn(b.fileno(), selector_thread._readers) + a.close() + b.close() + self.assertEqual(msg, sent) diff --git a/Lib/test/test_asyncio/test_windows_events.py b/Lib/test/test_asyncio/test_windows_events.py index 0af3368627afca..fa3a33f4185ee7 100644 --- a/Lib/test/test_asyncio/test_windows_events.py +++ b/Lib/test/test_asyncio/test_windows_events.py @@ -5,6 +5,7 @@ import time import threading import unittest +from test import support from unittest import mock if sys.platform != 'win32': @@ -323,6 +324,59 @@ def threadMain(): stop.set() thr.join() + def test_add_reader_invalid_argument(self): + def assert_raises(): + return self.assertRaisesRegex(ValueError, r"Invalid file object") + + def cb(sock): + return None + + with assert_raises(): + self.loop.add_reader(object(), cb) + with assert_raises(): + self.loop.add_writer(object(), cb) + + with assert_raises(): + self.loop.remove_reader(object()) + with assert_raises(): + self.loop.remove_writer(object()) + + def test_selector_thread(self): + self.assertIsNone(self.loop._selector_thread) + a, b = socket.socketpair() + + async def _test(): + read_future = asyncio.Future() + sent = b"asdf" + + def write(): + a.sendall(sent) + self.loop.remove_writer(a) + + def read(): + msg = b.recv(100) + read_future.set_result(msg) + + self.loop.add_reader(b, read) + _selector_thread = self.loop._selector_thread + self.assertIn(b.fileno(), _selector_thread._readers) + self.assertIsNotNone(_selector_thread) + self.loop.add_writer(a, write) + self.assertIs(self.loop._selector_thread, _selector_thread) + self.assertIn(a.fileno(), _selector_thread._writers) + msg = await asyncio.wait_for(read_future, timeout=support.SHORT_TIMEOUT) + + self.loop.remove_writer(a) + self.assertNotIn(a.fileno(), _selector_thread._writers) + self.loop.remove_reader(b) + self.assertNotIn(b.fileno(), _selector_thread._readers) + a.close() + b.close() + self.assertIs(self.loop._selector_thread, _selector_thread) + self.assertEqual(msg, sent) + + self.loop.run_until_complete(_test()) + class WinPolicyTests(WindowsEventsTestCase): diff --git a/Misc/NEWS.d/next/Windows/2025-11-21-14-44-48.gh-issue-81554.hNFGMW.rst b/Misc/NEWS.d/next/Windows/2025-11-21-14-44-48.gh-issue-81554.hNFGMW.rst new file mode 100644 index 00000000000000..efbd11b6357e53 --- /dev/null +++ b/Misc/NEWS.d/next/Windows/2025-11-21-14-44-48.gh-issue-81554.hNFGMW.rst @@ -0,0 +1,2 @@ +Added support for :meth:`~asyncio.loop.add_reader` to :class:`~asyncio.ProactorEventLoop` on Windows by +running :func:`select.select` in a background thread.