diff --git a/CHANGES.md b/CHANGES.md index 24d2a0d..45e4caf 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -5,7 +5,12 @@ chronological order. Releases follow [semantic versioning](https://semver.org/) releases are available on [PyPI](https://pypi.org/project/pytask-parallel) and [Anaconda.org](https://anaconda.org/conda-forge/pytask-parallel). -## 0.2.0 - 2022-xx-xx +## 0.2.1 - 2022-08-xx + +- {pull}`43` adds docformatter. +- {pull}`44` allows to capture warnings from subprocesses. Fixes {issue}`41`. + +## 0.2.0 - 2022-04-15 - {pull}`31` adds types to the package. - {pull}`36` adds a test for . diff --git a/README.md b/README.md index bed9480..78159d5 100644 --- a/README.md +++ b/README.md @@ -68,7 +68,9 @@ n_workers = 1 parallel_backend = "loky" # or processes or threads ``` -## Warning +## Some implementation details + +### Parallelization and Debugging It is not possible to combine parallelization with debugging. That is why `--pdb` or `--trace` deactivate parallelization. @@ -76,6 +78,11 @@ It is not possible to combine parallelization with debugging. That is why `--pdb If you parallelize the execution of your tasks using two or more workers, do not use `breakpoint()` or `import pdb; pdb.set_trace()` since both will cause exceptions. +### Threads and warnings + +Capturing warnings is not thread-safe. Therefore, warnings cannot be captured reliably +when tasks are parallelized with `--parallel-backend threads`. + ## Changes Consult the [release notes](CHANGES.md) to find out about what is new. diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index 2421f9f..b9a750e 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -4,15 +4,19 @@ import inspect import sys import time +import warnings from concurrent.futures import Future from types import TracebackType from typing import Any +from typing import Callable import cloudpickle from pybaum.tree_util import tree_map from pytask import console from pytask import ExecutionReport +from pytask import get_marks from pytask import hookimpl +from pytask import Mark from pytask import remove_internal_traceback_frames_from_exc_info from pytask import Session from pytask import Task @@ -20,6 +24,16 @@ from rich.console import ConsoleOptions from rich.traceback import Traceback +# Can be removed if pinned to pytask >= 0.2.6. +try: + from pytask import parse_warning_filter + from pytask import warning_record_to_str + from pytask import WarningReport +except ImportError: + from _pytask.warnings import parse_warning_filter + from _pytask.warnings import warning_record_to_str + from _pytask.warnings_utils import WarningReport + @hookimpl def pytask_post_parse(config: dict[str, Any]) -> None: @@ -85,42 +99,38 @@ def pytask_execute_build(session: Session) -> bool | None: for task_name in list(running_tasks): future = running_tasks[task_name] - if future.done() and ( - future.exception() is not None - or future.result() is not None - ): - task = session.dag.nodes[task_name]["task"] - if future.exception() is not None: - exception = future.exception() - exc_info = ( - type(exception), - exception, - exception.__traceback__, - ) - else: - exc_info = future.result() - - newly_collected_reports.append( - ExecutionReport.from_task_and_exception(task, exc_info) + if future.done(): + warning_reports, task_exception = future.result() + session.warnings.extend(warning_reports) + exc_info = ( + _parse_future_exception(future.exception()) + or task_exception ) - running_tasks.pop(task_name) - session.scheduler.done(task_name) - elif future.done() and future.exception() is None: - task = session.dag.nodes[task_name]["task"] - try: - session.hook.pytask_execute_task_teardown( - session=session, task=task - ) - except Exception: - report = ExecutionReport.from_task_and_exception( - task, sys.exc_info() + if exc_info is not None: + task = session.dag.nodes[task_name]["task"] + newly_collected_reports.append( + ExecutionReport.from_task_and_exception( + task, exc_info + ) ) + running_tasks.pop(task_name) + session.scheduler.done(task_name) else: - report = ExecutionReport.from_task(task) - - running_tasks.pop(task_name) - newly_collected_reports.append(report) - session.scheduler.done(task_name) + task = session.dag.nodes[task_name]["task"] + try: + session.hook.pytask_execute_task_teardown( + session=session, task=task + ) + except Exception: + report = ExecutionReport.from_task_and_exception( + task, sys.exc_info() + ) + else: + report = ExecutionReport.from_task(task) + + running_tasks.pop(task_name) + newly_collected_reports.append(report) + session.scheduler.done(task_name) else: pass @@ -144,6 +154,17 @@ def pytask_execute_build(session: Session) -> bool | None: return None +def _parse_future_exception( + exception: BaseException | None, +) -> tuple[type[BaseException], BaseException, TracebackType] | None: + """Parse a future exception.""" + return ( + None + if exception is None + else (type(exception), exception, exception.__traceback__) + ) + + class ProcessesNameSpace: """The name space for hooks related to processes.""" @@ -167,6 +188,9 @@ def pytask_execute_task(session: Session, task: Task) -> Future[Any] | None: bytes_kwargs=bytes_kwargs, show_locals=session.config["show_locals"], console_options=console.options, + session_filterwarnings=session.config["filterwarnings"], + task_filterwarnings=get_marks(task, "filterwarnings"), + task_short_name=task.short_name, ) return None @@ -176,7 +200,10 @@ def _unserialize_and_execute_task( bytes_kwargs: bytes, show_locals: bool, console_options: ConsoleOptions, -) -> tuple[type[BaseException], BaseException, str] | None: + session_filterwarnings: tuple[str, ...], + task_filterwarnings: tuple[Mark, ...], + task_short_name: str, +) -> tuple[list[WarningReport], tuple[type[BaseException], BaseException, str] | None]: """Unserialize and execute task. This function receives bytes and unpickles them to a task which is them execute in a @@ -188,13 +215,40 @@ def _unserialize_and_execute_task( task = cloudpickle.loads(bytes_function) kwargs = cloudpickle.loads(bytes_kwargs) - try: - task.execute(**kwargs) - except Exception: - exc_info = sys.exc_info() - processed_exc_info = _process_exception(exc_info, show_locals, console_options) - return processed_exc_info - return None + with warnings.catch_warnings(record=True) as log: + # mypy can't infer that record=True means log is not None; help it. + assert log is not None + + for arg in session_filterwarnings: + warnings.filterwarnings(*parse_warning_filter(arg, escape=False)) + + # apply filters from "filterwarnings" marks + for mark in task_filterwarnings: + for arg in mark.args: + warnings.filterwarnings(*parse_warning_filter(arg, escape=False)) + + try: + task.execute(**kwargs) + except Exception: + exc_info = sys.exc_info() + processed_exc_info = _process_exception( + exc_info, show_locals, console_options + ) + else: + processed_exc_info = None + + warning_reports = [] + for warning_message in log: + fs_location = warning_message.filename, warning_message.lineno + warning_reports.append( + WarningReport( + message=warning_record_to_str(warning_message), + fs_location=fs_location, + id_=task_short_name, + ) + ) + + return warning_reports, processed_exc_info def _process_exception( @@ -224,11 +278,33 @@ def pytask_execute_task(session: Session, task: Task) -> Future[Any] | None: """ if session.config["n_workers"] > 1: kwargs = _create_kwargs_for_task(task) - return session.executor.submit(task.execute, **kwargs) + return session.executor.submit( + _mock_processes_for_threads, func=task.execute, **kwargs + ) else: return None +def _mock_processes_for_threads( + func: Callable[..., Any], **kwargs: Any +) -> tuple[list[Any], tuple[type[BaseException], BaseException, TracebackType] | None]: + """Mock execution function such that it returns the same as for processes. + + The function for processes returns ``warning_reports`` and an ``exception``. With + threads, these object are collected by the main and not the subprocess. So, we just + return placeholders. + + """ + __tracebackhide__ = True + try: + func(**kwargs) + except Exception: + exc_info = sys.exc_info() + else: + exc_info = None + return [], exc_info + + def _create_kwargs_for_task(task: Task) -> dict[Any, Any]: """Create kwargs for task function.""" kwargs = {**task.kwargs} diff --git a/tests/test_execute.py b/tests/test_execute.py index 8f8f65f..495d849 100644 --- a/tests/test_execute.py +++ b/tests/test_execute.py @@ -119,6 +119,7 @@ def myfunc(): "n_workers": 2, "parallel_backend": parallel_backend, "show_locals": False, + "filterwarnings": [], } with PARALLEL_BACKENDS[parallel_backend]( @@ -135,7 +136,9 @@ def myfunc(): future = backend_name_space.pytask_execute_task(session, task) executor.shutdown() - assert future.result() is None + warning_reports, exception = future.result() + assert warning_reports == [] + assert exception is None @pytest.mark.end_to_end @@ -288,3 +291,37 @@ def task_example(produces): ) assert session.exit_code == ExitCode.OK + + +@pytest.mark.end_to_end +@pytest.mark.parametrize( + "parallel_backend", + # Capturing warnings is not thread-safe. + [backend for backend in PARALLEL_BACKENDS if backend != "threads"], +) +def test_collect_warnings_from_parallelized_tasks(runner, tmp_path, parallel_backend): + source = """ + import pytask + import warnings + + for i in range(2): + + @pytask.mark.task(id=i, kwargs={"produces": f"{i}.txt"}) + def task_example(produces): + warnings.warn("This is a warning.") + produces.touch() + """ + tmp_path.joinpath("task_example.py").write_text(textwrap.dedent(source)) + + result = runner.invoke( + cli, [tmp_path.as_posix(), "-n", "2", "--parallel-backend", parallel_backend] + ) + + assert result.exit_code == ExitCode.OK + assert "Warnings" in result.output + assert "This is a warning." in result.output + assert "capture_warnings.html" in result.output + + warnings_block = result.output.split("Warnings")[1] + assert "task_example.py::task_example[0]" in warnings_block + assert "task_example.py::task_example[1]" in warnings_block diff --git a/tox.ini b/tox.ini index aea1de4..d0e8914 100644 --- a/tox.ini +++ b/tox.ini @@ -39,6 +39,9 @@ warn-symbols = pytest.mark.skip = Remove 'skip' flag for tests. [pytest] +testpaths = + # Do not add src since it messes with the loading of pytask-parallel as a plugin. + tests addopts = --doctest-modules filterwarnings = ignore: the imp module is deprecated in favour of importlib