Add a registry for the parallel backend. #90

Merged: 10 commits, Mar 30, 2024
19 changes: 10 additions & 9 deletions .pre-commit-config.yaml
@@ -35,15 +35,16 @@ repos:
rev: 0.7.1
hooks:
- id: nbstripout
- repo: https://github.com/executablebooks/mdformat
rev: 0.7.17
hooks:
- id: mdformat
additional_dependencies: [
mdformat-gfm,
mdformat-black,
]
args: [--wrap, "88"]
# Conflicts with admonitions.
# - repo: https://github.com/executablebooks/mdformat
# rev: 0.7.17
# hooks:
# - id: mdformat
# additional_dependencies: [
# mdformat-gfm,
# mdformat-black,
# ]
# args: [--wrap, "88"]
- repo: https://github.com/codespell-project/codespell
rev: v2.2.6
hooks:
79 changes: 69 additions & 10 deletions README.md
@@ -47,15 +47,15 @@ $ pytask --n-workers 2
$ pytask -n auto
```

Using processes to parallelize the execution of tasks is useful for CPU-bound tasks such
as numerical computations. ([Here](https://stackoverflow.com/a/868577/7523785) is an
explanation of what CPU- or IO-bound means.)

For IO-bound tasks, where the limiting factor is network latency or access to
files, you can parallelize via threads.

```console
pytask --parallel-backend threads
```
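As a rough illustration of why threads help here (a stdlib-only sketch, not from the project): while a thread waits on IO it releases the GIL, so a thread pool can overlap several waits.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def io_task(i: int) -> int:
    """Stand-in for an IO-bound task: sleeping releases the GIL like a network wait."""
    time.sleep(0.2)
    return i


start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(io_task, range(4)))
elapsed = time.perf_counter() - start

# Four 0.2s waits overlap, so wall time stays well below the 0.8s serial cost.
assert results == [0, 1, 2, 3]
assert elapsed < 0.7
```

For CPU-bound work the same pattern would show no speedup, which is why processes or loky are the default choice there.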

You can also set the options in a `pyproject.toml`.
@@ -68,6 +68,65 @@ n_workers = 1
parallel_backend = "processes" # or loky or threads
```

## Custom Executor

> [!NOTE]
>
> The interface for custom executors is still rudimentary and there is not yet much
> support from public functions. Please give feedback if you are trying, or have
> managed, to use a custom backend.
>
> Also, please contribute your custom executors if you consider them useful to others.

pytask-parallel allows you to use your own parallel backend as long as it follows the
interface defined by
[`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor).

In some cases, adding a new backend can be as easy as registering a builder function
that receives some arguments (currently only `n_workers`) and returns the instantiated
executor.

```python
from concurrent.futures import Executor
from my_project.executor import CustomExecutor

from pytask_parallel import ParallelBackend, registry


def build_custom_executor(n_workers: int) -> Executor:
return CustomExecutor(max_workers=n_workers)


registry.register_parallel_backend(ParallelBackend.CUSTOM, build_custom_executor)
```

Now, build the project requesting your custom backend.

```console
pytask --parallel-backend custom
```
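The `CustomExecutor` above is hypothetical. A minimal stand-in that satisfies the `concurrent.futures.Executor` interface could look like this (a sketch for illustration, not part of pytask-parallel):

```python
from concurrent.futures import Executor, Future


class CustomExecutor(Executor):
    """Toy executor: runs tasks synchronously but speaks the Executor protocol."""

    def __init__(self, max_workers: int = 1) -> None:
        self._max_workers = max_workers

    def submit(self, fn, /, *args, **kwargs) -> Future:
        future: Future = Future()
        try:
            future.set_result(fn(*args, **kwargs))
        except BaseException as e:  # propagate errors through the future
            future.set_exception(e)
        return future


def build_custom_executor(n_workers: int) -> Executor:
    return CustomExecutor(max_workers=n_workers)


executor = build_custom_executor(2)
assert executor.submit(sum, [1, 2, 3]).result() == 6
```

A real backend would dispatch work asynchronously, but anything that returns `Future` objects from `submit` fits the interface the registry expects.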

Realistically, registering the backend is not the only adjustment needed for a nice
user experience. There are two other important pieces, which pytask-parallel does not
implement by default since they are tightly coupled to your backend.

1. A wrapper for the executed function that captures warnings, catches exceptions,
and saves the products of the task (within the child process!).

As an example, see
[`def _execute_task()`](https://github.com/pytask-dev/pytask-parallel/blob/c441dbb75fa6ab3ab17d8ad5061840c802dc1c41/src/pytask_parallel/processes.py#L91-L155)
that does all of that for the processes and loky backends.

1. To apply the wrapper, you need to write a custom hook implementation for
`def pytask_execute_task()`. See
[`def pytask_execute_task()`](https://github.com/pytask-dev/pytask-parallel/blob/c441dbb75fa6ab3ab17d8ad5061840c802dc1c41/src/pytask_parallel/processes.py#L41-L65)
for an example. Use the
[`hook_module`](https://pytask-dev.readthedocs.io/en/stable/how_to_guides/extending_pytask.html#using-hook-module-and-hook-module)
configuration value to register your implementation.

Another example of an implementation can be found as a
[test](https://github.com/pytask-dev/pytask-parallel/blob/c441dbb75fa6ab3ab17d8ad5061840c802dc1c41/tests/test_backends.py#L35-L78).
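To make point 1 concrete, here is the shape such a wrapper could take (a sketch with made-up names; the real `_execute_task` linked above does more, e.g. saving products):

```python
import functools
import warnings


def wrap_for_child_process(fn):
    """Run ``fn`` while recording warnings and converting exceptions to data,
    so the parent process can report them instead of losing them."""

    @functools.wraps(fn)
    def wrapper(**kwargs):
        with warnings.catch_warnings(record=True) as caught:
            warnings.simplefilter("always")
            try:
                result, exception = fn(**kwargs), None
            except Exception as e:
                result, exception = None, e
        return {
            "result": result,
            "exception": exception,
            "warnings": [str(w.message) for w in caught],
        }

    return wrapper


@wrap_for_child_process
def task(x):
    warnings.warn("deprecated input")
    return x * 2


report = task(x=21)
assert report["result"] == 42
assert report["warnings"] == ["deprecated input"]
assert report["exception"] is None
```

Returning a plain dictionary keeps the payload picklable, which matters when the wrapper runs in a child process.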

## Some implementation details

### Parallelization and Debugging
@@ -92,12 +151,12 @@ Consult the [release notes](CHANGES.md) to find out about what is new.
- `pytask-parallel` does not call the `pytask_execute_task_protocol` hook
specification/entry-point because `pytask_execute_task_setup` and
`pytask_execute_task` need to be separated from `pytask_execute_task_teardown`. Thus,
plugins that change this hook specification may not interact well with the
parallelization.

- Two PRs for CPython try to re-enable setting custom reducers, which should work but
does not. Here are the references.

- https://bugs.python.org/issue28053
- https://github.com/python/cpython/pull/9959
- https://github.com/python/cpython/pull/15058
2 changes: 0 additions & 2 deletions environment.yml
@@ -1,7 +1,6 @@
name: pytask-parallel

channels:
- conda-forge/label/pytask_rc
- conda-forge
- nodefaults

@@ -21,7 +20,6 @@ dependencies:
- tox
- ipywidgets
- nbmake
- pre-commit
- pytest-cov

- pip:
5 changes: 4 additions & 1 deletion src/pytask_parallel/__init__.py
@@ -2,6 +2,9 @@

from __future__ import annotations

from pytask_parallel.backends import ParallelBackend
from pytask_parallel.backends import registry

try:
from ._version import version as __version__
except ImportError:
@@ -10,4 +13,4 @@
__version__ = "unknown"


__all__ = ["__version__"]
__all__ = ["ParallelBackend", "__version__", "registry"]
55 changes: 46 additions & 9 deletions src/pytask_parallel/backends.py
@@ -2,25 +2,29 @@

from __future__ import annotations

from concurrent.futures import Executor
from concurrent.futures import Future
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
from enum import Enum
from typing import Any
from typing import Callable
from typing import ClassVar

import cloudpickle
from loky import get_reusable_executor

__all__ = ["ParallelBackend", "ParallelBackendRegistry", "registry"]

def _deserialize_and_run_with_cloudpickle(fn: bytes, kwargs: bytes) -> Any:
"""Deserialize and execute a function and keyword arguments."""
deserialized_fn = cloudpickle.loads(fn)
deserialized_kwargs = cloudpickle.loads(kwargs)
return deserialized_fn(**deserialized_kwargs)


class _CloudpickleProcessPoolExecutor(ProcessPoolExecutor):
"""Patches the standard executor to serialize functions with cloudpickle."""

# The type signature is wrong for versions above Py3.7. Fix when 3.7 is deprecated.
@@ -32,7 +36,7 @@ def submit(  # type: ignore[override]
) -> Future[Any]:
"""Submit a new task."""
return super().submit(
_deserialize_and_run_with_cloudpickle,
fn=cloudpickle.dumps(fn),
kwargs=cloudpickle.dumps(kwargs),
)
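For context on why this cloudpickle detour exists (a stdlib-only illustration, not project code): the standard pickler used by `ProcessPoolExecutor` serializes module-level functions by reference but rejects lambdas and locally defined functions, which cloudpickle can serialize by value.

```python
import pickle


def picklable(obj) -> bool:
    """Report whether the stdlib pickler accepts an object."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


# Module-level callables pickle by reference; lambdas do not pickle at all.
assert picklable(len)
assert not picklable(lambda x: x + 1)
```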
@@ -41,13 +45,46 @@ def submit(  # type: ignore[override]
class ParallelBackend(Enum):
"""Choices for parallel backends."""

CUSTOM = "custom"
LOKY = "loky"
PROCESSES = "processes"
THREADS = "threads"
LOKY = "loky"


PARALLEL_BACKEND_BUILDER = {
ParallelBackend.PROCESSES: lambda: CloudpickleProcessPoolExecutor,
ParallelBackend.THREADS: lambda: ThreadPoolExecutor,
ParallelBackend.LOKY: lambda: get_reusable_executor,
}
class ParallelBackendRegistry:
"""Registry for parallel backends."""

registry: ClassVar[dict[ParallelBackend, Callable[..., Executor]]] = {}

def register_parallel_backend(
self, kind: ParallelBackend, builder: Callable[..., Executor]
) -> None:
"""Register a parallel backend."""
self.registry[kind] = builder

def get_parallel_backend(self, kind: ParallelBackend, n_workers: int) -> Executor:
"""Get a parallel backend."""
__tracebackhide__ = True
try:
return self.registry[kind](n_workers=n_workers)
except KeyError:
msg = f"No registered parallel backend found for kind {kind}."
raise ValueError(msg) from None
except Exception as e: # noqa: BLE001
msg = f"Could not instantiate parallel backend {kind.value}."
raise ValueError(msg) from e


registry = ParallelBackendRegistry()


registry.register_parallel_backend(
ParallelBackend.PROCESSES,
lambda n_workers: _CloudpickleProcessPoolExecutor(max_workers=n_workers),
)
registry.register_parallel_backend(
ParallelBackend.THREADS, lambda n_workers: ThreadPoolExecutor(max_workers=n_workers)
)
registry.register_parallel_backend(
ParallelBackend.LOKY, lambda n_workers: get_reusable_executor(max_workers=n_workers)
)
12 changes: 10 additions & 2 deletions src/pytask_parallel/config.py
@@ -7,6 +7,7 @@

from pytask import hookimpl

from pytask_parallel import custom
from pytask_parallel import execute
from pytask_parallel import processes
from pytask_parallel import threads
@@ -33,15 +34,22 @@ def pytask_parse_config(config: dict[str, Any]) -> None:
config["delay"] = 0.1


@hookimpl(trylast=True)
def pytask_post_parse(config: dict[str, Any]) -> None:
"""Register the parallel backend if debugging is not enabled."""
if config["pdb"] or config["trace"] or config["dry_run"]:
config["n_workers"] = 1

if config["n_workers"] > 1:
# Register parallel execute hook.
if config["n_workers"] > 1 or config["parallel_backend"] == ParallelBackend.CUSTOM:
config["pm"].register(execute)

# Register parallel backends.
if config["n_workers"] > 1:
if config["parallel_backend"] == ParallelBackend.THREADS:
config["pm"].register(threads)
else:
config["pm"].register(processes)

if config["parallel_backend"] == ParallelBackend.CUSTOM:
config["pm"].register(custom)
27 changes: 27 additions & 0 deletions src/pytask_parallel/custom.py
@@ -0,0 +1,27 @@
"""Contains functions for the threads backend."""

from __future__ import annotations

from typing import TYPE_CHECKING
from typing import Any

from pytask import PTask
from pytask import Session
from pytask import hookimpl

from pytask_parallel.utils import create_kwargs_for_task

if TYPE_CHECKING:
from concurrent.futures import Future


@hookimpl
def pytask_execute_task(session: Session, task: PTask) -> Future[Any]:
"""Execute a task.

Since threads have shared memory, it is not necessary to pickle and unpickle the
task.

"""
kwargs = create_kwargs_for_task(task)
return session.config["_parallel_executor"].submit(task.function, **kwargs)