Skip to content

Capture warnings. #44

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Aug 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@ chronological order. Releases follow [semantic versioning](https://semver.org/)
releases are available on [PyPI](https://pypi.org/project/pytask-parallel) and
[Anaconda.org](https://anaconda.org/conda-forge/pytask-parallel).

## 0.2.0 - 2022-xx-xx
## 0.2.1 - 2022-08-xx

- {pull}`43` adds docformatter.
- {pull}`44` allows to capture warnings from subprocesses. Fixes {issue}`41`.

## 0.2.0 - 2022-04-15

- {pull}`31` adds types to the package.
- {pull}`36` adds a test for <https://github.com/pytask-dev/pytask/issues/216>.
Expand Down
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,21 @@ n_workers = 1
parallel_backend = "loky" # or processes or threads
```

## Warning
## Some implementation details

### Parallelization and Debugging

It is not possible to combine parallelization with debugging. That is why `--pdb` or
`--trace` deactivate parallelization.

If you parallelize the execution of your tasks using two or more workers, do not use
`breakpoint()` or `import pdb; pdb.set_trace()` since both will cause exceptions.

### Threads and warnings

Capturing warnings is not thread-safe. Therefore, warnings cannot be captured reliably
when tasks are parallelized with `--parallel-backend threads`.

## Changes

Consult the [release notes](CHANGES.md) to find out about what is new.
Expand Down
160 changes: 118 additions & 42 deletions src/pytask_parallel/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,36 @@
import inspect
import sys
import time
import warnings
from concurrent.futures import Future
from types import TracebackType
from typing import Any
from typing import Callable

import cloudpickle
from pybaum.tree_util import tree_map
from pytask import console
from pytask import ExecutionReport
from pytask import get_marks
from pytask import hookimpl
from pytask import Mark
from pytask import remove_internal_traceback_frames_from_exc_info
from pytask import Session
from pytask import Task
from pytask_parallel.backends import PARALLEL_BACKENDS
from rich.console import ConsoleOptions
from rich.traceback import Traceback

# Can be removed if pinned to pytask >= 0.2.6.
try:
from pytask import parse_warning_filter
from pytask import warning_record_to_str
from pytask import WarningReport
except ImportError:
from _pytask.warnings import parse_warning_filter
from _pytask.warnings import warning_record_to_str
from _pytask.warnings_utils import WarningReport


@hookimpl
def pytask_post_parse(config: dict[str, Any]) -> None:
Expand Down Expand Up @@ -85,42 +99,38 @@ def pytask_execute_build(session: Session) -> bool | None:

for task_name in list(running_tasks):
future = running_tasks[task_name]
if future.done() and (
future.exception() is not None
or future.result() is not None
):
task = session.dag.nodes[task_name]["task"]
if future.exception() is not None:
exception = future.exception()
exc_info = (
type(exception),
exception,
exception.__traceback__,
)
else:
exc_info = future.result()

newly_collected_reports.append(
ExecutionReport.from_task_and_exception(task, exc_info)
if future.done():
warning_reports, task_exception = future.result()
session.warnings.extend(warning_reports)
exc_info = (
_parse_future_exception(future.exception())
or task_exception
)
running_tasks.pop(task_name)
session.scheduler.done(task_name)
elif future.done() and future.exception() is None:
task = session.dag.nodes[task_name]["task"]
try:
session.hook.pytask_execute_task_teardown(
session=session, task=task
)
except Exception:
report = ExecutionReport.from_task_and_exception(
task, sys.exc_info()
if exc_info is not None:
task = session.dag.nodes[task_name]["task"]
newly_collected_reports.append(
ExecutionReport.from_task_and_exception(
task, exc_info
)
)
running_tasks.pop(task_name)
session.scheduler.done(task_name)
else:
report = ExecutionReport.from_task(task)

running_tasks.pop(task_name)
newly_collected_reports.append(report)
session.scheduler.done(task_name)
task = session.dag.nodes[task_name]["task"]
try:
session.hook.pytask_execute_task_teardown(
session=session, task=task
)
except Exception:
report = ExecutionReport.from_task_and_exception(
task, sys.exc_info()
)
else:
report = ExecutionReport.from_task(task)

running_tasks.pop(task_name)
newly_collected_reports.append(report)
session.scheduler.done(task_name)
else:
pass

Expand All @@ -144,6 +154,17 @@ def pytask_execute_build(session: Session) -> bool | None:
return None


def _parse_future_exception(
exception: BaseException | None,
) -> tuple[type[BaseException], BaseException, TracebackType] | None:
"""Parse a future exception."""
return (
None
if exception is None
else (type(exception), exception, exception.__traceback__)
)


class ProcessesNameSpace:
"""The name space for hooks related to processes."""

Expand All @@ -167,6 +188,9 @@ def pytask_execute_task(session: Session, task: Task) -> Future[Any] | None:
bytes_kwargs=bytes_kwargs,
show_locals=session.config["show_locals"],
console_options=console.options,
session_filterwarnings=session.config["filterwarnings"],
task_filterwarnings=get_marks(task, "filterwarnings"),
task_short_name=task.short_name,
)
return None

Expand All @@ -176,7 +200,10 @@ def _unserialize_and_execute_task(
bytes_kwargs: bytes,
show_locals: bool,
console_options: ConsoleOptions,
) -> tuple[type[BaseException], BaseException, str] | None:
session_filterwarnings: tuple[str, ...],
task_filterwarnings: tuple[Mark, ...],
task_short_name: str,
) -> tuple[list[WarningReport], tuple[type[BaseException], BaseException, str] | None]:
"""Unserialize and execute task.

This function receives bytes and unpickles them to a task which is them execute in a
Expand All @@ -188,13 +215,40 @@ def _unserialize_and_execute_task(
task = cloudpickle.loads(bytes_function)
kwargs = cloudpickle.loads(bytes_kwargs)

try:
task.execute(**kwargs)
except Exception:
exc_info = sys.exc_info()
processed_exc_info = _process_exception(exc_info, show_locals, console_options)
return processed_exc_info
return None
with warnings.catch_warnings(record=True) as log:
# mypy can't infer that record=True means log is not None; help it.
assert log is not None

for arg in session_filterwarnings:
warnings.filterwarnings(*parse_warning_filter(arg, escape=False))

# apply filters from "filterwarnings" marks
for mark in task_filterwarnings:
for arg in mark.args:
warnings.filterwarnings(*parse_warning_filter(arg, escape=False))

try:
task.execute(**kwargs)
except Exception:
exc_info = sys.exc_info()
processed_exc_info = _process_exception(
exc_info, show_locals, console_options
)
else:
processed_exc_info = None

warning_reports = []
for warning_message in log:
fs_location = warning_message.filename, warning_message.lineno
warning_reports.append(
WarningReport(
message=warning_record_to_str(warning_message),
fs_location=fs_location,
id_=task_short_name,
)
)

return warning_reports, processed_exc_info


def _process_exception(
Expand Down Expand Up @@ -224,11 +278,33 @@ def pytask_execute_task(session: Session, task: Task) -> Future[Any] | None:
"""
if session.config["n_workers"] > 1:
kwargs = _create_kwargs_for_task(task)
return session.executor.submit(task.execute, **kwargs)
return session.executor.submit(
_mock_processes_for_threads, func=task.execute, **kwargs
)
else:
return None


def _mock_processes_for_threads(
func: Callable[..., Any], **kwargs: Any
) -> tuple[list[Any], tuple[type[BaseException], BaseException, TracebackType] | None]:
"""Mock execution function such that it returns the same as for processes.

The function for processes returns ``warning_reports`` and an ``exception``. With
threads, these object are collected by the main and not the subprocess. So, we just
return placeholders.

"""
__tracebackhide__ = True
try:
func(**kwargs)
except Exception:
exc_info = sys.exc_info()
else:
exc_info = None
return [], exc_info


def _create_kwargs_for_task(task: Task) -> dict[Any, Any]:
"""Create kwargs for task function."""
kwargs = {**task.kwargs}
Expand Down
39 changes: 38 additions & 1 deletion tests/test_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ def myfunc():
"n_workers": 2,
"parallel_backend": parallel_backend,
"show_locals": False,
"filterwarnings": [],
}

with PARALLEL_BACKENDS[parallel_backend](
Expand All @@ -135,7 +136,9 @@ def myfunc():
future = backend_name_space.pytask_execute_task(session, task)
executor.shutdown()

assert future.result() is None
warning_reports, exception = future.result()
assert warning_reports == []
assert exception is None


@pytest.mark.end_to_end
Expand Down Expand Up @@ -288,3 +291,37 @@ def task_example(produces):
)

assert session.exit_code == ExitCode.OK


@pytest.mark.end_to_end
@pytest.mark.parametrize(
"parallel_backend",
# Capturing warnings is not thread-safe.
[backend for backend in PARALLEL_BACKENDS if backend != "threads"],
)
def test_collect_warnings_from_parallelized_tasks(runner, tmp_path, parallel_backend):
source = """
import pytask
import warnings

for i in range(2):

@pytask.mark.task(id=i, kwargs={"produces": f"{i}.txt"})
def task_example(produces):
warnings.warn("This is a warning.")
produces.touch()
"""
tmp_path.joinpath("task_example.py").write_text(textwrap.dedent(source))

result = runner.invoke(
cli, [tmp_path.as_posix(), "-n", "2", "--parallel-backend", parallel_backend]
)

assert result.exit_code == ExitCode.OK
assert "Warnings" in result.output
assert "This is a warning." in result.output
assert "capture_warnings.html" in result.output

warnings_block = result.output.split("Warnings")[1]
assert "task_example.py::task_example[0]" in warnings_block
assert "task_example.py::task_example[1]" in warnings_block
3 changes: 3 additions & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ warn-symbols =
pytest.mark.skip = Remove 'skip' flag for tests.

[pytest]
testpaths =
# Do not add src since it messes with the loading of pytask-parallel as a plugin.
tests
addopts = --doctest-modules
filterwarnings =
ignore: the imp module is deprecated in favour of importlib
Expand Down