Show rich tracebacks for errors in subprocesses. #27

Merged 10 commits on Jul 20, 2021
3 changes: 2 additions & 1 deletion CHANGES.rst
@@ -7,13 +7,14 @@ all releases are available on `PyPI <https://pypi.org/project/pytask-parallel>`_
`Anaconda.org <https://anaconda.org/conda-forge/pytask-parallel>`_.


0.0.9 - 2021-xx-xx
0.1.0 - 2021-07-20
------------------

- :gh:`19` adds ``conda-forge`` to the ``README.rst``.
- :gh:`22` adds a note that the debugger cannot be used together with pytask-parallel.
- :gh:`24` replaces versioneer with setuptools-scm.
- :gh:`25` aborts build and prints reports on ``KeyboardInterrupt``.
- :gh:`27` enables rich tracebacks from subprocesses.


0.0.8 - 2021-03-05
4 changes: 2 additions & 2 deletions environment.yml
@@ -5,7 +5,7 @@ channels:
- nodefaults

dependencies:
- python=3.6
- python
- pip
- setuptools_scm
- toml
@@ -16,7 +16,7 @@ dependencies:
- conda-verify

# Package dependencies
- pytask >= 0.0.11
- pytask >= 0.1.0
- cloudpickle
- loky

2 changes: 1 addition & 1 deletion setup.cfg
@@ -31,7 +31,7 @@ install_requires =
click
cloudpickle
loky
pytask>=0.0.11
pytask>=0.1.0
python_requires = >=3.6
include_package_data = True
package_dir = =src
59 changes: 49 additions & 10 deletions src/pytask_parallel/execute.py
@@ -1,19 +1,25 @@
"""Contains code relevant to the execution."""
import sys
import time
from typing import Any
from typing import Tuple

import cloudpickle
from _pytask.config import hookimpl
from _pytask.console import console
from _pytask.report import ExecutionReport
from _pytask.traceback import remove_internal_traceback_frames_from_exc_info
from pytask_parallel.backends import PARALLEL_BACKENDS
from rich.console import ConsoleOptions
from rich.traceback import Traceback


@hookimpl
def pytask_post_parse(config):
"""Register the parallel backend."""
if config["parallel_backend"] == "processes":
if config["parallel_backend"] in ["loky", "processes"]:
config["pm"].register(ProcessesNameSpace)
elif config["parallel_backend"] in ["threads", "loky"]:
elif config["parallel_backend"] in ["threads"]:
config["pm"].register(DefaultBackendNameSpace)


@@ -72,13 +78,23 @@ def pytask_execute_build(session):

for task_name in list(running_tasks):
future = running_tasks[task_name]
if future.done() and future.exception() is not None:
if future.done() and (
future.exception() is not None
or future.result() is not None
):
task = session.dag.nodes[task_name]["task"]
exception = future.exception()
newly_collected_reports.append(
ExecutionReport.from_task_and_exception(
task, (type(exception), exception, None)
if future.exception() is not None:
exception = future.exception()
exc_info = (
type(exception),
exception,
exception.__traceback__,
)
else:
exc_info = future.result()

newly_collected_reports.append(
ExecutionReport.from_task_and_exception(task, exc_info)
)
running_tasks.pop(task_name)
session.scheduler.done(task_name)
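
A note on the dual check in this hunk: future.exception() is set when the worker itself fails, for example while unpickling the task, whereas an exception raised inside the task is caught in the worker and handed back as the return value, so it arrives through future.result(). A minimal, self-contained sketch of the two paths using plain concurrent.futures (the helper names are illustrative, not part of pytask-parallel):

# Illustrative only: the two ways an error can travel back from a worker.
from concurrent.futures import ProcessPoolExecutor


def returns_error_info():
    # Mirrors _unserialize_and_execute_task: the exception is caught in the
    # worker and returned as a value, so the parent sees it via result().
    try:
        raise ValueError("task failed")
    except Exception as exception:
        return type(exception), exception, "rendered traceback text"


def raises():
    # Nothing catches the error in the worker, so it surfaces via exception().
    raise ValueError("worker crashed")


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=1) as executor:
        caught = executor.submit(returns_error_info)
        crashed = executor.submit(raises)
        assert caught.exception() is None and caught.result() is not None
        assert crashed.exception() is not None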
@@ -132,18 +148,41 @@ def pytask_execute_task(session, task): # noqa: N805
"""
if session.config["n_workers"] > 1:
bytes_ = cloudpickle.dumps(task)
return session.executor.submit(unserialize_and_execute_task, bytes_)
return session.executor.submit(
_unserialize_and_execute_task,
bytes_=bytes_,
show_locals=session.config["show_locals"],
console_options=console.options,
)


def unserialize_and_execute_task(bytes_):
def _unserialize_and_execute_task(bytes_, show_locals, console_options):
"""Unserialize and execute task.

This function receives bytes and unpickles them to a task which is then executed
in a spawned process or thread.

"""
__tracebackhide__ = True

task = cloudpickle.loads(bytes_)
task.execute()

try:
task.execute()
except Exception:
exc_info = sys.exc_info()
processed_exc_info = _process_exception(exc_info, show_locals, console_options)
return processed_exc_info


def _process_exception(
exc_info: Tuple[Any, Any, Any], show_locals: bool, console_options: ConsoleOptions
) -> Tuple[Any, Any, str]:
exc_info = remove_internal_traceback_frames_from_exc_info(exc_info)
traceback = Traceback.from_exception(*exc_info, show_locals=show_locals)
segments = console.render(traceback, options=console_options)
text = "".join(segment.text for segment in segments)
return (*exc_info[:2], text)


class DefaultBackendNameSpace:
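The core idea in execute.py is that traceback objects cannot be pickled, so the worker renders the rich traceback to plain text and returns that string together with the exception type and value; the main process only has to print the pre-rendered text. Below is a standalone sketch of that technique, assuming only rich and the standard library (the function names are made up for the example and are not part of the plugin):

# A minimal sketch of rendering a traceback to text inside a worker process
# so it survives pickling; not the plugin's actual code.
import sys
from concurrent.futures import ProcessPoolExecutor

from rich.console import Console
from rich.traceback import Traceback


def run_task():
    try:
        raise ValueError("boom")
    except Exception:
        exc_type, exc_value, tb = sys.exc_info()
        rich_traceback = Traceback.from_exception(
            exc_type, exc_value, tb, show_locals=True
        )
        segments = Console().render(rich_traceback)
        # The exception class, the exception instance, and the rendered text
        # all pickle cleanly, unlike the raw traceback object.
        return exc_type, exc_value, "".join(segment.text for segment in segments)
    return None


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=1) as executor:
        outcome = executor.submit(run_task).result()
        if outcome is not None:
            print(outcome[2])  # The parent prints the pre-rendered traceback.

Compared with re-raising in the main process, returning the pre-rendered text preserves the frames and locals from the worker, which would otherwise be lost because the traceback cannot cross the process boundary.
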
32 changes: 31 additions & 1 deletion tests/test_execute.py
@@ -119,7 +119,11 @@ def myfunc():
task = DummyTask(myfunc)

session = Session()
session.config = {"n_workers": 2, "parallel_backend": parallel_backend}
session.config = {
"n_workers": 2,
"parallel_backend": parallel_backend,
"show_locals": False,
}

with PARALLEL_BACKENDS[parallel_backend](
max_workers=session.config["n_workers"]
@@ -235,3 +239,29 @@ def task_5():
assert first_task_name.endswith("task_0") or first_task_name.endswith("task_3")
last_task_name = session.execution_reports[-1].task.name
assert last_task_name.endswith("task_2") or last_task_name.endswith("task_5")


@pytest.mark.end_to_end
@pytest.mark.parametrize("parallel_backend", PARALLEL_BACKENDS)
@pytest.mark.parametrize("show_locals", [True, False])
def test_rendering_of_tracebacks_with_rich(
runner, tmp_path, parallel_backend, show_locals
):
source = """
import pytask

def task_raising_error():
a = list(range(5))
raise Exception
"""
tmp_path.joinpath("task_dummy.py").write_text(textwrap.dedent(source))

args = [tmp_path.as_posix(), "-n", "2", "--parallel-backend", parallel_backend]
if show_locals:
args.append("--show-locals")
result = runner.invoke(cli, args)

assert result.exit_code == 1
assert "───── Traceback" in result.output
assert ("───── locals" in result.output) is show_locals
assert ("[0, 1, 2, 3, 4]" in result.output) is show_locals
2 changes: 1 addition & 1 deletion tox.ini
@@ -10,7 +10,7 @@ basepython = python
conda_deps =
cloudpickle
loky
pytask >=0.0.11
pytask >=0.1.0
pytest
pytest-cov
pytest-xdist