From 76463bd55285f20475afba11a7a784cb279dc376 Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Thu, 11 Aug 2022 17:45:02 +0200 Subject: [PATCH 1/7] Implement failing test. --- tests/test_execute.py | 22 ++++++++++++++++++++++ tox.ini | 2 ++ 2 files changed, 24 insertions(+) diff --git a/tests/test_execute.py b/tests/test_execute.py index 8f8f65f..3207859 100644 --- a/tests/test_execute.py +++ b/tests/test_execute.py @@ -288,3 +288,25 @@ def task_example(produces): ) assert session.exit_code == ExitCode.OK + + +@pytest.mark.end_to_end +@pytest.mark.parametrize("parallel_backend", PARALLEL_BACKENDS) +def test_collect_warnings_from_parallelized_tasks(runner, tmp_path, parallel_backend): + source = """ + import pytask + import warnings + + for i in range(2): + + @pytask.mark.task(id=i, kwargs={"produces": f"{i}.txt"}) + def task_example(produces): + warnings.warn("This is a warning.") + produces.touch() + """ + tmp_path.joinpath("task_example.py").write_text(textwrap.dedent(source)) + + result = runner.invoke(cli, [tmp_path.as_posix(), "-n", "2", "--parallel-backend", parallel_backend]) + + assert result.exit_code == ExitCode.OK + assert "Warnings" in result.output diff --git a/tox.ini b/tox.ini index aea1de4..9b37128 100644 --- a/tox.ini +++ b/tox.ini @@ -39,6 +39,8 @@ warn-symbols = pytest.mark.skip = Remove 'skip' flag for tests. [pytest] +testpaths = + tests addopts = --doctest-modules filterwarnings = ignore: the imp module is deprecated in favour of importlib From 5a339b23f32528350ca0b19dec87dd593f8f23c7 Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Thu, 11 Aug 2022 17:47:52 +0200 Subject: [PATCH 2/7] Fix pre-commit. --- tests/test_execute.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/test_execute.py b/tests/test_execute.py index 3207859..af82e06 100644 --- a/tests/test_execute.py +++ b/tests/test_execute.py @@ -306,7 +306,9 @@ def task_example(produces): """ tmp_path.joinpath("task_example.py").write_text(textwrap.dedent(source)) - result = runner.invoke(cli, [tmp_path.as_posix(), "-n", "2", "--parallel-backend", parallel_backend]) + result = runner.invoke( + cli, [tmp_path.as_posix(), "-n", "2", "--parallel-backend", parallel_backend] + ) assert result.exit_code == ExitCode.OK assert "Warnings" in result.output From 7c8ddd8dcefeee3895513be5473042df4b31a9ca Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Fri, 12 Aug 2022 12:56:50 +0200 Subject: [PATCH 3/7] Pass through filters and make everything work with threads. --- src/pytask_parallel/execute.py | 148 +++++++++++++++++++++++---------- tests/test_execute.py | 5 +- tox.ini | 1 + 3 files changed, 109 insertions(+), 45 deletions(-) diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index 2421f9f..5ffca4b 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -4,15 +4,22 @@ import inspect import sys import time +import warnings from concurrent.futures import Future from types import TracebackType from typing import Any +from typing import Callable import cloudpickle +from _pytask.warnings import parse_warning_filter +from _pytask.warnings import warning_record_to_str +from _pytask.warnings_utils import WarningReport from pybaum.tree_util import tree_map from pytask import console from pytask import ExecutionReport +from pytask import get_marks from pytask import hookimpl +from pytask import Mark from pytask import remove_internal_traceback_frames_from_exc_info from pytask import Session from pytask import Task @@ -85,42 +92,45 @@ def pytask_execute_build(session: Session) -> bool | None: for task_name in list(running_tasks): future = running_tasks[task_name] - if future.done() and ( - future.exception() is not None - or future.result() is not None - ): - task = session.dag.nodes[task_name]["task"] - if future.exception() is not None: - exception = future.exception() - exc_info = ( - type(exception), - exception, - exception.__traceback__, + if future.done(): + if ( + future.exception() is not None + or future.result()[1] is not None + ): + task = session.dag.nodes[task_name]["task"] + if future.exception() is not None: + exception = future.exception() + exc_info = ( + type(exception), + exception, + exception.__traceback__, + ) + else: + exc_info = future.result()[1] + + newly_collected_reports.append( + ExecutionReport.from_task_and_exception( + task, exc_info + ) ) - else: - exc_info = future.result() - - newly_collected_reports.append( - ExecutionReport.from_task_and_exception(task, exc_info) - ) - running_tasks.pop(task_name) - session.scheduler.done(task_name) - elif future.done() and future.exception() is None: - task = session.dag.nodes[task_name]["task"] - try: - session.hook.pytask_execute_task_teardown( - session=session, task=task - ) - except Exception: - report = ExecutionReport.from_task_and_exception( - task, sys.exc_info() - ) - else: - report = ExecutionReport.from_task(task) - - running_tasks.pop(task_name) - newly_collected_reports.append(report) - session.scheduler.done(task_name) + running_tasks.pop(task_name) + session.scheduler.done(task_name) + elif future.exception() is None: + task = session.dag.nodes[task_name]["task"] + try: + session.hook.pytask_execute_task_teardown( + session=session, task=task + ) + except Exception: + report = ExecutionReport.from_task_and_exception( + task, sys.exc_info() + ) + else: + report = ExecutionReport.from_task(task) + + running_tasks.pop(task_name) + newly_collected_reports.append(report) + session.scheduler.done(task_name) else: pass @@ -167,6 +177,9 @@ def pytask_execute_task(session: Session, task: Task) -> Future[Any] | None: bytes_kwargs=bytes_kwargs, show_locals=session.config["show_locals"], console_options=console.options, + session_filterwarnings=session.config["filterwarnings"], + task_filterwarnings=get_marks(task, "filterwarnings"), + task_short_name=task.short_name, ) return None @@ -176,7 +189,10 @@ def _unserialize_and_execute_task( bytes_kwargs: bytes, show_locals: bool, console_options: ConsoleOptions, -) -> tuple[type[BaseException], BaseException, str] | None: + session_filterwarnings: tuple[str, ...], + task_filterwarnings: tuple[Mark, ...], + task_short_name: str, +) -> tuple[list[WarningReport], tuple[type[BaseException], BaseException, str] | None]: """Unserialize and execute task. This function receives bytes and unpickles them to a task which is them execute in a @@ -188,13 +204,40 @@ def _unserialize_and_execute_task( task = cloudpickle.loads(bytes_function) kwargs = cloudpickle.loads(bytes_kwargs) - try: - task.execute(**kwargs) - except Exception: - exc_info = sys.exc_info() - processed_exc_info = _process_exception(exc_info, show_locals, console_options) - return processed_exc_info - return None + with warnings.catch_warnings(record=True) as log: + # mypy can't infer that record=True means log is not None; help it. + assert log is not None + + for arg in session_filterwarnings: + warnings.filterwarnings(*parse_warning_filter(arg, escape=False)) + + # apply filters from "filterwarnings" marks + for mark in task_filterwarnings: + for arg in mark.args: + warnings.filterwarnings(*parse_warning_filter(arg, escape=False)) + + try: + task.execute(**kwargs) + except Exception: + exc_info = sys.exc_info() + processed_exc_info = _process_exception( + exc_info, show_locals, console_options + ) + else: + processed_exc_info = None + + warning_reports = [] + for warning_message in log: + fs_location = warning_message.filename, warning_message.lineno + warning_reports.append( + WarningReport( + message=warning_record_to_str(warning_message), + fs_location=fs_location, + id_=task_short_name, + ) + ) + + return warning_reports, processed_exc_info def _process_exception( @@ -224,11 +267,28 @@ def pytask_execute_task(session: Session, task: Task) -> Future[Any] | None: """ if session.config["n_workers"] > 1: kwargs = _create_kwargs_for_task(task) - return session.executor.submit(task.execute, **kwargs) + return session.executor.submit( + _mock_processes_for_threads, func=task.execute, **kwargs + ) else: return None +def _mock_processes_for_threads( + func: Callable[..., Any], **kwargs: Any +) -> tuple[list[Any], None]: + """Mock execution function such that it returns the same as for processes. + + The function for processes returns ``warning_reports`` and an ``exception``. With + threads, these object are collected by the main and not the subprocess. So, we just + return placeholders. + + """ + __tracebackhide__ = True + func(**kwargs) + return [], None + + def _create_kwargs_for_task(task: Task) -> dict[Any, Any]: """Create kwargs for task function.""" kwargs = {**task.kwargs} diff --git a/tests/test_execute.py b/tests/test_execute.py index af82e06..bc16dcf 100644 --- a/tests/test_execute.py +++ b/tests/test_execute.py @@ -119,6 +119,7 @@ def myfunc(): "n_workers": 2, "parallel_backend": parallel_backend, "show_locals": False, + "filterwarnings": [], } with PARALLEL_BACKENDS[parallel_backend]( @@ -135,7 +136,9 @@ def myfunc(): future = backend_name_space.pytask_execute_task(session, task) executor.shutdown() - assert future.result() is None + warning_reports, exception = future.result() + assert warning_reports == [] + assert exception is None @pytest.mark.end_to_end diff --git a/tox.ini b/tox.ini index 9b37128..d0e8914 100644 --- a/tox.ini +++ b/tox.ini @@ -40,6 +40,7 @@ warn-symbols = [pytest] testpaths = + # Do not add src since it messes with the loading of pytask-parallel as a plugin. tests addopts = --doctest-modules filterwarnings = From 785101e55fe08deb087ead1e4a7fd15719ca31ea Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Fri, 12 Aug 2022 15:10:16 +0200 Subject: [PATCH 4/7] Fix tests for processes. Warnings in threads are not always captured. --- src/pytask_parallel/execute.py | 34 +++++++++++++++++++--------------- tests/test_execute.py | 6 ++++++ 2 files changed, 25 insertions(+), 15 deletions(-) diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index 5ffca4b..aa7dcc5 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -93,21 +93,14 @@ def pytask_execute_build(session: Session) -> bool | None: for task_name in list(running_tasks): future = running_tasks[task_name] if future.done(): - if ( - future.exception() is not None - or future.result()[1] is not None - ): + warning_reports, task_exception = future.result() + session.warnings.extend(warning_reports) + exc_info = ( + _parse_future_exception(future.exception()) + or task_exception + ) + if exc_info is not None: task = session.dag.nodes[task_name]["task"] - if future.exception() is not None: - exception = future.exception() - exc_info = ( - type(exception), - exception, - exception.__traceback__, - ) - else: - exc_info = future.result()[1] - newly_collected_reports.append( ExecutionReport.from_task_and_exception( task, exc_info @@ -115,7 +108,7 @@ def pytask_execute_build(session: Session) -> bool | None: ) running_tasks.pop(task_name) session.scheduler.done(task_name) - elif future.exception() is None: + else: task = session.dag.nodes[task_name]["task"] try: session.hook.pytask_execute_task_teardown( @@ -154,6 +147,17 @@ def pytask_execute_build(session: Session) -> bool | None: return None +def _parse_future_exception( + exception: BaseException | None, +) -> tuple[type[BaseException], BaseException, TracebackType] | None: + """Parse a future exception.""" + return ( + None + if exception is None + else (type(exception), exception, exception.__traceback__) + ) + + class ProcessesNameSpace: """The name space for hooks related to processes.""" diff --git a/tests/test_execute.py b/tests/test_execute.py index bc16dcf..1c316fe 100644 --- a/tests/test_execute.py +++ b/tests/test_execute.py @@ -315,3 +315,9 @@ def task_example(produces): assert result.exit_code == ExitCode.OK assert "Warnings" in result.output + assert "This is a warning." in result.output + assert "capture_warnings.html" in result.output + + warnings_block = result.output.split("Warnings")[1] + assert "task_example.py::task_example[0]" in warnings_block + assert "task_example.py::task_example[1]" in warnings_block From 8cd6f67d2efe3232305be6611f580e888c766605 Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Sat, 13 Aug 2022 19:28:48 +0200 Subject: [PATCH 5/7] Fix tests. --- CHANGES.md | 7 ++++++- src/pytask_parallel/execute.py | 11 ++++++++--- tests/test_execute.py | 6 +++++- 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 24d2a0d..45e4caf 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -5,7 +5,12 @@ chronological order. Releases follow [semantic versioning](https://semver.org/) releases are available on [PyPI](https://pypi.org/project/pytask-parallel) and [Anaconda.org](https://anaconda.org/conda-forge/pytask-parallel). -## 0.2.0 - 2022-xx-xx +## 0.2.1 - 2022-08-xx + +- {pull}`43` adds docformatter. +- {pull}`44` allows to capture warnings from subprocesses. Fixes {issue}`41`. + +## 0.2.0 - 2022-04-15 - {pull}`31` adds types to the package. - {pull}`36` adds a test for . diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index aa7dcc5..5e1aac7 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -280,7 +280,7 @@ def pytask_execute_task(session: Session, task: Task) -> Future[Any] | None: def _mock_processes_for_threads( func: Callable[..., Any], **kwargs: Any -) -> tuple[list[Any], None]: +) -> tuple[list[Any], tuple[type[BaseException], BaseException, TracebackType] | None]: """Mock execution function such that it returns the same as for processes. The function for processes returns ``warning_reports`` and an ``exception``. With @@ -289,8 +289,13 @@ def _mock_processes_for_threads( """ __tracebackhide__ = True - func(**kwargs) - return [], None + try: + func(**kwargs) + except Exception: + exc_info = sys.exc_info() + else: + exc_info = None + return [], exc_info def _create_kwargs_for_task(task: Task) -> dict[Any, Any]: diff --git a/tests/test_execute.py b/tests/test_execute.py index 1c316fe..495d849 100644 --- a/tests/test_execute.py +++ b/tests/test_execute.py @@ -294,7 +294,11 @@ def task_example(produces): @pytest.mark.end_to_end -@pytest.mark.parametrize("parallel_backend", PARALLEL_BACKENDS) +@pytest.mark.parametrize( + "parallel_backend", + # Capturing warnings is not thread-safe. + [backend for backend in PARALLEL_BACKENDS if backend != "threads"], +) def test_collect_warnings_from_parallelized_tasks(runner, tmp_path, parallel_backend): source = """ import pytask From 26275723a7017baf528c523e96f1b170b2f22e0b Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Sat, 13 Aug 2022 19:32:51 +0200 Subject: [PATCH 6/7] Update readme. --- README.md | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index bed9480..78159d5 100644 --- a/README.md +++ b/README.md @@ -68,7 +68,9 @@ n_workers = 1 parallel_backend = "loky" # or processes or threads ``` -## Warning +## Some implementation details + +### Parallelization and Debugging It is not possible to combine parallelization with debugging. That is why `--pdb` or `--trace` deactivate parallelization. @@ -76,6 +78,11 @@ It is not possible to combine parallelization with debugging. That is why `--pdb If you parallelize the execution of your tasks using two or more workers, do not use `breakpoint()` or `import pdb; pdb.set_trace()` since both will cause exceptions. +### Threads and warnings + +Capturing warnings is not thread-safe. Therefore, warnings cannot be captured reliably +when tasks are parallelized with `--parallel-backend threads`. + ## Changes Consult the [release notes](CHANGES.md) to find out about what is new. From 2cddd0ed51fe8549eeb35ca1bb617a898b647cee Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Sat, 13 Aug 2022 19:59:53 +0200 Subject: [PATCH 7/7] Make imports flexible. --- src/pytask_parallel/execute.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index 5e1aac7..b9a750e 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -11,9 +11,6 @@ from typing import Callable import cloudpickle -from _pytask.warnings import parse_warning_filter -from _pytask.warnings import warning_record_to_str -from _pytask.warnings_utils import WarningReport from pybaum.tree_util import tree_map from pytask import console from pytask import ExecutionReport @@ -27,6 +24,16 @@ from rich.console import ConsoleOptions from rich.traceback import Traceback +# Can be removed if pinned to pytask >= 0.2.6. +try: + from pytask import parse_warning_filter + from pytask import warning_record_to_str + from pytask import WarningReport +except ImportError: + from _pytask.warnings import parse_warning_filter + from _pytask.warnings import warning_record_to_str + from _pytask.warnings_utils import WarningReport + @hookimpl def pytask_post_parse(config: dict[str, Any]) -> None: