Sync local paths in remote environments with the same OS. #101

Merged 19 commits on May 3, 2024.

34 changes: 17 additions & 17 deletions .github/workflows/main.yml
@@ -18,20 +18,20 @@ on:

jobs:

build-package:
name: Build & verify package
run-type-checking:

name: Run tests for type-checking
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
fetch-depth: 0

- uses: hynek/build-and-inspect-python-package@v2
id: baipp

outputs:
python-versions: ${{ steps.baipp.outputs.supported_python_classifiers_json_array }}
python-version-file: .python-version
allow-prereleases: true
cache: pip
- run: pip install tox-uv
- run: tox -e typing

run-tests:

@@ -59,16 +59,16 @@ jobs:
shell: bash -l {0}
run: tox -e test -- tests -m "unit or (not integration and not end_to_end)" --cov=src --cov=tests --cov-report=xml

- name: Upload coverage report for unit tests and doctests.
if: runner.os == 'Linux' && matrix.python-version == '3.10'
shell: bash -l {0}
run: bash <(curl -s https://codecov.io/bash) -F unit -c
- name: Upload unit test coverage reports to Codecov with GitHub Action
uses: codecov/codecov-action@v4
with:
flags: unit

- name: Run end-to-end tests.
shell: bash -l {0}
run: tox -e test -- tests -m end_to_end --cov=src --cov=tests --cov-report=xml

- name: Upload coverage reports of end-to-end tests.
if: runner.os == 'Linux' && matrix.python-version == '3.10'
shell: bash -l {0}
run: bash <(curl -s https://codecov.io/bash) -F end_to_end -c
- name: Upload end_to_end test coverage reports to Codecov with GitHub Action
uses: codecov/codecov-action@v4
with:
flags: end_to_end
41 changes: 12 additions & 29 deletions .pre-commit-config.yaml
@@ -14,7 +14,7 @@ repos:
- id: no-commit-to-branch
args: [--branch, main]
- repo: https://github.com/pre-commit/pygrep-hooks
rev: v1.10.0 # Use the ref you want to point at
rev: v1.10.0
hooks:
- id: python-check-blanket-noqa
- id: python-check-mock-methods
@@ -44,39 +44,22 @@ repos:
mdformat-black,
mdformat-pyproject,
]
args: [--wrap, "88"]
files: (docs/.)
# Conflicts with admonitions.
# - repo: https://github.com/executablebooks/mdformat
# rev: 0.7.17
# hooks:
# - id: mdformat
# additional_dependencies: [
# mdformat-gfm,
# mdformat-black,
# ]
# args: [--wrap, "88"]
- repo: https://github.com/executablebooks/mdformat
rev: 0.7.17
hooks:
- id: mdformat
additional_dependencies: [
mdformat-gfm,
mdformat-black,
]
args: [--wrap, "88"]
files: (README\.md)
- repo: https://github.com/codespell-project/codespell
rev: v2.2.6
hooks:
- id: codespell
- repo: https://github.com/pre-commit/mirrors-mypy
rev: 'v1.10.0'
hooks:
- id: mypy
args: [
--no-strict-optional,
--ignore-missing-imports,
]
additional_dependencies: [
attrs,
cloudpickle,
loky,
"git+https://github.com/pytask-dev/pytask.git@main",
rich,
types-click,
types-setuptools,
]
pass_filenames: false
- repo: meta
hooks:
- id: check-hooks-apply
2 changes: 2 additions & 0 deletions docs/source/changes.md
@@ -23,6 +23,8 @@ releases are available on [PyPI](https://pypi.org/project/pytask-parallel) and
interactions with adaptive scaling. {issue}`98` does handle the resulting issues: no
strong adherence to priorities, no pending status.
- {pull}`100` adds project management with rye.
- {pull}`101` adds syncing for local paths as dependencies or products in remote
environments with the same OS.

## 0.4.1 - 2024-01-12

8 changes: 8 additions & 0 deletions docs/source/coiled.md
@@ -66,6 +66,14 @@ configure the hardware and software environment.
```{literalinclude} ../../docs_src/coiled/coiled_functions_task.py
```

By default, {func}`@coiled.function <coiled.function>`
[scales adaptively](https://docs.coiled.io/user_guide/usage/functions/index.html#adaptive-scaling)
to the workload: coiled infers from the number of submitted tasks and previous runtimes
how many additional remote workers it should deploy to handle the workload. This
provides a convenient mechanism to scale without manual intervention. Also, workers
launched by {func}`@coiled.function <coiled.function>` shut down more quickly than a
cluster.
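
A minimal sketch of such a task, assuming coiled is installed and configured; the
hardware request and the task body are illustrative only:

```python
import coiled
from pytask import task


@task
@coiled.function(memory="8 GiB")  # illustrative hardware request
def task_example() -> None:
    # coiled watches the queue of submitted tasks and past runtimes and
    # scales the number of workers up or down without manual intervention.
    ...
```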

```{seealso}
Serverless functions are more thoroughly explained in
[coiled's guide](https://docs.coiled.io/user_guide/usage/functions/index.html).
10 changes: 5 additions & 5 deletions docs/source/dask.md
@@ -83,15 +83,15 @@ You can find more information in the documentation for

## Remote

You can learn how to deploy your tasks to a remote dask cluster in [this
guide](https://docs.dask.org/en/stable/deploying.html). They recommend to use coiled for
deployment to cloud providers.
You can learn how to deploy your tasks to a remote dask cluster in
[this guide](https://docs.dask.org/en/stable/deploying.html). They recommend using
coiled for deployment to cloud providers.

[coiled](https://www.coiled.io/) is a product built on top of dask that eases the
deployment of your workflow to many cloud providers like AWS, GCP, and Azure.

If you want to run the tasks in your project on a cluster managed by coiled, read
{ref}`this guide <coiled-clusters>`.

Otherwise, follow the instructions in [dask's
guide](https://docs.dask.org/en/stable/deploying.html).
Otherwise, follow the instructions in
[dask's guide](https://docs.dask.org/en/stable/deploying.html).
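
A sketch of the coiled route, assuming coiled is installed and that `pytask.build`
accepts the `parallel_backend` option as a keyword like other configuration values;
the cluster size and project path are hypothetical:

```python
import coiled
import pytask
from distributed import Client

# Hypothetical sizing; the cluster boots remote workers on your cloud account.
cluster = coiled.Cluster(n_workers=4)
# Connecting a client makes it the "current" client that the dask backend
# discovers via distributed.Client.current().
client = Client(cluster)

session = pytask.build(paths="src/project", parallel_backend="dask")
```
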
7 changes: 3 additions & 4 deletions docs/source/developers_guide.md
@@ -1,10 +1,9 @@
# Developer's Guide

`pytask-parallel` does not call the `pytask_execute_task_protocol` hook
specification/entry-point because `pytask_execute_task_setup` and
`pytask_execute_task` need to be separated from `pytask_execute_task_teardown`. Thus,
plugins that change this hook specification may not interact well with the
parallelization.
specification/entry-point because `pytask_execute_task_setup` and `pytask_execute_task`
need to be separated from `pytask_execute_task_teardown`. Thus, plugins that change this
hook specification may not interact well with the parallelization.

Two PRs for CPython try to re-enable setting custom reducers, a feature that should
work but does not. Here are the references.
1 change: 1 addition & 0 deletions docs/source/index.md
@@ -26,6 +26,7 @@ quickstart
coiled
dask
custom_executors
remote_backends
developers_guide
changes
On Github <https://github.com/pytask-dev/pytask-parallel>
47 changes: 47 additions & 0 deletions docs/source/remote_backends.md
@@ -0,0 +1,47 @@
# Remote backends

There are a couple of things you need to know when using backends that launch workers
remotely, that is, not on your machine.

## Cross-platform support

Issue: {issue}`102`.

Currently, it is not possible to run tasks in a remote environment that has a different
OS than your local system. The reason is that when pytask sends data to the remote
worker, the data contains path objects, {class}`pathlib.WindowsPath` or
{class}`pathlib.PosixPath`, which cannot be unpickled on a different system.
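
The failure mode can be reproduced without a remote worker. Unpickling calls the path
class's constructor, so the following sketch mimics what happens when a path pickled
on Windows arrives on a POSIX system:

```python
import pathlib

# On Linux or macOS, instantiating a WindowsPath raises an error, which is
# exactly what happens when such a path is unpickled on the wrong OS.
try:
    pathlib.WindowsPath("C:/data.csv")
except NotImplementedError as error:
    print(error)  # cannot instantiate 'WindowsPath' on your system
```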

In general, remote machines are Unix machines, which means that people running Unix
systems themselves, like Linux and macOS, should have no problems.

Windows users, on the other hand, should use the
[WSL (Windows Subsystem for Linux)](https://learn.microsoft.com/en-us/windows/wsl/about)
to run their projects.

## Local files

Avoid using local files with remote backends and use remote storage like S3 for
dependencies and products. The reason is that every local file needs to be sent to the
remote workers, and if your internet connection is slow, you will face a hefty runtime
penalty.

## Local paths

In most projects, you use local paths to refer to dependencies and products of your
tasks. This becomes an interesting problem with remote workers since your local files
are not necessarily available on the remote machine.

pytask-parallel does its best to sync files to the worker before the execution, so you
can run your tasks locally and remotely without changing a thing.

If you create a file on the remote machine, the product will be synced back to your
local machine as well.

Keep in mind that the remote paths are temporary files that share the same file
extension as the local file, but their names and locations will differ. Do not rely on
them.
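
A sketch with hypothetical file names, using pytask's `produces` convention: the task
below runs unchanged locally and remotely because pytask-parallel ships `input.csv` to
the worker and syncs `output.csv` back.

```python
from pathlib import Path


def task_copy(
    path: Path = Path("input.csv"), produces: Path = Path("output.csv")
) -> None:
    # On a remote worker, both arguments point to temporary files with the
    # same extensions; only use the injected paths, never hard-coded ones.
    produces.write_text(path.read_text())
```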

Another way to circumvent the problem is to first define a local task that stores all
your necessary files in remote storage like S3. In the remaining tasks, you can then
use paths pointing to the bucket instead of the local machine. See the
[guide on remote files](https://tinyurl.com/pytask-remote) for more explanations.
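
A sketch of that pattern, assuming `universal_pathlib` is installed and using a
hypothetical bucket name:

```python
from pathlib import Path

from upath import UPath

BUCKET = UPath("s3://my-bucket")  # hypothetical bucket


def task_upload(
    path: Path = Path("input.csv"), produces: UPath = BUCKET / "input.csv"
) -> None:
    # Runs locally once and stores the file remotely.
    produces.write_bytes(path.read_bytes())


def task_process(
    path: UPath = BUCKET / "input.csv", produces: UPath = BUCKET / "output.csv"
) -> None:
    # Can run on a remote worker without touching local files.
    produces.write_text(path.read_text().upper())
```
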
17 changes: 5 additions & 12 deletions pyproject.toml
@@ -55,21 +55,12 @@ content-type = "text/markdown"
text = "MIT"

[project.urls]
Homepage = "https://github.com/pytask-dev/pytask-parallel"
Changelog = "https://github.com/pytask-dev/pytask-parallel/blob/main/CHANGES.md"
Documentation = "https://github.com/pytask-dev/pytask-parallel"
Homepage = "https://pytask-parallel.readthedocs.io/"
Changelog = "https://pytask-parallel.readthedocs.io/en/latest/changes.html"
Documentation = "https://pytask-parallel.readthedocs.io/"
Github = "https://github.com/pytask-dev/pytask-parallel"
Tracker = "https://github.com/pytask-dev/pytask-parallel/issues"

[tool.setuptools]
include-package-data = true
zip-safe = false
platforms = ["any"]
license-files = ["LICENSE"]

[tool.check-manifest]
ignore = ["src/pytask_parallel/_version.py"]

[project.entry-points.pytask]
pytask_parallel = "pytask_parallel.plugin"

@@ -113,6 +104,7 @@ disallow_untyped_defs = true
no_implicit_optional = true
warn_redundant_casts = true
warn_unused_ignores = true
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = "tests.*"
@@ -127,6 +119,7 @@ unsafe-fixes = true
[tool.ruff.lint]
extend-ignore = [
"ANN101", # type annotating self
"ANN102", # type annotating cls
"ANN401", # flake8-annotate typing.Any
"COM812", # Comply with ruff-format.
"ISC001", # Comply with ruff-format.
3 changes: 2 additions & 1 deletion src/pytask_parallel/backends.py
@@ -46,10 +46,11 @@ def submit(  # type: ignore[override]

def _get_dask_executor(n_workers: int) -> Executor:
"""Get an executor from a dask client."""
_rich_traceback_omit = True
_rich_traceback_guard = True
from pytask import import_optional_dependency

distributed = import_optional_dependency("distributed")
assert distributed # noqa: S101
try:
client = distributed.Client.current()
except ValueError:
2 changes: 1 addition & 1 deletion src/pytask_parallel/config.py
@@ -27,7 +27,7 @@ def pytask_parse_config(config: dict[str, Any]) -> None:
raise ValueError(msg) from None

if config["n_workers"] == "auto":
config["n_workers"] = max(os.cpu_count() - 1, 1)
config["n_workers"] = max(os.cpu_count() - 1, 1) # type: ignore[operator]

# If more than one worker is used, and no backend is set, use loky.
if config["n_workers"] > 1 and config["parallel_backend"] == ParallelBackend.NONE:
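
For reference, a sketch of how the `auto` value resolves; `os.cpu_count()` may return
`None`, which is why the diff needs the type-ignore comment:

```python
import os

# "auto" keeps one core free for the main process, with a floor of one worker.
# os.cpu_count() may return None, hence the fallback.
n_workers = max((os.cpu_count() or 2) - 1, 1)
```
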
34 changes: 25 additions & 9 deletions src/pytask_parallel/execute.py
@@ -8,6 +8,7 @@
from typing import Any

import cloudpickle
from _pytask.node_protocols import PPathNode
from attrs import define
from attrs import field
from pytask import ExecutionReport
@@ -24,9 +25,10 @@

from pytask_parallel.backends import WorkerType
from pytask_parallel.backends import registry
from pytask_parallel.typing import CarryOverPath
from pytask_parallel.typing import is_coiled_function
from pytask_parallel.utils import create_kwargs_for_task
from pytask_parallel.utils import get_module
from pytask_parallel.utils import is_coiled_function
from pytask_parallel.utils import parse_future_result

if TYPE_CHECKING:
@@ -222,7 +224,7 @@ def pytask_execute_task(session: Session, task: PTask) -> Future[WrapperResult]:
from pytask_parallel.wrappers import wrap_task_in_thread

return session.config["_parallel_executor"].submit(
wrap_task_in_thread, task=task, **kwargs
wrap_task_in_thread, task=task, remote=False, **kwargs
)
msg = f"Unknown worker type {worker_type}"
raise ValueError(msg)
@@ -235,19 +237,33 @@ def pytask_unconfigure() -> None:


def _update_carry_over_products(
task: PTask, carry_over_products: PyTree[PythonNode | None] | None
task: PTask, carry_over_products: PyTree[CarryOverPath | PythonNode | None] | None
) -> None:
"""Update products carry over from a another process or remote worker."""
"""Update products carry over from a another process or remote worker.

def _update_carry_over_node(x: PNode, y: PythonNode | None) -> PNode:
if y:
The python node can be a regular one passing the value to another python node.

In other instances, the python node holds a string or bytes from a RemotePathNode.

"""

def _update_carry_over_node(
x: PNode, y: CarryOverPath | PythonNode | None
) -> PNode:
if y is None:
return x
if isinstance(x, PPathNode) and isinstance(y, CarryOverPath):
x.path.write_bytes(y.content)
return x
if isinstance(y, PythonNode):
x.save(y.load())
return x
return x
raise NotImplementedError

structure_python_nodes = tree_structure(carry_over_products)
structure_carry_over_products = tree_structure(carry_over_products)
structure_produces = tree_structure(task.produces)
# strict must be False when None is a leaf.
if structure_produces.is_prefix(structure_python_nodes, strict=False):
if structure_produces.is_prefix(structure_carry_over_products, strict=False):
task.produces = tree_map(
_update_carry_over_node, task.produces, carry_over_products
) # type: ignore[assignment]
Expand Down