diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 5f354b4..88aeb9b 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,9 +1,20 @@ UNRELEASED ---------- -* Added official support for Python 3.13. -* Dropped support for EOL Python 3.8. -* Dropped support for EOL PySide 2. +- Added official support for Python 3.13. +- Dropped support for EOL Python 3.8. +- Dropped support for EOL PySide 2. +- Fixed PySide6 exceptions / warnings about being unable to disconnect signals + with ``qtbot.waitSignal`` (`#552`_, `#558`_). +- Reduced the likelyhood of trouble when using ``qtbot.waitSignal(s)`` and + ``qtbot.waitCallback`` where the signal/callback is emitted from a non-main + thread. In theory, more problems remain and this isn't a proper fix yet. In + practice, it seems impossible to provoke any problems in pytest-qt's testsuite. + (`#586`_) + +.. _#552: https://github.com/pytest-dev/pytest-qt/issues/552 +.. _#558: https://github.com/pytest-dev/pytest-qt/issues/558 +.. _#586: https://github.com/pytest-dev/pytest-qt/issues/586 4.4.0 (2024-02-07) ------------------ diff --git a/docs/wait_callback.rst b/docs/wait_callback.rst index 26f27bf..72a7f52 100644 --- a/docs/wait_callback.rst +++ b/docs/wait_callback.rst @@ -26,7 +26,7 @@ Anything following the ``with`` block will be run only after the callback has be If the callback doesn't get called during the given timeout, :class:`qtbot.TimeoutError ` is raised. If it is called more than once, -:class:`qtbot.CallbackCalledTwiceError ` is raised. +:class:`qtbot.CallbackCalledTwiceError ` is raised. raising parameter ----------------- diff --git a/pytest.ini b/pytest.ini index 9ade678..26b0a16 100644 --- a/pytest.ini +++ b/pytest.ini @@ -2,5 +2,4 @@ testpaths = tests addopts = --strict-markers --strict-config xfail_strict = true -markers = - filterwarnings: pytest's filterwarnings marker +filterwarnings = error diff --git a/src/pytestqt/exceptions.py b/src/pytestqt/exceptions.py index d342876..a02567a 100644 --- a/src/pytestqt/exceptions.py +++ b/src/pytestqt/exceptions.py @@ -116,3 +116,25 @@ class ScreenshotError(Exception): Access via ``qtbot.ScreenshotError``. """ + + +class SignalEmittedError(Exception): + """ + .. versionadded:: 1.11 + + The exception thrown by :meth:`pytestqt.qtbot.QtBot.assertNotEmitted` if a + signal was emitted unexpectedly. + + Access via ``qtbot.SignalEmittedError``. + """ + + +class CallbackCalledTwiceError(Exception): + """ + .. versionadded:: 3.1 + + The exception thrown by :meth:`pytestqt.qtbot.QtBot.waitCallback` if a + callback was called twice. + + Access via ``qtbot.CallbackCalledTwiceError``. + """ diff --git a/src/pytestqt/qtbot.py b/src/pytestqt/qtbot.py index 321b2a3..52cb751 100644 --- a/src/pytestqt/qtbot.py +++ b/src/pytestqt/qtbot.py @@ -2,16 +2,14 @@ import weakref import warnings -from pytestqt.exceptions import TimeoutError, ScreenshotError -from pytestqt.qt_compat import qt_api -from pytestqt.wait_signal import ( - SignalBlocker, - MultiSignalBlocker, - SignalEmittedSpy, +from pytestqt.exceptions import ( + TimeoutError, + ScreenshotError, SignalEmittedError, - CallbackBlocker, CallbackCalledTwiceError, ) +from pytestqt.qt_compat import qt_api +from pytestqt import wait_signal def _parse_ini_boolean(value): @@ -350,7 +348,7 @@ def waitSignal(self, signal, *, timeout=5000, raising=None, check_params_cb=None f"Passing None as signal isn't supported anymore, use qtbot.wait({timeout}) instead." ) raising = self._should_raise(raising) - blocker = SignalBlocker( + blocker = wait_signal.SignalBlocker( timeout=timeout, raising=raising, check_params_cb=check_params_cb ) blocker.connect(signal) @@ -437,7 +435,7 @@ def waitSignals( len(check_params_cbs), len(signals) ) ) - blocker = MultiSignalBlocker( + blocker = wait_signal.MultiSignalBlocker( timeout=timeout, raising=raising, order=order, @@ -455,7 +453,7 @@ def wait(self, ms): While waiting, events will be processed and your test will stay responsive to user interface events or network communication. """ - blocker = MultiSignalBlocker(timeout=ms, raising=False) + blocker = wait_signal.MultiSignalBlocker(timeout=ms, raising=False) blocker.wait() @contextlib.contextmanager @@ -475,7 +473,7 @@ def assertNotEmitted(self, signal, *, wait=0): .. note:: This method is also available as ``assert_not_emitted`` (pep-8 alias) """ - spy = SignalEmittedSpy(signal) + spy = wait_signal.SignalEmittedSpy(signal) with spy, self.waitSignal(signal, timeout=wait, raising=False): yield spy.assert_not_emitted() @@ -589,7 +587,7 @@ def waitCallback(self, *, timeout=5000, raising=None): .. note:: This method is also available as ``wait_callback`` (pep-8 alias) """ raising = self._should_raise(raising) - blocker = CallbackBlocker(timeout=timeout, raising=raising) + blocker = wait_signal.CallbackBlocker(timeout=timeout, raising=raising) return blocker @contextlib.contextmanager diff --git a/src/pytestqt/utils.py b/src/pytestqt/utils.py index 79703c0..d4949e0 100644 --- a/src/pytestqt/utils.py +++ b/src/pytestqt/utils.py @@ -1,3 +1,7 @@ +import dataclasses +from typing import Any + + def get_marker(item, name): """Get a marker from a pytest item. @@ -9,3 +13,19 @@ def get_marker(item, name): except AttributeError: # pytest < 3.6 return item.get_marker(name) + + +@dataclasses.dataclass +class SignalAndArgs: + signal_name: str + args: list[Any] + + def __str__(self) -> str: + args = repr(self.args) if self.args else "" + + # remove signal parameter signature, e.g. turn "some_signal(str,int)" to "some_signal", because we're adding + # the actual parameters anyways + signal_name = self.signal_name + signal_name = signal_name.partition("(")[0] + + return signal_name + args diff --git a/src/pytestqt/wait_signal.py b/src/pytestqt/wait_signal.py index 8da3836..d5b3c9a 100644 --- a/src/pytestqt/wait_signal.py +++ b/src/pytestqt/wait_signal.py @@ -1,583 +1,31 @@ -import functools +from typing import TYPE_CHECKING -from pytestqt.exceptions import TimeoutError -from pytestqt.qt_compat import qt_api +from pytestqt.exceptions import SignalEmittedError +if TYPE_CHECKING: + from pytestqt.wait_signal_impl import ( + SignalBlocker as SignalBlocker, + MultiSignalBlocker as MultiSignalBlocker, + CallbackBlocker as CallbackBlocker, + ) -class _AbstractSignalBlocker: - """ - Base class for :class:`SignalBlocker` and :class:`MultiSignalBlocker`. - - Provides :meth:`wait` and a context manager protocol, but no means to add - new signals and to detect when the signals should be considered "done". - This needs to be implemented by subclasses. - - Subclasses also need to provide ``self._signals`` which should evaluate to - ``False`` if no signals were configured. - - """ - - def __init__(self, timeout=5000, raising=True): - self._loop = qt_api.QtCore.QEventLoop() - self.timeout = timeout - self.signal_triggered = False - self.raising = raising - self._signals = None # will be initialized by inheriting implementations - self._timeout_message = "" - if timeout is None or timeout == 0: - self._timer = None - else: - self._timer = qt_api.QtCore.QTimer(self._loop) - self._timer.setSingleShot(True) - self._timer.setInterval(timeout) - - def wait(self): - """ - Waits until either a connected signal is triggered or timeout is reached. - - :raise ValueError: if no signals are connected and timeout is None; in - this case it would wait forever. - """ - __tracebackhide__ = True - if self.signal_triggered: - return - if self.timeout is None and not self._signals: - raise ValueError("No signals or timeout specified.") - if self._timer is not None: - self._timer.timeout.connect(self._quit_loop_by_timeout) - self._timer.start() - - if self.timeout != 0: - qt_api.exec(self._loop) - - if not self.signal_triggered and self.raising: - raise TimeoutError(self._timeout_message) - - def _quit_loop_by_timeout(self): - try: - self._cleanup() - finally: - self._loop.quit() - - def _cleanup(self): - # store timeout message before the data to construct it is lost - self._timeout_message = self._get_timeout_error_message() - if self._timer is not None: - _silent_disconnect(self._timer.timeout, self._quit_loop_by_timeout) - self._timer.stop() - self._timer = None - - def _get_timeout_error_message(self): - """Subclasses have to implement this, returning an appropriate error message for a TimeoutError.""" - raise NotImplementedError # pragma: no cover - - def _extract_pyqt_signal_name(self, potential_pyqt_signal): - signal_name = potential_pyqt_signal.signal # type: str - if not isinstance(signal_name, str): - raise TypeError( - "Invalid 'signal' attribute in {}. " - "Expected str but got {}".format(signal_name, type(signal_name)) - ) - # strip magic number "2" that PyQt prepends to the signal names - signal_name = signal_name.lstrip("2") - return signal_name - - def _extract_signal_from_signal_tuple(self, potential_signal_tuple): - if isinstance(potential_signal_tuple, tuple): - if len(potential_signal_tuple) != 2: - raise ValueError( - "Signal tuple must have length of 2 (first element is the signal, " - "the second element is the signal's name)." - ) - signal_tuple = potential_signal_tuple - signal_name = signal_tuple[1] - if not isinstance(signal_name, str): - raise TypeError( - "Invalid type for provided signal name, " - "expected str but got {}".format(type(signal_name)) - ) - if not signal_name: - raise ValueError("The provided signal name may not be empty") - return signal_name - return "" - - def determine_signal_name(self, potential_signal_tuple): - """ - Attempts to determine the signal's name. If the user provided the signal name as 2nd value of the tuple, this - name has preference. Bad values cause a ``ValueError``. - Otherwise it attempts to get the signal from the ``signal`` attribute of ``signal`` (which only exists for - PyQt signals). - :returns: str name of the signal, an empty string if no signal name can be determined, or raises an error - in case the user provided an invalid signal name manually - """ - signal_name = self._extract_signal_from_signal_tuple(potential_signal_tuple) - - if not signal_name: - try: - signal_name = self._extract_pyqt_signal_name(potential_signal_tuple) - except AttributeError: - # not a PyQt signal - # -> no signal name could be determined - signal_name = "" - - return signal_name - - def get_callback_name(self, callback): - """Attempts to extract the name of the callback. Returns empty string in case of failure.""" - try: - name = callback.__name__ - except AttributeError: - try: - name = ( - callback.func.__name__ - ) # e.g. for callbacks wrapped with functools.partial() - except AttributeError: - name = "" - return name - - @staticmethod - def get_signal_from_potential_signal_tuple(signal_tuple): - if isinstance(signal_tuple, tuple): - return signal_tuple[0] - return signal_tuple - - def __enter__(self): - return self - - def __exit__(self, type, value, traceback): - __tracebackhide__ = True - if value is None: - # only wait if no exception happened inside the "with" block - self.wait() - - -class SignalBlocker(_AbstractSignalBlocker): - """ - Returned by :meth:`pytestqt.qtbot.QtBot.waitSignal` method. - - :ivar int timeout: maximum time to wait for a signal to be triggered. Can - be changed before :meth:`wait` is called. - - :ivar bool signal_triggered: set to ``True`` if a signal (or all signals in - case of :class:`MultipleSignalBlocker`) was triggered, or - ``False`` if timeout was reached instead. Until :meth:`wait` is called, - this is set to ``None``. - - :ivar bool raising: - If :class:`qtbot.TimeoutError ` should be raised - if a timeout occurred. - - .. note:: contrary to the parameter of same name in - :meth:`pytestqt.qtbot.QtBot.waitSignal`, this parameter does not - consider the :ref:`qt_default_raising` option. - - :ivar list args: - The arguments which were emitted by the signal, or None if the signal - wasn't emitted at all. - - .. versionadded:: 1.10 - The *args* attribute. - - .. automethod:: wait - .. automethod:: connect - """ - - def __init__(self, timeout=5000, raising=True, check_params_cb=None): - super().__init__(timeout, raising=raising) - self._signals = [] - self.args = None - self.all_args = [] - self.check_params_callback = check_params_cb - self.signal_name = "" - - def connect(self, signal): - """ - Connects to the given signal, making :meth:`wait()` return once - this signal is emitted. - - More than one signal can be connected, in which case **any** one of - them will make ``wait()`` return. - - :param signal: QtCore.Signal or tuple (QtCore.Signal, str) - """ - self.signal_name = self.determine_signal_name(potential_signal_tuple=signal) - actual_signal = self.get_signal_from_potential_signal_tuple(signal) - actual_signal.connect(self._quit_loop_by_signal) - self._signals.append(actual_signal) - - def _quit_loop_by_signal(self, *args): - """ - quits the event loop and marks that we finished because of a signal. - """ - if self.check_params_callback: - self.all_args.append(args) - if not self.check_params_callback(*args): - return # parameter check did not pass - try: - self.signal_triggered = True - self.args = list(args) - self._cleanup() - finally: - self._loop.quit() - - def _cleanup(self): - super()._cleanup() - for signal in self._signals: - _silent_disconnect(signal, self._quit_loop_by_signal) - self._signals = [] - - def get_params_as_str(self): - if not self.all_args: - return "" - - if len(self.all_args[0]) == 1: - # we have a list of tuples with 1 element each (i.e. the signal has 1 parameter), it doesn't make sense - # to return something like "[(someParam,), (someParam,)]", it's just ugly. Instead return something like - # "[someParam, someParam]" - args_list = [arg[0] for arg in self.all_args] - else: - args_list = self.all_args - - return str(args_list) - - def _get_timeout_error_message(self): - if self.check_params_callback is not None: - return ( - "Signal {signal_name} emitted with parameters {params} " - "within {timeout} ms, but did not satisfy " - "the {cb_name} callback" - ).format( - signal_name=self.signal_name, - params=self.get_params_as_str(), - timeout=self.timeout, - cb_name=self.get_callback_name(self.check_params_callback), - ) - else: - return "Signal {signal_name} not emitted after {timeout} ms".format( - signal_name=self.signal_name, timeout=self.timeout - ) - - -class SignalAndArgs: - def __init__(self, signal_name, args): - self.signal_name = signal_name - self.args = args - - def _get_readable_signal_with_optional_args(self): - args = repr(self.args) if self.args else "" - - # remove signal parameter signature, e.g. turn "some_signal(str,int)" to "some_signal", because we're adding - # the actual parameters anyways - signal_name = self.signal_name - signal_name = signal_name.partition("(")[0] - - return signal_name + args - - def __str__(self): - return self._get_readable_signal_with_optional_args() - - def __eq__(self, other): - if isinstance(other, self.__class__): - return self.__dict__ == other.__dict__ - else: - return False - - -# Returns e.g. "3rd" for 3, or "21st" for 21 -def get_ordinal_str(n): - return "%d%s" % (n, {1: "st", 2: "nd", 3: "rd"}.get(n if n < 20 else n % 10, "th")) - - -class NoMatchingIndexFoundError(Exception): - pass - - -class MultiSignalBlocker(_AbstractSignalBlocker): - """ - Returned by :meth:`pytestqt.qtbot.QtBot.waitSignals` method, blocks until - all signals connected to it are triggered or the timeout is reached. - - Variables identical to :class:`SignalBlocker`: - - ``timeout`` - - ``signal_triggered`` - - ``raising`` - - .. automethod:: wait - """ - - def __init__(self, timeout=5000, raising=True, check_params_cbs=None, order="none"): - super().__init__(timeout, raising=raising) - self._order = order - self._check_params_callbacks = check_params_cbs - self._signals_emitted = ( - [] - ) # list of booleans, indicates whether the signal was already emitted - self._signals_map = ( - {} - ) # maps from a unique Signal to a list of indices where to expect signal instance emits - self._signals = ( - [] - ) # list of all Signals (for compatibility with _AbstractSignalBlocker) - self._slots = [] # list of slot functions - self._signal_expected_index = 0 # only used when forcing order - self._strict_order_violated = False - self._actual_signal_and_args_at_violation = None - self._signal_names = ( - {} - ) # maps from the unique Signal to the name of the signal (as string) - self.all_signals_and_args = [] # list of SignalAndArgs instances - - def add_signals(self, signals): - """ - Adds the given signal to the list of signals which :meth:`wait()` waits - for. - - :param list signals: list of QtCore.Signal`s or tuples (QtCore.Signal, str) - """ - self._determine_unique_signals(signals) - self._create_signal_emitted_indices(signals) - self._connect_unique_signals() - def _get_timeout_error_message(self): - if not self._are_signal_names_available(): - error_message = self._get_degenerate_error_message() - else: - error_message = self._get_expected_and_actual_signals_message() - if self._strict_order_violated: - error_message = self._get_order_violation_message() + error_message +def __getattr__(name: str) -> type: + """Avoid importing wait_signal_impl at the top level as qt_api is uninitialized.""" + from pytestqt.wait_signal_impl import ( + SignalBlocker, + MultiSignalBlocker, + CallbackBlocker, + ) - return error_message - - def _determine_unique_signals(self, signals): - # create a map that maps from a unique signal to a list of indices - # (positions) where this signal is expected (in case order matters) - signals_as_str = [ - str(self.get_signal_from_potential_signal_tuple(signal)) - for signal in signals - ] - # maps from a signal-string to one of the signal instances (the first one found) - signal_str_to_unique_signal = {} - for index, signal_str in enumerate(signals_as_str): - signal = self.get_signal_from_potential_signal_tuple(signals[index]) - potential_tuple = signals[index] - if signal_str not in signal_str_to_unique_signal: - unique_signal_tuple = potential_tuple - signal_str_to_unique_signal[signal_str] = signal - self._signals_map[signal] = [index] # create a new list - else: - # append to existing list - unique_signal = signal_str_to_unique_signal[signal_str] - self._signals_map[unique_signal].append(index) - unique_signal_tuple = signals[index] - - self._determine_and_save_signal_name(unique_signal_tuple) - - def _determine_and_save_signal_name(self, unique_signal_tuple): - signal_name = self.determine_signal_name(unique_signal_tuple) - if signal_name: # might be an empty string if no name could be determined - unique_signal = self.get_signal_from_potential_signal_tuple( - unique_signal_tuple - ) - self._signal_names[unique_signal] = signal_name - - def _create_signal_emitted_indices(self, signals): - for signal in signals: - self._signals_emitted.append(False) - - def _connect_unique_signals(self): - for unique_signal in self._signals_map: - slot = functools.partial(self._unique_signal_emitted, unique_signal) - self._slots.append(slot) - unique_signal.connect(slot) - self._signals.append(unique_signal) - - def _unique_signal_emitted(self, unique_signal, *args): - """ - Called when a given signal is emitted. - - If all expected signals have been emitted, quits the event loop and - marks that we finished because signals. - """ - self._record_emitted_signal_if_possible(unique_signal, *args) - - self._check_signal_match(unique_signal, *args) - - if self._all_signals_emitted(): - self.signal_triggered = True - try: - self._cleanup() - finally: - self._loop.quit() - - def _record_emitted_signal_if_possible(self, unique_signal, *args): - if self._are_signal_names_available(): - self.all_signals_and_args.append( - SignalAndArgs(signal_name=self._signal_names[unique_signal], args=args) - ) - - def _check_signal_match(self, unique_signal, *args): - if self._order == "none": - # perform the test for every matching index (stop after the first one that matches) - try: - successful_index = self._get_first_matching_index(unique_signal, *args) - self._signals_emitted[successful_index] = True - except NoMatchingIndexFoundError: # none found - pass - elif self._order == "simple": - if self._check_signal_matches_expected_index(unique_signal, *args): - self._signals_emitted[self._signal_expected_index] = True - self._signal_expected_index += 1 - else: # self.order == "strict" - if not self._strict_order_violated: - # only do the check if the strict order has not been violated yet - self._strict_order_violated = ( - True # assume the order has been violated this time - ) - if self._check_signal_matches_expected_index(unique_signal, *args): - self._signals_emitted[self._signal_expected_index] = True - self._signal_expected_index += 1 - self._strict_order_violated = ( - False # order has not been violated after all! - ) - else: - if self._are_signal_names_available(): - self._actual_signal_and_args_at_violation = SignalAndArgs( - signal_name=self._signal_names[unique_signal], args=args - ) - - def _all_signals_emitted(self): - return not self._strict_order_violated and all(self._signals_emitted) - - def _get_first_matching_index(self, unique_signal, *args): - successfully_emitted = False - successful_index = -1 - potential_indices = self._get_unemitted_signal_indices(unique_signal) - for potential_index in potential_indices: - if not self._violates_callback_at_index(potential_index, *args): - successful_index = potential_index - successfully_emitted = True - break - if not successfully_emitted: - raise NoMatchingIndexFoundError - - return successful_index - - def _check_signal_matches_expected_index(self, unique_signal, *args): - potential_indices = self._get_unemitted_signal_indices(unique_signal) - if potential_indices: - if self._signal_expected_index == potential_indices[0]: - if not self._violates_callback_at_index( - self._signal_expected_index, *args - ): - return True - return False - - def _violates_callback_at_index(self, index, *args): - """ - Checks if there's a callback at the provided index that is violates due to invalid parameters. Returns False if - there is no callback for that index, or if a callback exists but it wasn't violated (returned True). - Returns True otherwise. - """ - if self._check_params_callbacks: - callback_func = self._check_params_callbacks[index] - if callback_func: - if not callback_func(*args): - return True - return False - - def _get_unemitted_signal_indices(self, signal): - """Returns the indices for the provided signal for which NO signal instance has been emitted yet.""" - return [ - index - for index in self._signals_map[signal] - if not self._signals_emitted[index] - ] - - def _are_signal_names_available(self): - if self._signal_names: - return True - return False - - def _get_degenerate_error_message(self): - received_signals = sum(self._signals_emitted) - total_signals = len(self._signals_emitted) - return ( - "Received {actual} of the {total} expected signals. " - "To improve this error message, provide the names of the signals " - "in the waitSignals() call." - ).format(actual=received_signals, total=total_signals) - - def _get_expected_and_actual_signals_message(self): - if not self.all_signals_and_args: - emitted_signals = "None" - else: - emitted_signal_string_list = [str(_) for _ in self.all_signals_and_args] - emitted_signals = self._format_as_array(emitted_signal_string_list) - - missing_signal_strings = [] - for missing_signal_index in self._get_missing_signal_indices(): - missing_signal_strings.append( - self._get_signal_string_representation_for_index(missing_signal_index) - ) - missing_signals = self._format_as_array(missing_signal_strings) - - return "Emitted signals: {}. Missing: {}".format( - emitted_signals, missing_signals - ) - - @staticmethod - def _format_as_array(list_of_strings): - return "[{}]".format(", ".join(list_of_strings)) - - def _get_order_violation_message(self): - expected_signal_as_str = self._get_signal_string_representation_for_index( - self._signal_expected_index - ) - actual_signal_as_str = str(self._actual_signal_and_args_at_violation) - return ( - "Signal order violated! Expected {expected} as {ordinal} signal, " - "but received {actual} instead. " - ).format( - expected=expected_signal_as_str, - ordinal=get_ordinal_str(self._signal_expected_index + 1), - actual=actual_signal_as_str, - ) - - def _get_missing_signal_indices(self): - return [ - index - for index, value in enumerate(self._signals_emitted) - if not self._signals_emitted[index] - ] - - def _get_signal_string_representation_for_index(self, index): - """Returns something like or (callback: )""" - signal = self._get_signal_for_index(index) - signal_str_repr = self._signal_names[signal] - - if self._check_params_callbacks: - potential_callback = self._check_params_callbacks[index] - if potential_callback: - callback_name = self.get_callback_name(potential_callback) - if callback_name: - signal_str_repr += f" (callback: {callback_name})" - - return signal_str_repr - - def _get_signal_for_index(self, index): - for signal in self._signals_map: - if index in self._signals_map[signal]: - return signal - - def _cleanup(self): - super()._cleanup() - for i in range(len(self._signals)): - signal = self._signals[i] - slot = self._slots[i] - _silent_disconnect(signal, slot) - del self._signals_emitted[:] - self._signals_map.clear() - del self._slots[:] + if name == "SignalBlocker": + return SignalBlocker + elif name == "MultiSignalBlocker": + return MultiSignalBlocker + elif name == "CallbackBlocker": + return CallbackBlocker + else: + raise AttributeError(f"module {__name__} has no attribute {name}") class SignalEmittedSpy: @@ -613,131 +61,3 @@ def assert_not_emitted(self): ) else: raise SignalEmittedError(f"Signal {self.signal!r} unexpectedly emitted") - - -class CallbackBlocker: - """ - .. versionadded:: 3.1 - - An object which checks if the returned callback gets called. - - Intended to be used as a context manager. - - :ivar int timeout: maximum time to wait for the callback to be called. - - :ivar bool raising: - If :class:`qtbot.TimeoutError ` should be raised if - a timeout occurred. - - .. note:: contrary to the parameter of same name in - :meth:`pytestqt.qtbot.QtBot.waitCallback`, this parameter does not - consider the :ref:`qt_default_raising` option. - - :ivar list args: - The arguments with which the callback was called, or None if the - callback wasn't called at all. - - :ivar dict kwargs: - The keyword arguments with which the callback was called, or None if - the callback wasn't called at all. - """ - - def __init__(self, timeout=5000, raising=True): - self.timeout = timeout - self.raising = raising - self.args = None - self.kwargs = None - self.called = False - self._loop = qt_api.QtCore.QEventLoop() - if timeout is None: - self._timer = None - else: - self._timer = qt_api.QtCore.QTimer(self._loop) - self._timer.setSingleShot(True) - self._timer.setInterval(timeout) - - def wait(self): - """ - Waits until either the returned callback is called or timeout is - reached. - """ - __tracebackhide__ = True - if self.called: - return - if self._timer is not None: - self._timer.timeout.connect(self._quit_loop_by_timeout) - self._timer.start() - qt_api.exec(self._loop) - if not self.called and self.raising: - raise TimeoutError("Callback wasn't called after %sms." % self.timeout) - - def assert_called_with(self, *args, **kwargs): - """ - Check that the callback was called with the same arguments as this - function. - """ - assert self.called - assert self.args == list(args) - assert self.kwargs == kwargs - - def _quit_loop_by_timeout(self): - try: - self._cleanup() - finally: - self._loop.quit() - - def _cleanup(self): - if self._timer is not None: - _silent_disconnect(self._timer.timeout, self._quit_loop_by_timeout) - self._timer.stop() - self._timer = None - - def __call__(self, *args, **kwargs): - # Not inside the try: block, as if self.called is True, we did quit the - # loop already. - if self.called: - raise CallbackCalledTwiceError("Callback called twice") - try: - self.args = list(args) - self.kwargs = kwargs - self.called = True - self._cleanup() - finally: - self._loop.quit() - - def __enter__(self): - return self - - def __exit__(self, type, value, traceback): - __tracebackhide__ = True - if value is None: - # only wait if no exception happened inside the "with" block - self.wait() - - -class SignalEmittedError(Exception): - """ - .. versionadded:: 1.11 - - The exception thrown by :meth:`pytestqt.qtbot.QtBot.assertNotEmitted` if a - signal was emitted unexpectedly. - """ - - -class CallbackCalledTwiceError(Exception): - """ - .. versionadded:: 3.1 - - The exception thrown by :meth:`pytestqt.qtbot.QtBot.waitCallback` if a - callback was called twice. - """ - - -def _silent_disconnect(signal, slot): - """Disconnects a signal from a slot, ignoring errors. Sometimes - Qt might disconnect a signal automatically for unknown reasons. - """ - try: - signal.disconnect(slot) - except (TypeError, RuntimeError): # pragma: no cover - pass diff --git a/src/pytestqt/wait_signal_impl.py b/src/pytestqt/wait_signal_impl.py new file mode 100644 index 0000000..1b59161 --- /dev/null +++ b/src/pytestqt/wait_signal_impl.py @@ -0,0 +1,680 @@ +import functools + +from pytestqt.exceptions import TimeoutError, CallbackCalledTwiceError +from pytestqt.utils import SignalAndArgs +from pytestqt.qt_compat import qt_api + + +if not hasattr(qt_api, "QtCore"): + raise ImportError( + "wait_signal_impl.py got imported too early (before qt_api is initialized)! " + "To access the [...]Blocker classes for type annotations, " + "guard the imports with `if TYPE_CHECKING:`. For any other usage, " + "use `from pytestqt import wait_signal` and access `wait_signal.[...]Blocker`." + ) + + +def get_ordinal_str(n: int) -> str: + """Return e.g. "3rd" for 3, or "21st" for 21.""" + return "%d%s" % (n, {1: "st", 2: "nd", 3: "rd"}.get(n if n < 20 else n % 10, "th")) + + +class NoMatchingIndexFoundError(Exception): + pass + + +def _silent_disconnect(signal, slot): + """Disconnects a signal from a slot, ignoring errors. Sometimes + Qt might disconnect a signal automatically for unknown reasons. + """ + try: + signal.disconnect(slot) + except (TypeError, RuntimeError): # pragma: no cover + pass + + +class _AbstractSignalBlocker(qt_api.QtCore.QObject): + """ + Base class for :class:`SignalBlocker` and :class:`MultiSignalBlocker`. + + Provides :meth:`wait` and a context manager protocol, but no means to add + new signals and to detect when the signals should be considered "done". + This needs to be implemented by subclasses. + + Subclasses also need to provide ``self._signals`` which should evaluate to + ``False`` if no signals were configured. + + """ + + def __init__(self, timeout=5000, raising=True): + super().__init__() + self._loop = qt_api.QtCore.QEventLoop() + self.timeout = timeout + self.signal_triggered = False + self.raising = raising + self._signals = None # will be initialized by inheriting implementations + self._timeout_message = "" + + self._timer = qt_api.QtCore.QTimer(self) + self._timer.setSingleShot(True) + if timeout is not None: + self._timer.setInterval(timeout) + self._timer.timeout.connect(self._quit_loop_by_timeout) + + def wait(self): + """ + Waits until either a connected signal is triggered or timeout is reached. + + :raise ValueError: if no signals are connected and timeout is None; in + this case it would wait forever. + """ + __tracebackhide__ = True + if self.signal_triggered: + return + if self.timeout is None and not self._signals: + raise ValueError("No signals or timeout specified.") + + if self.timeout != 0: + if self.timeout is not None: + # asserts as a stop-gap for possible multithreading issues + assert not self.signal_triggered + self._timer.start() + assert not self.signal_triggered + qt_api.exec(self._loop) + + if not self.signal_triggered and self.raising: + raise TimeoutError(self._timeout_message) + + @qt_api.Slot() + def _quit_loop_by_timeout(self): + try: + self._cleanup() + finally: + self._loop.quit() + + def _cleanup(self): + # assert self._timer.thread() == qt_api.QtCore.QThread.currentThread() + # store timeout message before the data to construct it is lost + self._timeout_message = self._get_timeout_error_message() + self._timer.stop() + + def _get_timeout_error_message(self): + """Subclasses have to implement this, returning an appropriate error message for a TimeoutError.""" + raise NotImplementedError # pragma: no cover + + def _extract_pyqt_signal_name(self, potential_pyqt_signal): + signal_name = potential_pyqt_signal.signal # type: str + if not isinstance(signal_name, str): + raise TypeError( + "Invalid 'signal' attribute in {}. Expected str but got {}".format( + signal_name, type(signal_name) + ) + ) + # strip magic number "2" that PyQt prepends to the signal names + signal_name = signal_name.lstrip("2") + return signal_name + + def _extract_signal_from_signal_tuple(self, potential_signal_tuple): + if isinstance(potential_signal_tuple, tuple): + if len(potential_signal_tuple) != 2: + raise ValueError( + "Signal tuple must have length of 2 (first element is the signal, " + "the second element is the signal's name)." + ) + signal_tuple = potential_signal_tuple + signal_name = signal_tuple[1] + if not isinstance(signal_name, str): + raise TypeError( + "Invalid type for provided signal name, " + "expected str but got {}".format(type(signal_name)) + ) + if not signal_name: + raise ValueError("The provided signal name may not be empty") + return signal_name + return "" + + def determine_signal_name(self, potential_signal_tuple): + """ + Attempts to determine the signal's name. If the user provided the signal name as 2nd value of the tuple, this + name has preference. Bad values cause a ``ValueError``. + Otherwise it attempts to get the signal from the ``signal`` attribute of ``signal`` (which only exists for + PyQt signals). + :returns: str name of the signal, an empty string if no signal name can be determined, or raises an error + in case the user provided an invalid signal name manually + """ + signal_name = self._extract_signal_from_signal_tuple(potential_signal_tuple) + + if not signal_name: + try: + signal_name = self._extract_pyqt_signal_name(potential_signal_tuple) + except AttributeError: + # not a PyQt signal + # -> no signal name could be determined + signal_name = "" + + return signal_name + + def get_callback_name(self, callback): + """Attempts to extract the name of the callback. Returns empty string in case of failure.""" + try: + name = callback.__name__ + except AttributeError: + try: + name = ( + callback.func.__name__ + ) # e.g. for callbacks wrapped with functools.partial() + except AttributeError: + name = "" + return name + + @staticmethod + def get_signal_from_potential_signal_tuple(signal_tuple): + if isinstance(signal_tuple, tuple): + return signal_tuple[0] + return signal_tuple + + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + __tracebackhide__ = True + if value is None: + # only wait if no exception happened inside the "with" block + self.wait() + + +class SignalBlocker(_AbstractSignalBlocker): + """ + Returned by :meth:`pytestqt.qtbot.QtBot.waitSignal` method. + + :ivar int timeout: maximum time to wait for a signal to be triggered. Can + be changed before :meth:`wait` is called. + + :ivar bool signal_triggered: set to ``True`` if a signal (or all signals in + case of :class:`MultipleSignalBlocker`) was triggered, or + ``False`` if timeout was reached instead. Until :meth:`wait` is called, + this is set to ``None``. + + :ivar bool raising: + If :class:`qtbot.TimeoutError ` should be raised + if a timeout occurred. + + .. note:: contrary to the parameter of same name in + :meth:`pytestqt.qtbot.QtBot.waitSignal`, this parameter does not + consider the :ref:`qt_default_raising` option. + + :ivar list args: + The arguments which were emitted by the signal, or None if the signal + wasn't emitted at all. + + .. versionadded:: 1.10 + The *args* attribute. + + .. automethod:: wait + .. automethod:: connect + """ + + def __init__(self, timeout=5000, raising=True, check_params_cb=None): + super().__init__(timeout, raising=raising) + self._signals = [] + self.args = None + self.all_args = [] + self.check_params_callback = check_params_cb + self.signal_name = "" + + def connect(self, signal): + """ + Connects to the given signal, making :meth:`wait()` return once + this signal is emitted. + + More than one signal can be connected, in which case **any** one of + them will make ``wait()`` return. + + :param signal: QtCore.Signal or tuple (QtCore.Signal, str) + """ + self.signal_name = self.determine_signal_name(potential_signal_tuple=signal) + actual_signal = self.get_signal_from_potential_signal_tuple(signal) + actual_signal.connect(self._quit_loop_by_signal) + self._signals.append(actual_signal) + + @qt_api.Slot() + def _quit_loop_by_signal(self, *args): + """ + quits the event loop and marks that we finished because of a signal. + """ + if self.check_params_callback: + self.all_args.append(args) + if not self.check_params_callback(*args): + return # parameter check did not pass + try: + self.signal_triggered = True + self.args = list(args) + self._cleanup() + finally: + self._loop.quit() + + def _cleanup(self): + super()._cleanup() + # FIXME move to _AbstractSignalBlocker once we got MultiSignalBlocker correct + assert self._timer.thread() == qt_api.QtCore.QThread.currentThread() + for signal in self._signals: + _silent_disconnect(signal, self._quit_loop_by_signal) + self._signals = [] + + def get_params_as_str(self): + if not self.all_args: + return "" + + if len(self.all_args[0]) == 1: + # we have a list of tuples with 1 element each (i.e. the signal has 1 parameter), it doesn't make sense + # to return something like "[(someParam,), (someParam,)]", it's just ugly. Instead return something like + # "[someParam, someParam]" + args_list = [arg[0] for arg in self.all_args] + else: + args_list = self.all_args + + return str(args_list) + + def _get_timeout_error_message(self): + if self.check_params_callback is not None: + return ( + "Signal {signal_name} emitted with parameters {params} " + "within {timeout} ms, but did not satisfy " + "the {cb_name} callback" + ).format( + signal_name=self.signal_name, + params=self.get_params_as_str(), + timeout=self.timeout, + cb_name=self.get_callback_name(self.check_params_callback), + ) + else: + return "Signal {signal_name} not emitted after {timeout} ms".format( + signal_name=self.signal_name, timeout=self.timeout + ) + + +class MultiSignalBlocker(_AbstractSignalBlocker): + """ + Returned by :meth:`pytestqt.qtbot.QtBot.waitSignals` method, blocks until + all signals connected to it are triggered or the timeout is reached. + + Variables identical to :class:`SignalBlocker`: + - ``timeout`` + - ``signal_triggered`` + - ``raising`` + + .. automethod:: wait + """ + + def __init__(self, timeout=5000, raising=True, check_params_cbs=None, order="none"): + super().__init__(timeout, raising=raising) + self._order = order + self._check_params_callbacks = check_params_cbs + self._signals_emitted = ( + [] + ) # list of booleans, indicates whether the signal was already emitted + self._signals_map = ( + {} + ) # maps from a unique Signal to a list of indices where to expect signal instance emits + self._signals = ( + [] + ) # list of all Signals (for compatibility with _AbstractSignalBlocker) + self._slots = [] # list of slot functions + self._signal_expected_index = 0 # only used when forcing order + self._strict_order_violated = False + self._actual_signal_and_args_at_violation = None + self._signal_names = ( + {} + ) # maps from the unique Signal to the name of the signal (as string) + self.all_signals_and_args = [] # list of SignalAndArgs instances + + def add_signals(self, signals): + """ + Adds the given signal to the list of signals which :meth:`wait()` waits + for. + + :param list signals: list of QtCore.Signal`s or tuples (QtCore.Signal, str) + """ + self._determine_unique_signals(signals) + self._create_signal_emitted_indices(signals) + self._connect_unique_signals() + + def _get_timeout_error_message(self): + if not self._are_signal_names_available(): + error_message = self._get_degenerate_error_message() + else: + error_message = self._get_expected_and_actual_signals_message() + if self._strict_order_violated: + error_message = self._get_order_violation_message() + error_message + + return error_message + + def _determine_unique_signals(self, signals): + # create a map that maps from a unique signal to a list of indices + # (positions) where this signal is expected (in case order matters) + signals_as_str = [ + str(self.get_signal_from_potential_signal_tuple(signal)) + for signal in signals + ] + # maps from a signal-string to one of the signal instances (the first one found) + signal_str_to_unique_signal = {} + for index, signal_str in enumerate(signals_as_str): + signal = self.get_signal_from_potential_signal_tuple(signals[index]) + potential_tuple = signals[index] + if signal_str not in signal_str_to_unique_signal: + unique_signal_tuple = potential_tuple + signal_str_to_unique_signal[signal_str] = signal + self._signals_map[signal] = [index] # create a new list + else: + # append to existing list + unique_signal = signal_str_to_unique_signal[signal_str] + self._signals_map[unique_signal].append(index) + unique_signal_tuple = signals[index] + + self._determine_and_save_signal_name(unique_signal_tuple) + + def _determine_and_save_signal_name(self, unique_signal_tuple): + signal_name = self.determine_signal_name(unique_signal_tuple) + if signal_name: # might be an empty string if no name could be determined + unique_signal = self.get_signal_from_potential_signal_tuple( + unique_signal_tuple + ) + self._signal_names[unique_signal] = signal_name + + def _create_signal_emitted_indices(self, signals): + for signal in signals: + self._signals_emitted.append(False) + + def _connect_unique_signals(self): + for unique_signal in self._signals_map: + slot = functools.partial(self._unique_signal_emitted, unique_signal) + self._slots.append(slot) + unique_signal.connect(slot) + self._signals.append(unique_signal) + + def _unique_signal_emitted(self, unique_signal, *args): + """ + Called when a given signal is emitted. + + If all expected signals have been emitted, quits the event loop and + marks that we finished because signals. + """ + self._record_emitted_signal_if_possible(unique_signal, *args) + + self._check_signal_match(unique_signal, *args) + + if self._all_signals_emitted(): + self.signal_triggered = True + try: + self._cleanup() + finally: + self._loop.quit() + + def _record_emitted_signal_if_possible(self, unique_signal, *args): + if self._are_signal_names_available(): + self.all_signals_and_args.append( + SignalAndArgs(signal_name=self._signal_names[unique_signal], args=args) + ) + + def _check_signal_match(self, unique_signal, *args): + if self._order == "none": + # perform the test for every matching index (stop after the first one that matches) + try: + successful_index = self._get_first_matching_index(unique_signal, *args) + self._signals_emitted[successful_index] = True + except NoMatchingIndexFoundError: # none found + pass + elif self._order == "simple": + if self._check_signal_matches_expected_index(unique_signal, *args): + self._signals_emitted[self._signal_expected_index] = True + self._signal_expected_index += 1 + else: # self.order == "strict" + if not self._strict_order_violated: + # only do the check if the strict order has not been violated yet + self._strict_order_violated = ( + True # assume the order has been violated this time + ) + if self._check_signal_matches_expected_index(unique_signal, *args): + self._signals_emitted[self._signal_expected_index] = True + self._signal_expected_index += 1 + self._strict_order_violated = ( + False # order has not been violated after all! + ) + else: + if self._are_signal_names_available(): + self._actual_signal_and_args_at_violation = SignalAndArgs( + signal_name=self._signal_names[unique_signal], args=args + ) + + def _all_signals_emitted(self): + return not self._strict_order_violated and all(self._signals_emitted) + + def _get_first_matching_index(self, unique_signal, *args): + successfully_emitted = False + successful_index = -1 + potential_indices = self._get_unemitted_signal_indices(unique_signal) + for potential_index in potential_indices: + if not self._violates_callback_at_index(potential_index, *args): + successful_index = potential_index + successfully_emitted = True + break + if not successfully_emitted: + raise NoMatchingIndexFoundError + + return successful_index + + def _check_signal_matches_expected_index(self, unique_signal, *args): + potential_indices = self._get_unemitted_signal_indices(unique_signal) + if potential_indices: + if self._signal_expected_index == potential_indices[0]: + if not self._violates_callback_at_index( + self._signal_expected_index, *args + ): + return True + return False + + def _violates_callback_at_index(self, index, *args): + """ + Checks if there's a callback at the provided index that is violates due to invalid parameters. Returns False if + there is no callback for that index, or if a callback exists but it wasn't violated (returned True). + Returns True otherwise. + """ + if self._check_params_callbacks: + callback_func = self._check_params_callbacks[index] + if callback_func: + if not callback_func(*args): + return True + return False + + def _get_unemitted_signal_indices(self, signal): + """Returns the indices for the provided signal for which NO signal instance has been emitted yet.""" + return [ + index + for index in self._signals_map[signal] + if not self._signals_emitted[index] + ] + + def _are_signal_names_available(self): + if self._signal_names: + return True + return False + + def _get_degenerate_error_message(self): + received_signals = sum(self._signals_emitted) + total_signals = len(self._signals_emitted) + return ( + "Received {actual} of the {total} expected signals. " + "To improve this error message, provide the names of the signals " + "in the waitSignals() call." + ).format(actual=received_signals, total=total_signals) + + def _get_expected_and_actual_signals_message(self): + if not self.all_signals_and_args: + emitted_signals = "None" + else: + emitted_signal_string_list = [str(_) for _ in self.all_signals_and_args] + emitted_signals = self._format_as_array(emitted_signal_string_list) + + missing_signal_strings = [] + for missing_signal_index in self._get_missing_signal_indices(): + missing_signal_strings.append( + self._get_signal_string_representation_for_index(missing_signal_index) + ) + missing_signals = self._format_as_array(missing_signal_strings) + + return "Emitted signals: {}. Missing: {}".format( + emitted_signals, missing_signals + ) + + @staticmethod + def _format_as_array(list_of_strings): + return "[{}]".format(", ".join(list_of_strings)) + + def _get_order_violation_message(self): + expected_signal_as_str = self._get_signal_string_representation_for_index( + self._signal_expected_index + ) + actual_signal_as_str = str(self._actual_signal_and_args_at_violation) + return ( + "Signal order violated! Expected {expected} as {ordinal} signal, " + "but received {actual} instead. " + ).format( + expected=expected_signal_as_str, + ordinal=get_ordinal_str(self._signal_expected_index + 1), + actual=actual_signal_as_str, + ) + + def _get_missing_signal_indices(self): + return [ + index + for index, value in enumerate(self._signals_emitted) + if not self._signals_emitted[index] + ] + + def _get_signal_string_representation_for_index(self, index): + """Returns something like or (callback: )""" + signal = self._get_signal_for_index(index) + signal_str_repr = self._signal_names[signal] + + if self._check_params_callbacks: + potential_callback = self._check_params_callbacks[index] + if potential_callback: + callback_name = self.get_callback_name(potential_callback) + if callback_name: + signal_str_repr += f" (callback: {callback_name})" + + return signal_str_repr + + def _get_signal_for_index(self, index): + for signal in self._signals_map: + if index in self._signals_map[signal]: + return signal + + def _cleanup(self): + super()._cleanup() + for i in range(len(self._signals)): + signal = self._signals[i] + slot = self._slots[i] + _silent_disconnect(signal, slot) + del self._signals_emitted[:] + self._signals_map.clear() + del self._slots[:] + + +class CallbackBlocker(qt_api.QtCore.QObject): + """ + .. versionadded:: 3.1 + + An object which checks if the returned callback gets called. + + Intended to be used as a context manager. + + :ivar int timeout: maximum time to wait for the callback to be called. + + :ivar bool raising: + If :class:`qtbot.TimeoutError ` should be raised if + a timeout occurred. + + .. note:: contrary to the parameter of same name in + :meth:`pytestqt.qtbot.QtBot.waitCallback`, this parameter does not + consider the :ref:`qt_default_raising` option. + + :ivar list args: + The arguments with which the callback was called, or None if the + callback wasn't called at all. + + :ivar dict kwargs: + The keyword arguments with which the callback was called, or None if + the callback wasn't called at all. + """ + + def __init__(self, timeout=5000, raising=True): + super().__init__() + self.timeout = timeout + self.raising = raising + self.args = None + self.kwargs = None + self.called = False + self._loop = qt_api.QtCore.QEventLoop() + + self._timer = qt_api.QtCore.QTimer(self) + self._timer.setSingleShot(True) + if timeout is not None: + self._timer.setInterval(timeout) + self._timer.timeout.connect(self._quit_loop_by_timeout) + + def wait(self): + """ + Waits until either the returned callback is called or timeout is + reached. + """ + __tracebackhide__ = True + if self.called: + return + if self.timeout is not None: + self._timer.start() + qt_api.exec(self._loop) + if not self.called and self.raising: + raise TimeoutError("Callback wasn't called after %sms." % self.timeout) + + def assert_called_with(self, *args, **kwargs): + """ + Check that the callback was called with the same arguments as this + function. + """ + assert self.called + assert self.args == list(args) + assert self.kwargs == kwargs + + @qt_api.Slot() + def _quit_loop_by_timeout(self): + try: + self._cleanup() + finally: + self._loop.quit() + + def _cleanup(self): + assert self._timer.thread() == qt_api.QtCore.QThread.currentThread() + self._timer.stop() + + def __call__(self, *args, **kwargs): + # Not inside the try: block, as if self.called is True, we did quit the + # loop already. + if self.called: + raise CallbackCalledTwiceError("Callback called twice") + try: + self.args = list(args) + self.kwargs = kwargs + self.called = True + self._cleanup() + finally: + self._loop.quit() + + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + __tracebackhide__ = True + if value is None: + # only wait if no exception happened inside the "with" block + self.wait() diff --git a/tests/conftest.py b/tests/conftest.py index 6010c31..e09ea8b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,3 @@ -import functools import time import pytest @@ -63,10 +62,9 @@ def shutdown(self): def single_shot(self, signal, delay): t = qt_api.QtCore.QTimer(self) t.setSingleShot(True) - slot = functools.partial(self._emit, signal) - t.timeout.connect(slot) + t.timeout.connect(signal) t.start(delay) - self.timers_and_slots.append((t, slot)) + self.timers_and_slots.append((t, signal)) def single_shot_callback(self, callback, delay): t = qt_api.QtCore.QTimer(self) @@ -75,9 +73,6 @@ def single_shot_callback(self, callback, delay): t.start(delay) self.timers_and_slots.append((t, callback)) - def _emit(self, signal): - signal.emit() - timer = Timer() yield timer timer.shutdown() diff --git a/tests/test_wait_signal.py b/tests/test_wait_signal.py index 87ca324..3295db3 100644 --- a/tests/test_wait_signal.py +++ b/tests/test_wait_signal.py @@ -5,10 +5,10 @@ import sys from pytestqt.qt_compat import qt_api -from pytestqt.wait_signal import ( +from pytestqt.utils import SignalAndArgs +from pytestqt.exceptions import ( SignalEmittedError, TimeoutError, - SignalAndArgs, CallbackCalledTwiceError, ) @@ -1364,3 +1364,134 @@ def test_timeout_not_raising(self, qtbot): assert not callback.called assert callback.args is None assert callback.kwargs is None + + +@pytest.mark.parametrize( + "check_warnings, count", + [ + # Checking for warnings + pytest.param( + True, # check warnings + 200, # gets output reliably even with only few runs (often the first) + id="stderr", + ), + # Triggering AttributeError + pytest.param( + False, # don't check warnings + # Hopefully enough to trigger the AttributeError race condition reliably. + # With 500 runs, only 1 of 5 Windows PySide6 CI jobs triggered it (but all + # Ubuntu/macOS jobs did). With 1500 runs, Windows jobs still only triggered + # it 0-2 times. + # + # On my machine (Linux, Intel Core Ultra 9 185H), 500 runs trigger it + # reliably and take ~1s in total. + 2500 if sys.platform == "win32" else 500, + id="attributeerror", + ), + ], +) +@pytest.mark.parametrize("multi_blocker", [True, False]) +def test_signal_raised_from_thread( + monkeypatch: pytest.MonkeyPatch, + pytester: pytest.Pytester, + check_warnings: bool, + multi_blocker: bool, + count: int, +) -> None: + """Wait for a signal with a thread. + + Extracted from https://github.com/pytest-dev/pytest-qt/issues/586 + """ + pytester.makepyfile( + f""" + import pytest + from pytestqt.qt_compat import qt_api + + + class Worker(qt_api.QtCore.QObject): + signal = qt_api.Signal() + + + @pytest.mark.parametrize("_", range({count})) + def test_thread(qtbot, _): + worker = Worker() + thread = qt_api.QtCore.QThread() + worker.moveToThread(thread) + thread.start() + + try: + if {multi_blocker}: # multi_blocker + with qtbot.waitSignals([worker.signal], timeout=500) as blocker: + worker.signal.emit() + else: + with qtbot.waitSignal(worker.signal, timeout=500) as blocker: + worker.signal.emit() + finally: + thread.quit() + thread.wait() + """ + ) + if check_warnings: + monkeypatch.setenv("QT_FATAL_WARNINGS", "1") + res = pytester.runpytest_subprocess("-x", "-s") + + qtimer_message = "QObject::killTimer: Timers cannot be stopped from another thread" + if ( + qtimer_message in res.stderr.str() + and multi_blocker + and check_warnings + and qt_api.pytest_qt_api == "pyside6" + ): + # We haven't fixed MultiSignalBlocker yet... + pytest.xfail(f"Qt error: {qtimer_message}") + + outcomes = res.parseoutcomes() + res.assert_outcomes(passed=outcomes["passed"]) # no failed/error + + +@pytest.mark.skip(reason="Runs ~1min to reproduce bug reliably") +def test_callback_in_thread(pytester: pytest.Pytester) -> None: + """Wait for a callback with a thread. + + Inspired by https://github.com/pytest-dev/pytest-qt/issues/586 + """ + # Hopefully enough to trigger the bug reliably. + # + # On my machine (Linux, Intel Core Ultra 9 185H), sometimes the bug only + # triggers after ~30k runs (~44s). Thus, we skip this test by default. + count = 50_000 + + pytester.makepyfile( + f""" + import pytest + from pytestqt.qt_compat import qt_api + + + class Worker(qt_api.QtCore.QObject): + def __init__(self, callback): + super().__init__() + self.callback = callback + + def call_callback(self): + self.callback() + + + @pytest.mark.parametrize("_", range({count})) + def test_thread(qtbot, _): + thread = qt_api.QtCore.QThread() + + try: + with qtbot.waitCallback() as callback: + worker = Worker(callback) + worker.moveToThread(thread) + thread.started.connect(worker.call_callback) + thread.start() + finally: + thread.quit() + thread.wait() + """ + ) + + res = pytester.runpytest_subprocess("-x") + outcomes = res.parseoutcomes() + res.assert_outcomes(passed=outcomes["passed"]) # no failed/error