From e881b13bbe74e15782e75f16ea92ae5365eb43fb Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Mon, 1 Apr 2024 22:53:31 +0200 Subject: [PATCH 1/2] Redirect stdout and stderr. --- CHANGES.md | 2 ++ src/pytask_parallel/execute.py | 19 +++++++++-- src/pytask_parallel/processes.py | 27 +++++++++++++-- src/pytask_parallel/threads.py | 8 +++-- src/pytask_parallel/utils.py | 8 +++-- tests/test_capture.py | 58 ++++++++++++++++++++++++++++++++ 6 files changed, 112 insertions(+), 10 deletions(-) create mode 100644 tests/test_capture.py diff --git a/CHANGES.md b/CHANGES.md index 0f53184..3025244 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -10,6 +10,8 @@ releases are available on [PyPI](https://pypi.org/project/pytask-parallel) and - {pull}`85` simplifies code since loky is a dependency. - {pull}`88` updates handling `Traceback`. - {pull}`89` restructures the package. +- {pull}`90` redirects stdout and stderr from processes and loky and shows them in error + reports. ## 0.4.1 - 2024-01-12 diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index e6cf659..cb0ea0c 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -90,11 +90,24 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 future = running_tasks[task_name] if future.done(): - python_nodes, warnings_reports, exc_info = parse_future_result( - future - ) + ( + python_nodes, + warnings_reports, + exc_info, + captured_stdout, + captured_stderr, + ) = parse_future_result(future) session.warnings.extend(warnings_reports) + if captured_stdout: + task.report_sections.append( + ("call", "stdout", captured_stdout) + ) + if captured_stderr: + task.report_sections.append( + ("call", "stderr", captured_stderr) + ) + if exc_info is not None: task = session.dag.nodes[task_name]["task"] newly_collected_reports.append( diff --git a/src/pytask_parallel/processes.py b/src/pytask_parallel/processes.py index 9f139c1..c00aa2e 100644 --- a/src/pytask_parallel/processes.py +++ b/src/pytask_parallel/processes.py @@ -5,7 +5,10 @@ import inspect import sys import warnings +from contextlib import redirect_stderr +from contextlib import redirect_stdout from functools import partial +from io import StringIO from typing import TYPE_CHECKING from typing import Any from typing import Callable @@ -98,6 +101,8 @@ def _execute_task( # noqa: PLR0913 PyTree[PythonNode | None], list[WarningReport], tuple[type[BaseException], BaseException, str] | None, + str, + str, ]: """Unserialize and execute task. @@ -111,8 +116,13 @@ def _execute_task( # noqa: PLR0913 # Patch set_trace and breakpoint to show a better error message. _patch_set_trace_and_breakpoint() + captured_stdout_buffer = StringIO() + captured_stderr_buffer = StringIO() + # Catch warnings and store them in a list. - with warnings.catch_warnings(record=True) as log: + with warnings.catch_warnings(record=True) as log, redirect_stdout( + captured_stdout_buffer + ), redirect_stderr(captured_stderr_buffer): # Apply global filterwarnings. for arg in session_filterwarnings: warnings.filterwarnings(*parse_warning_filter(arg, escape=False)) @@ -146,12 +156,25 @@ def _execute_task( # noqa: PLR0913 ) ) + captured_stdout_buffer.seek(0) + captured_stderr_buffer.seek(0) + captured_stdout = captured_stdout_buffer.read() + captured_stderr = captured_stderr_buffer.read() + captured_stdout_buffer.close() + captured_stderr_buffer.close() + # Collect all PythonNodes that are products to pass values back to the main process. python_nodes = tree_map( lambda x: x if isinstance(x, PythonNode) else None, task.produces ) - return python_nodes, warning_reports, processed_exc_info + return ( + python_nodes, + warning_reports, + processed_exc_info, + captured_stdout, + captured_stderr, + ) def _process_exception( diff --git a/src/pytask_parallel/threads.py b/src/pytask_parallel/threads.py index 2e3c356..936d804 100644 --- a/src/pytask_parallel/threads.py +++ b/src/pytask_parallel/threads.py @@ -35,7 +35,11 @@ def pytask_execute_task(session: Session, task: PTask) -> Future[Any]: def _mock_processes_for_threads( task: PTask, **kwargs: Any ) -> tuple[ - None, list[Any], tuple[type[BaseException], BaseException, TracebackType] | None + None, + list[Any], + tuple[type[BaseException], BaseException, TracebackType] | None, + str, + str, ]: """Mock execution function such that it returns the same as for processes. @@ -52,4 +56,4 @@ def _mock_processes_for_threads( else: handle_task_function_return(task, out) exc_info = None - return None, [], exc_info + return None, [], exc_info, "", "" diff --git a/src/pytask_parallel/utils.py b/src/pytask_parallel/utils.py index 11cc19f..06acbe8 100644 --- a/src/pytask_parallel/utils.py +++ b/src/pytask_parallel/utils.py @@ -26,20 +26,22 @@ def parse_future_result( dict[str, PyTree[PythonNode | None]] | None, list[WarningReport], tuple[type[BaseException], BaseException, TracebackType] | None, + str, + str, ]: """Parse the result of a future.""" # An exception was raised before the task was executed. future_exception = future.exception() if future_exception is not None: exc_info = _parse_future_exception(future_exception) - return None, [], exc_info + return None, [], exc_info, "", "" out = future.result() - if isinstance(out, tuple) and len(out) == 3: # noqa: PLR2004 + if isinstance(out, tuple) and len(out) == 5: # noqa: PLR2004 return out if out is None: - return None, [], None + return None, [], None, "", "" # What to do when the output does not match? msg = ( diff --git a/tests/test_capture.py b/tests/test_capture.py new file mode 100644 index 0000000..4659f7b --- /dev/null +++ b/tests/test_capture.py @@ -0,0 +1,58 @@ +import textwrap + +import pytest +from pytask import ExitCode +from pytask import cli +from pytask_parallel import ParallelBackend + + +@pytest.mark.end_to_end() +@pytest.mark.parametrize( + "parallel_backend", [ParallelBackend.PROCESSES, ParallelBackend.LOKY] +) +@pytest.mark.parametrize("show_capture", ["no", "stdout", "stderr", "all"]) +def test_show_capture(tmp_path, runner, parallel_backend, show_capture): + source = """ + import sys + + def task_show_capture(): + sys.stdout.write("xxxx") + sys.stderr.write("zzzz") + raise Exception + """ + tmp_path.joinpath("task_show_capture.py").write_text(textwrap.dedent(source)) + + cmd_arg = "-s" if show_capture == "s" else f"--show-capture={show_capture}" + result = runner.invoke( + cli, + [ + tmp_path.as_posix(), + cmd_arg, + "--parallel-backend", + parallel_backend, + "-n", + "2", + ], + ) + + assert result.exit_code == ExitCode.FAILED + + if show_capture in ("no", "s"): + assert "Captured" not in result.output + elif show_capture == "stdout": + assert "Captured stdout" in result.output + assert "xxxx" in result.output + assert "Captured stderr" not in result.output + # assert "zzzz" not in result.output + elif show_capture == "stderr": + assert "Captured stdout" not in result.output + # assert "xxxx" not in result.output + assert "Captured stderr" in result.output + assert "zzzz" in result.output + elif show_capture == "all": + assert "Captured stdout" in result.output + assert "xxxx" in result.output + assert "Captured stderr" in result.output + assert "zzzz" in result.output + else: # pragma: no cover + raise NotImplementedError From f187385cbf96b6f355ceab27cf12b3015ceb5ec6 Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Mon, 1 Apr 2024 22:54:08 +0200 Subject: [PATCH 2/2] fix. --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 3025244..c0f13ac 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -10,7 +10,7 @@ releases are available on [PyPI](https://pypi.org/project/pytask-parallel) and - {pull}`85` simplifies code since loky is a dependency. - {pull}`88` updates handling `Traceback`. - {pull}`89` restructures the package. -- {pull}`90` redirects stdout and stderr from processes and loky and shows them in error +- {pull}`92` redirects stdout and stderr from processes and loky and shows them in error reports. ## 0.4.1 - 2024-01-12