Skip to content

Handle local paths in remote workers. #96

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,6 @@ build
dist
src/pytask_parallel/_version.py
tests/test_jupyter/*.txt
.mypy_cache
.pytest_cache
.ruff_cache
2 changes: 2 additions & 0 deletions docs/source/changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ releases are available on [PyPI](https://pypi.org/project/pytask-parallel) and
- {pull}`94` implements `ParallelBackend.NONE` as the default backend.
- {pull}`95` formalizes parallel backends and apply wrappers for backends with threads
or processes automatically.
- {pull}`96` handles local paths with remote executors. `PathNode`s are not supported as
dependencies or products (except for return annotations).

## 0.4.1 - 2024-01-12

Expand Down
38 changes: 22 additions & 16 deletions src/pytask_parallel/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,17 +80,16 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091
session.hook.pytask_execute_task_setup(
session=session, task=task
)
running_tasks[task_name] = session.hook.pytask_execute_task(
session=session, task=task
)
sleeper.reset()
except Exception: # noqa: BLE001
report = ExecutionReport.from_task_and_exception(
task, sys.exc_info()
)
newly_collected_reports.append(report)
session.scheduler.done(task_name)
else:
running_tasks[task_name] = session.hook.pytask_execute_task(
session=session, task=task
)
sleeper.reset()

if not ready_tasks:
sleeper.increment()
Expand Down Expand Up @@ -123,7 +122,9 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091
session.scheduler.done(task_name)
else:
task = session.dag.nodes[task_name]["task"]
_update_python_nodes(task, wrapper_result.python_nodes)
_update_carry_over_products(
task, wrapper_result.carry_over_products
)

try:
session.hook.pytask_execute_task_teardown(
Expand Down Expand Up @@ -169,9 +170,11 @@ def pytask_execute_task(session: Session, task: PTask) -> Future[WrapperResult]:
executor.

"""
worker_type = registry.registry[session.config["parallel_backend"]].worker_type
parallel_backend = registry.registry[session.config["parallel_backend"]]
worker_type = parallel_backend.worker_type
remote = parallel_backend.remote

kwargs = create_kwargs_for_task(task)
kwargs = create_kwargs_for_task(task, remote=remote)

if worker_type == WorkerType.PROCESSES:
# Prevent circular import for loky backend.
Expand All @@ -188,10 +191,11 @@ def pytask_execute_task(session: Session, task: PTask) -> Future[WrapperResult]:
return session.config["_parallel_executor"].submit(
wrap_task_in_process,
task=task,
kwargs=kwargs,
show_locals=session.config["show_locals"],
console_options=console.options,
kwargs=kwargs,
remote=remote,
session_filterwarnings=session.config["filterwarnings"],
show_locals=session.config["show_locals"],
task_filterwarnings=get_marks(task, "filterwarnings"),
)
if worker_type == WorkerType.THREADS:
Expand All @@ -211,21 +215,23 @@ def pytask_unconfigure() -> None:
registry.reset()


def _update_python_nodes(
task: PTask, python_nodes: PyTree[PythonNode | None] | None
def _update_carry_over_products(
task: PTask, carry_over_products: PyTree[PythonNode | None] | None
) -> None:
"""Update the python nodes of a task with the python nodes from the future."""
"""Update products carry over from a another process or remote worker."""

def _update_python_node(x: PNode, y: PythonNode | None) -> PNode:
def _update_carry_over_node(x: PNode, y: PythonNode | None) -> PNode:
if y:
x.save(y.load())
return x

structure_python_nodes = tree_structure(python_nodes)
structure_python_nodes = tree_structure(carry_over_products)
structure_produces = tree_structure(task.produces)
# strict must be false when none is leaf.
if structure_produces.is_prefix(structure_python_nodes, strict=False):
task.produces = tree_map(_update_python_node, task.produces, python_nodes) # type: ignore[assignment]
task.produces = tree_map(
_update_carry_over_node, task.produces, carry_over_products
) # type: ignore[assignment]


@define(kw_only=True)
Expand Down
93 changes: 87 additions & 6 deletions src/pytask_parallel/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,19 @@

import inspect
from functools import partial
from pathlib import PosixPath
from pathlib import WindowsPath
from typing import TYPE_CHECKING
from typing import Any
from typing import Callable

from pytask import NodeLoadError
from pytask import PathNode
from pytask import PNode
from pytask import PProvisionalNode
from pytask.tree_util import PyTree
from pytask.tree_util import tree_map
from pytask.tree_util import tree_map_with_path
from upath.implementations.local import FilePath

if TYPE_CHECKING:
from concurrent.futures import Future
Expand All @@ -22,7 +29,12 @@
from pytask_parallel.wrappers import WrapperResult


__all__ = ["create_kwargs_for_task", "get_module", "parse_future_result"]
__all__ = [
"create_kwargs_for_task",
"get_module",
"parse_future_result",
"is_local_path",
]


def parse_future_result(
Expand All @@ -32,11 +44,12 @@ def parse_future_result(
# An exception was raised before the task was executed.
future_exception = future.exception()
if future_exception is not None:
# Prevent circular import for loky backend.
from pytask_parallel.wrappers import WrapperResult

exc_info = _parse_future_exception(future_exception)
return WrapperResult(
python_nodes=None,
carry_over_products=None,
warning_reports=[],
exc_info=exc_info,
stdout="",
Expand All @@ -45,17 +58,80 @@ def parse_future_result(
return future.result()


def create_kwargs_for_task(task: PTask) -> dict[str, PyTree[Any]]:
def _safe_load(
path: tuple[Any, ...],
node: PNode | PProvisionalNode,
task: PTask,
*,
is_product: bool,
remote: bool,
) -> Any:
"""Load a node and catch exceptions."""
_rich_traceback_guard = True
# Get the argument name like "path" or "return" for function returns.
argument = path[0]

# Raise an error if a PPathNode with a local path is used as a dependency or product
# (except as a return value).
if (
remote
and argument != "return"
and isinstance(node, PathNode)
and is_local_path(node.path)
):
if is_product:
msg = (
f"You cannot use a local path as a product in argument {argument!r} "
"with a remote backend. Either return the content that should be saved "
"in the file with a return annotation "
"(https://tinyurl.com/pytask-return) or use a nonlocal path to store "
"the file in S3 or their like https://tinyurl.com/pytask-remote."
)
raise NodeLoadError(msg)
msg = (
f"You cannot use a local path as a dependency in argument {argument!r} "
"with a remote backend. Upload the file to a remote storage like S3 "
"and use the remote path instead: https://tinyurl.com/pytask-remote."
)
raise NodeLoadError(msg)

try:
return node.load(is_product=is_product)
except Exception as e: # noqa: BLE001
msg = f"Exception while loading node {node.name!r} of task {task.name!r}"
raise NodeLoadError(msg) from e


def create_kwargs_for_task(task: PTask, *, remote: bool) -> dict[str, PyTree[Any]]:
"""Create kwargs for task function."""
parameters = inspect.signature(task.function).parameters

kwargs = {}

for name, value in task.depends_on.items():
kwargs[name] = tree_map(lambda x: x.load(), value)
kwargs[name] = tree_map_with_path(
lambda p, x: _safe_load(
(name, *p), # noqa: B023
x,
task,
is_product=False,
remote=remote,
),
value,
)

for name, value in task.produces.items():
if name in parameters:
kwargs[name] = tree_map(lambda x: x.load(), value)
kwargs[name] = tree_map_with_path(
lambda p, x: _safe_load(
(name, *p), # noqa: B023
x,
task,
is_product=True,
remote=remote,
),
value,
)

return kwargs

Expand Down Expand Up @@ -84,3 +160,8 @@ def get_module(func: Callable[..., Any], path: Path | None) -> ModuleType:
if path:
return inspect.getmodule(func, path.as_posix())
return inspect.getmodule(func)


def is_local_path(path: Path) -> bool:
"""Check if a path is local."""
return isinstance(path, (FilePath, PosixPath, WindowsPath))
Loading