diff --git a/.gitignore b/.gitignore index 7042fe6..1dc2b6d 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,6 @@ build dist src/pytask_parallel/_version.py tests/test_jupyter/*.txt +.mypy_cache +.pytest_cache +.ruff_cache diff --git a/docs/source/changes.md b/docs/source/changes.md index 1c2f25a..8bcdd10 100644 --- a/docs/source/changes.md +++ b/docs/source/changes.md @@ -17,6 +17,8 @@ releases are available on [PyPI](https://pypi.org/project/pytask-parallel) and - {pull}`94` implements `ParallelBackend.NONE` as the default backend. - {pull}`95` formalizes parallel backends and apply wrappers for backends with threads or processes automatically. +- {pull}`96` handles local paths with remote executors. `PathNode`s are not supported as + dependencies or products (except for return annotations). ## 0.4.1 - 2024-01-12 diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index cf880d3..b502755 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -80,17 +80,16 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 session.hook.pytask_execute_task_setup( session=session, task=task ) + running_tasks[task_name] = session.hook.pytask_execute_task( + session=session, task=task + ) + sleeper.reset() except Exception: # noqa: BLE001 report = ExecutionReport.from_task_and_exception( task, sys.exc_info() ) newly_collected_reports.append(report) session.scheduler.done(task_name) - else: - running_tasks[task_name] = session.hook.pytask_execute_task( - session=session, task=task - ) - sleeper.reset() if not ready_tasks: sleeper.increment() @@ -123,7 +122,9 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 session.scheduler.done(task_name) else: task = session.dag.nodes[task_name]["task"] - _update_python_nodes(task, wrapper_result.python_nodes) + _update_carry_over_products( + task, wrapper_result.carry_over_products + ) try: 
session.hook.pytask_execute_task_teardown( @@ -169,9 +170,11 @@ def pytask_execute_task(session: Session, task: PTask) -> Future[WrapperResult]: executor. """ - worker_type = registry.registry[session.config["parallel_backend"]].worker_type + parallel_backend = registry.registry[session.config["parallel_backend"]] + worker_type = parallel_backend.worker_type + remote = parallel_backend.remote - kwargs = create_kwargs_for_task(task) + kwargs = create_kwargs_for_task(task, remote=remote) if worker_type == WorkerType.PROCESSES: # Prevent circular import for loky backend. @@ -188,10 +191,11 @@ def pytask_execute_task(session: Session, task: PTask) -> Future[WrapperResult]: return session.config["_parallel_executor"].submit( wrap_task_in_process, task=task, - kwargs=kwargs, - show_locals=session.config["show_locals"], console_options=console.options, + kwargs=kwargs, + remote=remote, session_filterwarnings=session.config["filterwarnings"], + show_locals=session.config["show_locals"], task_filterwarnings=get_marks(task, "filterwarnings"), ) if worker_type == WorkerType.THREADS: @@ -211,21 +215,23 @@ def pytask_unconfigure() -> None: registry.reset() -def _update_python_nodes( - task: PTask, python_nodes: PyTree[PythonNode | None] | None +def _update_carry_over_products( - task: PTask, carry_over_products: PyTree[PythonNode | None] | None ) -> None: - """Update the python nodes of a task with the python nodes from the future.""" + """Update products carried over from another process or remote worker.""" - def _update_python_node(x: PNode, y: PythonNode | None) -> PNode: + def _update_carry_over_node(x: PNode, y: PythonNode | None) -> PNode: if y: x.save(y.load()) return x - structure_python_nodes = tree_structure(python_nodes) + structure_python_nodes = tree_structure(carry_over_products) structure_produces = tree_structure(task.produces) # strict must be false when none is leaf. 
if structure_produces.is_prefix(structure_python_nodes, strict=False): - task.produces = tree_map(_update_python_node, task.produces, python_nodes) # type: ignore[assignment] + task.produces = tree_map( + _update_carry_over_node, task.produces, carry_over_products + ) # type: ignore[assignment] @define(kw_only=True) diff --git a/src/pytask_parallel/utils.py b/src/pytask_parallel/utils.py index b7c3764..0d54d7a 100644 --- a/src/pytask_parallel/utils.py +++ b/src/pytask_parallel/utils.py @@ -4,12 +4,19 @@ import inspect from functools import partial +from pathlib import PosixPath +from pathlib import WindowsPath from typing import TYPE_CHECKING from typing import Any from typing import Callable +from pytask import NodeLoadError +from pytask import PathNode +from pytask import PNode +from pytask import PProvisionalNode from pytask.tree_util import PyTree -from pytask.tree_util import tree_map +from pytask.tree_util import tree_map_with_path +from upath.implementations.local import FilePath if TYPE_CHECKING: from concurrent.futures import Future @@ -22,7 +29,12 @@ from pytask_parallel.wrappers import WrapperResult -__all__ = ["create_kwargs_for_task", "get_module", "parse_future_result"] +__all__ = [ + "create_kwargs_for_task", + "get_module", + "parse_future_result", + "is_local_path", +] def parse_future_result( @@ -32,11 +44,12 @@ def parse_future_result( # An exception was raised before the task was executed. future_exception = future.exception() if future_exception is not None: + # Prevent circular import for loky backend. 
from pytask_parallel.wrappers import WrapperResult exc_info = _parse_future_exception(future_exception) return WrapperResult( - python_nodes=None, + carry_over_products=None, warning_reports=[], exc_info=exc_info, stdout="", @@ -45,17 +58,80 @@ def parse_future_result( return future.result() -def create_kwargs_for_task(task: PTask) -> dict[str, PyTree[Any]]: +def _safe_load( + path: tuple[Any, ...], + node: PNode | PProvisionalNode, + task: PTask, + *, + is_product: bool, + remote: bool, +) -> Any: + """Load a node and catch exceptions.""" + _rich_traceback_guard = True + # Get the argument name like "path" or "return" for function returns. + argument = path[0] + + # Raise an error if a PPathNode with a local path is used as a dependency or product + # (except as a return value). + if ( + remote + and argument != "return" + and isinstance(node, PathNode) + and is_local_path(node.path) + ): + if is_product: + msg = ( + f"You cannot use a local path as a product in argument {argument!r} " + "with a remote backend. Either return the content that should be saved " + "in the file with a return annotation " + "(https://tinyurl.com/pytask-return) or use a nonlocal path to store " + "the file in S3 or their like https://tinyurl.com/pytask-remote." + ) + raise NodeLoadError(msg) + msg = ( + f"You cannot use a local path as a dependency in argument {argument!r} " + "with a remote backend. Upload the file to a remote storage like S3 " + "and use the remote path instead: https://tinyurl.com/pytask-remote." 
+ ) + raise NodeLoadError(msg) + + try: + return node.load(is_product=is_product) + except Exception as e: # noqa: BLE001 + msg = f"Exception while loading node {node.name!r} of task {task.name!r}" + raise NodeLoadError(msg) from e + + +def create_kwargs_for_task(task: PTask, *, remote: bool) -> dict[str, PyTree[Any]]: """Create kwargs for task function.""" parameters = inspect.signature(task.function).parameters kwargs = {} + for name, value in task.depends_on.items(): - kwargs[name] = tree_map(lambda x: x.load(), value) + kwargs[name] = tree_map_with_path( + lambda p, x: _safe_load( + (name, *p), # noqa: B023 + x, + task, + is_product=False, + remote=remote, + ), + value, + ) for name, value in task.produces.items(): if name in parameters: - kwargs[name] = tree_map(lambda x: x.load(), value) + kwargs[name] = tree_map_with_path( + lambda p, x: _safe_load( + (name, *p), # noqa: B023 + x, + task, + is_product=True, + remote=remote, + ), + value, + ) return kwargs @@ -84,3 +160,8 @@ def get_module(func: Callable[..., Any], path: Path | None) -> ModuleType: if path: return inspect.getmodule(func, path.as_posix()) return inspect.getmodule(func) + + +def is_local_path(path: Path) -> bool: + """Check if a path is local.""" + return isinstance(path, (FilePath, PosixPath, WindowsPath)) diff --git a/src/pytask_parallel/wrappers.py b/src/pytask_parallel/wrappers.py index 3e2f42f..185f819 100644 --- a/src/pytask_parallel/wrappers.py +++ b/src/pytask_parallel/wrappers.py @@ -11,6 +11,8 @@ from typing import Any from attrs import define +from pytask import PNode +from pytask import PPathNode from pytask import PTask from pytask import PythonNode from pytask import Traceback @@ -19,10 +21,11 @@ from pytask import parse_warning_filter from pytask import warning_record_to_str from pytask.tree_util import PyTree -from pytask.tree_util import tree_leaves -from pytask.tree_util import tree_map +from pytask.tree_util import tree_map_with_path from pytask.tree_util import 
tree_structure +from pytask_parallel.utils import is_local_path + if TYPE_CHECKING: from types import TracebackType @@ -35,35 +38,13 @@ @define(kw_only=True) class WrapperResult: - python_nodes: PyTree[PythonNode | None] + carry_over_products: PyTree[PythonNode | None] warning_reports: list[WarningReport] exc_info: tuple[type[BaseException], BaseException, TracebackType | str] | None stdout: str stderr: str -def _handle_task_function_return(task: PTask, out: Any) -> None: - """Handle the return value of a task function.""" - if "return" not in task.produces: - return - - structure_out = tree_structure(out) - structure_return = tree_structure(task.produces["return"]) - # strict must be false when none is leaf. - if not structure_return.is_prefix(structure_out, strict=False): - msg = ( - "The structure of the return annotation is not a subtree of " - "the structure of the function return.\n\nFunction return: " - f"{structure_out}\n\nReturn annotation: {structure_return}" - ) - raise ValueError(msg) - - nodes = tree_leaves(task.produces["return"]) - values = structure_return.flatten_up_to(out) - for node, value in zip(nodes, values): - node.save(value) - - def wrap_task_in_thread(task: PTask, **kwargs: Any) -> WrapperResult: """Mock execution function such that it returns the same as for processes. 
@@ -78,19 +59,25 @@ def wrap_task_in_thread(task: PTask, **kwargs: Any) -> WrapperResult: except Exception: # noqa: BLE001 exc_info = sys.exc_info() else: - _handle_task_function_return(task, out) + _handle_function_products(task, out, remote=False) exc_info = None return WrapperResult( - python_nodes=None, warning_reports=[], exc_info=exc_info, stdout="", stderr="" + carry_over_products=None, + warning_reports=[], + exc_info=exc_info, + stdout="", + stderr="", ) def wrap_task_in_process( # noqa: PLR0913 task: PTask, - kwargs: dict[str, Any], - show_locals: bool, # noqa: FBT001 + *, console_options: ConsoleOptions, + kwargs: dict[str, Any], + remote: bool, session_filterwarnings: tuple[str, ...], + show_locals: bool, task_filterwarnings: tuple[Mark, ...], ) -> WrapperResult: """Unserialize and execute task. @@ -128,9 +115,10 @@ def wrap_task_in_process( # noqa: PLR0913 processed_exc_info = _render_traceback_to_string( exc_info, show_locals, console_options ) + products = None else: # Save products. - _handle_task_function_return(task, out) + products = _handle_function_products(task, out, remote=remote) processed_exc_info = None task_display_name = getattr(task, "display_name", task.name) @@ -152,13 +140,8 @@ def wrap_task_in_process( # noqa: PLR0913 captured_stdout_buffer.close() captured_stderr_buffer.close() - # Collect all PythonNodes that are products to pass values back to the main process. 
- python_nodes = tree_map( - lambda x: x if isinstance(x, PythonNode) else None, task.produces - ) - return WrapperResult( - python_nodes=python_nodes, + carry_over_products=products, warning_reports=warning_reports, exc_info=processed_exc_info, stdout=captured_stdout, @@ -199,3 +182,67 @@ def _render_traceback_to_string( segments = console.render(traceback, options=console_options) text = "".join(segment.text for segment in segments) return (*exc_info[:2], text) + + +def _handle_function_products( + task: PTask, out: Any, *, remote: bool +) -> PyTree[PythonNode | None]: + """Handle the products of the task. + + The function's first responsibility is to push the returns of the function to the + defined nodes. + + Secondly, the function collects two kinds of products that need to be carried over + to the main process for storing them. + + 1. Any product that is a :class:`~pytask.PythonNode` needs to be carried over to the + main process as otherwise its value would be lost. + 2. If the function is executed remotely and the return value should be stored in a + node with a local path like :class:`pytask.PickleNode`, we need to carry over the + value to the main process again and then save the value to the node as the + local path does not exist remotely. + + """ + # Check that the return value has the correct structure. + if "return" in task.produces: + structure_out = tree_structure(out) + structure_return = tree_structure(task.produces["return"]) + # strict must be false when none is leaf. 
+ if not structure_return.is_prefix(structure_out, strict=False): + msg = ( + "The structure of the return annotation is not a subtree of " + "the structure of the function return.\n\nFunction return: " + f"{structure_out}\n\nReturn annotation: {structure_return}" + ) + raise ValueError(msg) + + def _save_and_carry_over_product( + path: tuple[Any, ...], node: PNode + ) -> PythonNode | None: + argument = path[0] + + if argument != "return": + if isinstance(node, PythonNode): + return node + return None + + value = out + for p in path[1:]: + value = value[p] + + # If the node is a PythonNode, we need to carry it over to the main process. + if isinstance(node, PythonNode): + node.save(value) + return node + + # If the path is local and we are remote, we need to carry over the value to + # the main process as a PythonNode and save it later. + if remote and isinstance(node, PPathNode) and is_local_path(node.path): + return PythonNode(value=value) + + # If no condition applies, we save the value and do not carry it over. Like a + # remote path to S3. + node.save(value) + return None + + return tree_map_with_path(_save_and_carry_over_product, task.produces) diff --git a/tests/test_execute.py b/tests/test_execute.py index 516b35a..31940f3 100644 --- a/tests/test_execute.py +++ b/tests/test_execute.py @@ -319,3 +319,59 @@ def create_text(text): ) assert result.exit_code == ExitCode.OK assert tmp_path.joinpath("file.txt").exists() + + +@pytest.mark.end_to_end() +@pytest.mark.parametrize("parallel_backend", _IMPLEMENTED_BACKENDS) +def test_execute_tasks_and_pass_values_by_python_node_return( + runner, tmp_path, parallel_backend +): + source = """ + from pytask import PythonNode + from typing_extensions import Annotated + from pathlib import Path + + node_text = PythonNode(name="text") + + def task_create_text() -> Annotated[int, node_text]: + return "This is the text." 
+ + def task_create_file( + text: Annotated[int, node_text] + ) -> Annotated[str, Path("file.txt")]: + return text + """ + tmp_path.joinpath("task_module.py").write_text(textwrap.dedent(source)) + result = runner.invoke( + cli, [tmp_path.as_posix(), "--parallel-backend", parallel_backend] + ) + assert result.exit_code == ExitCode.OK + assert tmp_path.joinpath("file.txt").read_text() == "This is the text." + + +@pytest.mark.end_to_end() +@pytest.mark.parametrize("parallel_backend", _IMPLEMENTED_BACKENDS) +def test_execute_tasks_and_pass_values_by_python_node_product( + runner, tmp_path, parallel_backend +): + source = """ + from pytask import PythonNode, Product + from typing_extensions import Annotated + from pathlib import Path + + node_text = PythonNode(name="text") + + def task_create_text(node: Annotated[PythonNode, Product] = node_text): + node.save("This is the text.") + + def task_create_file( + text: Annotated[int, node_text] + ) -> Annotated[str, Path("file.txt")]: + return text + """ + tmp_path.joinpath("task_module.py").write_text(textwrap.dedent(source)) + result = runner.invoke( + cli, [tmp_path.as_posix(), "--parallel-backend", parallel_backend] + ) + assert result.exit_code == ExitCode.OK + assert tmp_path.joinpath("file.txt").read_text() == "This is the text." 
diff --git a/tests/test_remote.py b/tests/test_remote.py new file mode 100644 index 0000000..cfff33c --- /dev/null +++ b/tests/test_remote.py @@ -0,0 +1,229 @@ +import pickle +import textwrap + +import pytest +from pytask import ExitCode +from pytask import cli + + +@pytest.fixture(autouse=True) +def _setup_remote_backend(tmp_path): + source = """ + from loky import get_reusable_executor + from pytask_parallel import ParallelBackend + from pytask_parallel import registry + + def custom_builder(n_workers): + print("Build custom executor.") + return get_reusable_executor(max_workers=n_workers) + + registry.register_parallel_backend( + ParallelBackend.CUSTOM, custom_builder, worker_type="processes", remote=True + ) + """ + tmp_path.joinpath("config.py").write_text(textwrap.dedent(source)) + + +@pytest.mark.end_to_end() +def test_python_node(runner, tmp_path): + source = """ + from pathlib import Path + from typing_extensions import Annotated + from pytask import PythonNode, Product + + first_part = PythonNode() + + def task_first(node: Annotated[PythonNode, first_part, Product]): + node.save("Hello ") + + full_text = PythonNode() + + def task_second( + first_part: Annotated[str, first_part] + ) -> Annotated[str, full_text]: + return first_part + "World!" + + def task_third( + full_text: Annotated[str, full_text] + ) -> Annotated[str, Path("output.txt")]: + return full_text + """ + tmp_path.joinpath("task_example.py").write_text(textwrap.dedent(source)) + + result = runner.invoke( + cli, + [ + tmp_path.as_posix(), + "--parallel-backend", + "custom", + "--hook-module", + tmp_path.joinpath("config.py").as_posix(), + ], + ) + assert result.exit_code == ExitCode.OK + assert "3 Succeeded" in result.output + assert tmp_path.joinpath("output.txt").read_text() == "Hello World!" 
+ + +@pytest.mark.end_to_end() +def test_local_path_as_input(runner, tmp_path): + source = """ + from pathlib import Path + from typing_extensions import Annotated + + def task_example(path: Path = Path("in.txt")) -> Annotated[str, Path("output.txt")]: + return path.read_text() + """ + tmp_path.joinpath("task_example.py").write_text(textwrap.dedent(source)) + tmp_path.joinpath("in.txt").write_text("Hello World!") + + result = runner.invoke( + cli, + [ + tmp_path.as_posix(), + "--parallel-backend", + "custom", + "--hook-module", + tmp_path.joinpath("config.py").as_posix(), + ], + ) + assert result.exit_code == ExitCode.FAILED + assert "You cannot use a local path" in result.output + + +@pytest.mark.end_to_end() +def test_local_path_as_product(runner, tmp_path): + source = """ + from pytask import Product + from pathlib import Path + from typing_extensions import Annotated + + def task_example(path: Annotated[Path, Product] = Path("output.txt")): + return path.write_text("Hello World!") + """ + tmp_path.joinpath("task_example.py").write_text(textwrap.dedent(source)) + + result = runner.invoke( + cli, + [ + tmp_path.as_posix(), + "--parallel-backend", + "custom", + "--hook-module", + tmp_path.joinpath("config.py").as_posix(), + ], + ) + assert result.exit_code == ExitCode.FAILED + assert "You cannot use a local path" in result.output + + +@pytest.mark.end_to_end() +def test_local_path_as_return(runner, tmp_path): + source = """ + from pathlib import Path + from typing_extensions import Annotated + + def task_example() -> Annotated[str, Path("output.txt")]: + return "Hello World!" 
+ """ + tmp_path.joinpath("task_example.py").write_text(textwrap.dedent(source)) + + result = runner.invoke( + cli, + [ + tmp_path.as_posix(), + "--parallel-backend", + "custom", + "--hook-module", + tmp_path.joinpath("config.py").as_posix(), + ], + ) + assert result.exit_code == ExitCode.OK + assert "1 Succeeded" in result.output + assert tmp_path.joinpath("output.txt").read_text() == "Hello World!" + + +@pytest.mark.end_to_end() +def test_pickle_node_with_local_path_as_input(runner, tmp_path): + source = """ + from pytask import PickleNode + from pathlib import Path + from typing_extensions import Annotated + + def task_example( + text: Annotated[str, PickleNode(path=Path("data.pkl"))] + ) -> Annotated[str, Path("output.txt")]: + return text + """ + tmp_path.joinpath("task_example.py").write_text(textwrap.dedent(source)) + tmp_path.joinpath("data.pkl").write_bytes(pickle.dumps("Hello World!")) + + result = runner.invoke( + cli, + [ + tmp_path.as_posix(), + "--parallel-backend", + "custom", + "--hook-module", + tmp_path.joinpath("config.py").as_posix(), + ], + ) + assert result.exit_code == ExitCode.OK + assert "1 Succeeded" in result.output + assert tmp_path.joinpath("output.txt").read_text() == "Hello World!" 
+ + +@pytest.mark.end_to_end() +def test_pickle_node_with_local_path_as_product(runner, tmp_path): + source = """ + from pytask import PickleNode, Product + from pathlib import Path + from typing_extensions import Annotated + + def task_example( + node: Annotated[PickleNode, PickleNode(path=Path("data.pkl")), Product] + ): + node.save("Hello World!") + """ + tmp_path.joinpath("task_example.py").write_text(textwrap.dedent(source)) + + result = runner.invoke( + cli, + [ + tmp_path.as_posix(), + "--parallel-backend", + "custom", + "--hook-module", + tmp_path.joinpath("config.py").as_posix(), + ], + ) + assert result.exit_code == ExitCode.OK + assert "1 Succeeded" in result.output + assert pickle.loads(tmp_path.joinpath("data.pkl").read_bytes()) == "Hello World!" # noqa: S301 + + +@pytest.mark.end_to_end() +def test_pickle_node_with_local_path_as_return(runner, tmp_path): + source = """ + from pytask import PickleNode + from pathlib import Path + from typing_extensions import Annotated + + def task_example() -> Annotated[str, PickleNode(path=Path("data.pkl"))]: + return "Hello World!" + """ + tmp_path.joinpath("task_example.py").write_text(textwrap.dedent(source)) + + result = runner.invoke( + cli, + [ + tmp_path.as_posix(), + "--parallel-backend", + "custom", + "--hook-module", + tmp_path.joinpath("config.py").as_posix(), + ], + ) + assert result.exit_code == ExitCode.OK + assert "1 Succeeded" in result.output + assert pickle.loads(tmp_path.joinpath("data.pkl").read_bytes()) == "Hello World!" # noqa: S301