From 61348c6f14ef309c9679b63e5f1617c6779b9470 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C5=ABdolfs=20Agris=20Stilve?= Date: Wed, 17 Jul 2024 14:20:07 +0300 Subject: [PATCH 1/3] fix: ensure execution order of callbacks with zero-delay by handling them independently of delayed callbacks Under heavy UI load (such as real-time plotting with pyqtgraph), it was observed that the zero-delay timers scheduled via _SimpleTimer would occasionally run out-of-order on Windows. --- qasync/__init__.py | 61 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/qasync/__init__.py b/qasync/__init__.py index 5a81032..a6cf5ac 100644 --- a/qasync/__init__.py +++ b/qasync/__init__.py @@ -22,6 +22,7 @@ import time from concurrent.futures import Future from queue import Queue +from collections import deque logger = logging.getLogger(__name__) @@ -314,6 +315,58 @@ def __log_debug(self, *args, **kwargs): self._logger.debug(*args, **kwargs) +@with_logger +class _CallSoonQueue(QtCore.QObject): + def __init__(self): + super().__init__() + # Contains asyncio.Handle objects + # Use a deque instead of Queue, as we don't require + # synchronization between threads here. + self.__callbacks = deque() + # Set a 0-delay timer on itself, this will ensure that + # timerEvent gets fired each time after window events are processed + # See https://doc.qt.io/qt-6/qtimer.html#interval-prop + self.__timerId = self.startTimer(0) + self.__stopped = False + self.__debug_enabled = False + + def add_callback(self, handle): + # handle must be an asyncio.Handle + self.__callbacks.append(handle) + self.__log_debug("Registering call_soon handle %s", id(handle)) + return handle + + def timerEvent(self, event): + timerId = event.timerId() + assert timerId == self.__timerId + + # Stop timer if stopped + if self.__stopped: + self.killTimer(timerId) + self.__log_debug("call_soon queue stopped, clearing handles") + # TODO: Do we need to del the handles or somehow invalidate them? + self.__callbacks.clear() + return + + # Iterate over pending callbacks + # TODO: Runtime deadline, don't process the entire queue if it takes too long? + while len(self.__callbacks) > 0: + handle = self.__callbacks.popleft() + self.__log_debug("Calling call_soon handle %s", id(handle)) + handle._run() + + def stop(self): + self.__log_debug("Stopping call_soon queue") + self.__stopped = True + + def set_debug(self, enabled): + self.__debug_enabled = enabled + + def __log_debug(self, *args, **kwargs): + if self.__debug_enabled: + self._logger.debug(*args, **kwargs) + + def _fileno(fd): if isinstance(fd, int): return fd @@ -356,6 +409,7 @@ def __init__(self, app=None, set_running_loop=False, already_running=False): self._read_notifiers = {} self._write_notifiers = {} self._timer = _SimpleTimer() + self._call_soon_queue = _CallSoonQueue() self.__call_soon_signaller = signaller = _make_signaller(QtCore, object, tuple) self.__call_soon_signal = signaller.signal @@ -463,6 +517,7 @@ def close(self): super().close() self._timer.stop() + self._call_soon_queue.stop() self.__app = None for notifier in itertools.chain( @@ -497,6 +552,11 @@ def call_later(self, delay, callback, *args, context=None): return self._add_callback(asyncio.Handle(callback, args, self), delay) def _add_callback(self, handle, delay=0): + if delay == 0: + # To ensure that we can guarantee the execution order of + # 0-delay callbacks, add them to a special queue, rather than + # assume that Qt will fire the timerEvents in order + return self._call_soon_queue.add_callback(handle) return self._timer.add_callback(handle, delay) def call_soon(self, callback, *args, context=None): @@ -741,6 +801,7 @@ def set_debug(self, enabled): super().set_debug(enabled) self.__debug_enabled = enabled self._timer.set_debug(enabled) + self._call_soon_queue.set_debug(enabled) def __enter__(self): return self From 95f91351918eab3bae753dbc5b1b7f2e65cb469f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C5=ABdolfs=20Agris=20Stilve?= Date: Wed, 17 Jul 2024 16:30:27 +0300 Subject: [PATCH 2/3] fix: only enable call_soon timer when callbacks exist, to prevent polling induced CPU usage --- qasync/__init__.py | 33 +++++++++++++++++++++++++++------ 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/qasync/__init__.py b/qasync/__init__.py index a6cf5ac..9492d58 100644 --- a/qasync/__init__.py +++ b/qasync/__init__.py @@ -323,10 +323,11 @@ def __init__(self): # Use a deque instead of Queue, as we don't require # synchronization between threads here. self.__callbacks = deque() - # Set a 0-delay timer on itself, this will ensure that - # timerEvent gets fired each time after window events are processed - # See https://doc.qt.io/qt-6/qtimer.html#interval-prop - self.__timerId = self.startTimer(0) + + # Keep track of the current timer. + # The queue can only have a single timer that services it. + # Once fired, all pending callbacks will be processed. + self.__timer_id = None self.__stopped = False self.__debug_enabled = False @@ -334,18 +335,29 @@ def add_callback(self, handle): # handle must be an asyncio.Handle self.__callbacks.append(handle) self.__log_debug("Registering call_soon handle %s", id(handle)) + + # Create a timer if it doesn't yet exist + if self.__timer_id is None: + # Set a 0-delay timer on itself, this will ensure thats + # it gets fired immediately after window events are processed the next time. + # See https://doc.qt.io/qt-6/qtimer.html#interval-prop + self.__timer_id = self.startTimer(0) + self.__log_debug("Registering call_soon timer %s", self.__timer_id) return handle def timerEvent(self, event): timerId = event.timerId() - assert timerId == self.__timerId + # We should have only one timer active at the same time, so + # this assert will get hit only when something's very bad + assert timerId == self.__timer_id # Stop timer if stopped if self.__stopped: - self.killTimer(timerId) self.__log_debug("call_soon queue stopped, clearing handles") # TODO: Do we need to del the handles or somehow invalidate them? self.__callbacks.clear() + self.killTimer(timerId) + self.__timer_id = None return # Iterate over pending callbacks @@ -355,6 +367,15 @@ def timerEvent(self, event): self.__log_debug("Calling call_soon handle %s", id(handle)) handle._run() + # No more callbacks exist, we can dispose this timer. + # It will be recreated once a callback is registered again. + # It's should be safe to assume that another thread isn't calling + # add_callback during the lifetime of timerEvent + self.__log_debug("Stopping call_soon timer %s", timerId) + self.killTimer(timerId) + self.__timer_id = None + assert len(self.__callbacks) == 0 + def stop(self): self.__log_debug("Stopping call_soon queue") self.__stopped = True From f71f3002b65ba7856b3bb27506e9407afd854bb6 Mon Sep 17 00:00:00 2001 From: Alex March Date: Thu, 24 Jul 2025 15:12:28 +0900 Subject: [PATCH 3/3] fix: also handle slow_callback_duration in call_soon timer --- qasync/__init__.py | 111 +++++++++++++++++++++++++-------------------- 1 file changed, 61 insertions(+), 50 deletions(-) diff --git a/qasync/__init__.py b/qasync/__init__.py index 9492d58..9daeca0 100644 --- a/qasync/__init__.py +++ b/qasync/__init__.py @@ -20,9 +20,9 @@ import os import sys import time +from collections import deque from concurrent.futures import Future from queue import Queue -from collections import deque logger = logging.getLogger(__name__) @@ -235,7 +235,7 @@ def __exit__(self, *args): def _format_handle(handle: asyncio.Handle): cb = getattr(handle, "_callback", None) - if isinstance(getattr(cb, '__self__', None), asyncio.tasks.Task): + if isinstance(getattr(cb, "__self__", None), asyncio.tasks.Task): return repr(cb.__self__) return str(handle) @@ -260,60 +260,60 @@ def __init__(self): def add_callback(self, handle, delay=0): timerid = self.startTimer(int(max(0, delay) * 1000)) - self.__log_debug("Registering timer id %s", timerid) + self._logger.debug("Registering timer id %s", timerid) assert timerid not in self.__callbacks self.__callbacks[timerid] = handle return handle def timerEvent(self, event): # noqa: N802 timerid = event.timerId() - self.__log_debug("Timer event on id %s", timerid) + self._logger.debug("Timer event on id %s", timerid) if self._stopped: - self.__log_debug("Timer stopped, killing %s", timerid) + self._logger.debug("Timer stopped, killing %s", timerid) self.killTimer(timerid) del self.__callbacks[timerid] + return + try: + handle = self.__callbacks[timerid] + except KeyError as e: + self._logger.debug(e) + pass else: - try: - handle = self.__callbacks[timerid] - except KeyError as e: - self.__log_debug(e) - pass + if handle._cancelled: + self._logger.debug("Handle %s cancelled", handle) else: - if handle._cancelled: - self.__log_debug("Handle %s cancelled", handle) - else: - if self.__debug_enabled: - # This may not be the most efficient thing to do, but it removes the need to sync - # "slow_callback_duration" and "_current_handle" variables - loop = asyncio.get_event_loop() - try: - loop._current_handle = handle - self._logger.debug("Calling handle %s", handle) - t0 = time.time() - handle._run() - dt = time.time() - t0 - if dt >= loop.slow_callback_duration: - self._logger.warning('Executing %s took %.3f seconds', _format_handle(handle), dt) - finally: - loop._current_handle = None - else: + if self.__debug_enabled: + # This may not be the most efficient thing to do, but it removes the need to sync + # "slow_callback_duration" and "_current_handle" variables + loop = asyncio.get_event_loop() + try: + loop._current_handle = handle + self._logger.debug("Calling handle %s", handle) + t0 = time.time() handle._run() - finally: - del self.__callbacks[timerid] - handle = None - self.killTimer(timerid) + dt = time.time() - t0 + if dt >= loop.slow_callback_duration: + self._logger.warning( + "Executing %s took %.3f seconds", + _format_handle(handle), + dt, + ) + finally: + loop._current_handle = None + else: + handle._run() + finally: + del self.__callbacks[timerid] + handle = None + self.killTimer(timerid) def stop(self): - self.__log_debug("Stopping timers") + self._logger.debug("Stopping timers") self._stopped = True def set_debug(self, enabled): self.__debug_enabled = enabled - def __log_debug(self, *args, **kwargs): - if self.__debug_enabled: - self._logger.debug(*args, **kwargs) - @with_logger class _CallSoonQueue(QtCore.QObject): @@ -334,7 +334,7 @@ def __init__(self): def add_callback(self, handle): # handle must be an asyncio.Handle self.__callbacks.append(handle) - self.__log_debug("Registering call_soon handle %s", id(handle)) + self._logger.debug("Registering call_soon handle %s", id(handle)) # Create a timer if it doesn't yet exist if self.__timer_id is None: @@ -342,7 +342,7 @@ def add_callback(self, handle): # it gets fired immediately after window events are processed the next time. # See https://doc.qt.io/qt-6/qtimer.html#interval-prop self.__timer_id = self.startTimer(0) - self.__log_debug("Registering call_soon timer %s", self.__timer_id) + self._logger.debug("Registering call_soon timer %s", self.__timer_id) return handle def timerEvent(self, event): @@ -353,8 +353,7 @@ def timerEvent(self, event): # Stop timer if stopped if self.__stopped: - self.__log_debug("call_soon queue stopped, clearing handles") - # TODO: Do we need to del the handles or somehow invalidate them? + self._logger.debug("call_soon queue stopped, clearing handles") self.__callbacks.clear() self.killTimer(timerId) self.__timer_id = None @@ -364,29 +363,41 @@ def timerEvent(self, event): # TODO: Runtime deadline, don't process the entire queue if it takes too long? while len(self.__callbacks) > 0: handle = self.__callbacks.popleft() - self.__log_debug("Calling call_soon handle %s", id(handle)) - handle._run() + if self.__debug_enabled: + # This may not be the most efficient thing to do, but it removes the need to sync + # "slow_callback_duration" and "_current_handle" variables + loop = asyncio.get_event_loop() + try: + loop._current_handle = handle + self._logger.debug("Calling handle %s", handle) + t0 = time.time() + handle._run() + dt = time.time() - t0 + if dt >= loop.slow_callback_duration: + self._logger.warning( + "Executing %s took %.3f seconds", _format_handle(handle), dt + ) + finally: + loop._current_handle = None + else: + handle._run() # No more callbacks exist, we can dispose this timer. # It will be recreated once a callback is registered again. # It's should be safe to assume that another thread isn't calling # add_callback during the lifetime of timerEvent - self.__log_debug("Stopping call_soon timer %s", timerId) + self._logger.debug("Stopping call_soon timer %s", timerId) self.killTimer(timerId) self.__timer_id = None assert len(self.__callbacks) == 0 def stop(self): - self.__log_debug("Stopping call_soon queue") + self._logger.debug("Stopping call_soon queue") self.__stopped = True def set_debug(self, enabled): self.__debug_enabled = enabled - def __log_debug(self, *args, **kwargs): - if self.__debug_enabled: - self._logger.debug(*args, **kwargs) - def _fileno(fd): if isinstance(fd, int): @@ -412,7 +423,7 @@ class _QEventLoop: ... await asyncio.sleep(.1) >>> >>> asyncio.run(xplusy(2, 2), loop_factory=lambda:QEventLoop(app)) - + If the event loop shall be used with an existing and already running QApplication it must be specified in the constructor via already_running=True In this case the user is responsible for loop cleanup with stop() and close()