Commit 777ab26

Add a registry for the parallel backend. (#90)
1 parent 69b982f commit 777ab26

13 files changed (+624 −373)

.pre-commit-config.yaml

Lines changed: 10 additions & 9 deletions

```diff
@@ -35,15 +35,16 @@ repos:
     rev: 0.7.1
     hooks:
       - id: nbstripout
-  - repo: https://github.com/executablebooks/mdformat
-    rev: 0.7.17
-    hooks:
-      - id: mdformat
-        additional_dependencies: [
-          mdformat-gfm,
-          mdformat-black,
-        ]
-        args: [--wrap, "88"]
+  # Conflicts with admonitions.
+  # - repo: https://github.com/executablebooks/mdformat
+  #   rev: 0.7.17
+  #   hooks:
+  #     - id: mdformat
+  #       additional_dependencies: [
+  #         mdformat-gfm,
+  #         mdformat-black,
+  #       ]
+  #       args: [--wrap, "88"]
   - repo: https://github.com/codespell-project/codespell
     rev: v2.2.6
     hooks:
```

README.md

Lines changed: 69 additions & 10 deletions

````diff
@@ -47,15 +47,15 @@ $ pytask --n-workers 2
 $ pytask -n auto
 ```
 
-Using processes to parallelize the execution of tasks is useful for CPU bound tasks such
+Using processes to parallelize the execution of tasks is useful for CPU-bound tasks such
 as numerical computations. ([Here](https://stackoverflow.com/a/868577/7523785) is an
-explanation on what CPU or IO bound means.)
+explanation of what CPU- or IO-bound means.)
 
-For IO bound tasks, tasks where the limiting factor are network responses, access to
+For IO-bound tasks, tasks where the limiting factor is network latency and access to
 files, you can parallelize via threads.
 
 ```console
-$ pytask --parallel-backend threads
+pytask --parallel-backend threads
 ```
 
 You can also set the options in a `pyproject.toml`.
@@ -68,6 +68,65 @@ n_workers = 1
 parallel_backend = "processes"  # or loky or threads
 ```
 
+## Custom Executor
+
+> [!NOTE]
+>
+> The interface for custom executors is still rudimentary and not yet well supported
+> by public functions. Please give feedback if you are trying, or have managed, to
+> use a custom backend.
+>
+> Also, please contribute your custom executors if you consider them useful to others.
+
+pytask-parallel allows you to use your own parallel backend as long as it follows the
+interface defined by
+[`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor).
+
+In some cases, adding a new backend can be as easy as registering a builder function
+that receives some arguments (currently only `n_workers`) and returns the instantiated
+executor.
+
+```python
+from concurrent.futures import Executor
+from my_project.executor import CustomExecutor
+
+from pytask_parallel import ParallelBackend, registry
+
+
+def build_custom_executor(n_workers: int) -> Executor:
+    return CustomExecutor(max_workers=n_workers)
+
+
+registry.register_parallel_backend(ParallelBackend.CUSTOM, build_custom_executor)
+```
+
+Now, build the project requesting your custom backend.
+
+```console
+pytask --parallel-backend custom
+```
+
+Realistically, this is not the only adjustment needed for a nice user experience. Two
+other pieces matter, which pytask-parallel does not implement by default since they
+are tightly coupled to your backend.
+
+1. A wrapper for the executed function that captures warnings, catches exceptions, and
+   saves the products of the task (within the child process!).
+
+   As an example, see
+   [`def _execute_task()`](https://github.com/pytask-dev/pytask-parallel/blob/c441dbb75fa6ab3ab17d8ad5061840c802dc1c41/src/pytask_parallel/processes.py#L91-L155),
+   which does all of that for the processes and loky backends.
+
+1. To apply the wrapper, you need to write a custom hook implementation for
+   `def pytask_execute_task()`. See
+   [`def pytask_execute_task()`](https://github.com/pytask-dev/pytask-parallel/blob/c441dbb75fa6ab3ab17d8ad5061840c802dc1c41/src/pytask_parallel/processes.py#L41-L65)
+   for an example. Use the
+   [`hook_module`](https://pytask-dev.readthedocs.io/en/stable/how_to_guides/extending_pytask.html#using-hook-module-and-hook-module)
+   configuration value to register your implementation.
+
+Another example of an implementation can be found as a
+[test](https://github.com/pytask-dev/pytask-parallel/blob/c441dbb75fa6ab3ab17d8ad5061840c802dc1c41/tests/test_backends.py#L35-L78).
+
 ## Some implementation details
 
 ### Parallelization and Debugging
@@ -92,12 +151,12 @@ Consult the [release notes](CHANGES.md) to find out about what is new.
 - `pytask-parallel` does not call the `pytask_execute_task_protocol` hook
   specification/entry-point because `pytask_execute_task_setup` and
   `pytask_execute_task` need to be separated from `pytask_execute_task_teardown`. Thus,
-  plugins which change this hook specification may not interact well with the
+  plugins that change this hook specification may not interact well with the
   parallelization.
 
-- There are two PRs for CPython which try to re-enable setting custom reducers which
-  should have been working, but does not. Here are the references.
+- Two PRs for CPython try to re-enable setting custom reducers, which should have been
+  working but do not. Here are the references.
 
-  > - <https://bugs.python.org/issue28053>
-  > - <https://github.com/python/cpython/pull/9959>
-  > - <https://github.com/python/cpython/pull/15058>
+  - https://bugs.python.org/issue28053
+  - https://github.com/python/cpython/pull/9959
+  - https://github.com/python/cpython/pull/15058
````
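The wrapper described in step 1 of the README's list can be sketched as a standalone function. This is a hypothetical, simplified version (names are made up): the real `_execute_task()` in pytask-parallel additionally deserializes the task and saves its products, while this sketch only captures warnings and exceptions so they can be pickled and shipped back to the parent process.

```python
from __future__ import annotations

import functools
import warnings
from typing import Any, Callable


def wrap_task_for_child_process(func: Callable[..., Any]) -> Callable[..., Any]:
    """Wrap a task so warnings and exceptions survive the trip back to the parent.

    Hypothetical sketch: the real wrapper also handles serialization and products.
    """

    @functools.wraps(func)
    def wrapper(**kwargs: Any) -> dict[str, Any]:
        exception: str | None = None
        result: Any = None
        # Record warnings instead of printing them in the child process.
        with warnings.catch_warnings(record=True) as records:
            warnings.simplefilter("always")
            try:
                result = func(**kwargs)
            except Exception as e:  # noqa: BLE001
                # Exceptions are not always picklable, so ship a string form.
                exception = f"{type(e).__name__}: {e}"
        return {
            "result": result,
            "warnings": [str(r.message) for r in records],
            "exception": exception,
        }

    return wrapper
```

The returned dictionary contains only plain, picklable values, so it travels safely through any executor's result channel.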

environment.yml

Lines changed: 0 additions & 2 deletions

```diff
@@ -1,7 +1,6 @@
 name: pytask-parallel
 
 channels:
-  - conda-forge/label/pytask_rc
   - conda-forge
   - nodefaults
 
@@ -21,7 +20,6 @@ dependencies:
   - tox
   - ipywidgets
   - nbmake
-  - pre-commit
   - pytest-cov
 
   - pip:
```

src/pytask_parallel/__init__.py

Lines changed: 4 additions & 1 deletion

```diff
@@ -2,6 +2,9 @@
 
 from __future__ import annotations
 
+from pytask_parallel.backends import ParallelBackend
+from pytask_parallel.backends import registry
+
 try:
     from ._version import version as __version__
 except ImportError:
@@ -10,4 +13,4 @@
     __version__ = "unknown"
 
 
-__all__ = ["__version__"]
+__all__ = ["ParallelBackend", "__version__", "registry"]
```

src/pytask_parallel/backends.py

Lines changed: 46 additions & 9 deletions

```diff
@@ -2,25 +2,29 @@
 
 from __future__ import annotations
 
+from concurrent.futures import Executor
 from concurrent.futures import Future
 from concurrent.futures import ProcessPoolExecutor
 from concurrent.futures import ThreadPoolExecutor
 from enum import Enum
 from typing import Any
 from typing import Callable
+from typing import ClassVar
 
 import cloudpickle
 from loky import get_reusable_executor
 
+__all__ = ["ParallelBackend", "ParallelBackendRegistry", "registry"]
 
-def deserialize_and_run_with_cloudpickle(fn: bytes, kwargs: bytes) -> Any:
+
+def _deserialize_and_run_with_cloudpickle(fn: bytes, kwargs: bytes) -> Any:
     """Deserialize and execute a function and keyword arguments."""
     deserialized_fn = cloudpickle.loads(fn)
     deserialized_kwargs = cloudpickle.loads(kwargs)
     return deserialized_fn(**deserialized_kwargs)
 
 
-class CloudpickleProcessPoolExecutor(ProcessPoolExecutor):
+class _CloudpickleProcessPoolExecutor(ProcessPoolExecutor):
     """Patches the standard executor to serialize functions with cloudpickle."""
 
     # The type signature is wrong for version above Py3.7. Fix when 3.7 is deprecated.
@@ -32,7 +36,7 @@ def submit(  # type: ignore[override]
     ) -> Future[Any]:
         """Submit a new task."""
         return super().submit(
-            deserialize_and_run_with_cloudpickle,
+            _deserialize_and_run_with_cloudpickle,
             fn=cloudpickle.dumps(fn),
             kwargs=cloudpickle.dumps(kwargs),
         )
@@ -41,13 +45,46 @@ def submit(  # type: ignore[override]
 class ParallelBackend(Enum):
     """Choices for parallel backends."""
 
+    CUSTOM = "custom"
+    LOKY = "loky"
     PROCESSES = "processes"
     THREADS = "threads"
-    LOKY = "loky"
 
 
-PARALLEL_BACKEND_BUILDER = {
-    ParallelBackend.PROCESSES: lambda: CloudpickleProcessPoolExecutor,
-    ParallelBackend.THREADS: lambda: ThreadPoolExecutor,
-    ParallelBackend.LOKY: lambda: get_reusable_executor,
-}
+class ParallelBackendRegistry:
+    """Registry for parallel backends."""
+
+    registry: ClassVar[dict[ParallelBackend, Callable[..., Executor]]] = {}
+
+    def register_parallel_backend(
+        self, kind: ParallelBackend, builder: Callable[..., Executor]
+    ) -> None:
+        """Register a parallel backend."""
+        self.registry[kind] = builder
+
+    def get_parallel_backend(self, kind: ParallelBackend, n_workers: int) -> Executor:
+        """Get a parallel backend."""
+        __tracebackhide__ = True
+        try:
+            return self.registry[kind](n_workers=n_workers)
+        except KeyError:
+            msg = f"No registered parallel backend found for kind {kind}."
+            raise ValueError(msg) from None
+        except Exception as e:  # noqa: BLE001
+            msg = f"Could not instantiate parallel backend {kind.value}."
+            raise ValueError(msg) from e
+
+
+registry = ParallelBackendRegistry()
+
+
+registry.register_parallel_backend(
+    ParallelBackend.PROCESSES,
+    lambda n_workers: _CloudpickleProcessPoolExecutor(max_workers=n_workers),
+)
+registry.register_parallel_backend(
+    ParallelBackend.THREADS, lambda n_workers: ThreadPoolExecutor(max_workers=n_workers)
+)
+registry.register_parallel_backend(
+    ParallelBackend.LOKY, lambda n_workers: get_reusable_executor(max_workers=n_workers)
+)
```
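The registry introduced here boils down to a mapping from enum members to builder callables that receive `n_workers` and return an `Executor`. The following standalone sketch condenses that lookup-and-instantiate behavior so it can run without pytask installed; `DemoBackend` and `DemoRegistry` are illustrative stand-ins for `ParallelBackend` and `ParallelBackendRegistry`, not the library's own names.

```python
from __future__ import annotations

from concurrent.futures import Executor, ThreadPoolExecutor
from enum import Enum
from typing import Callable, ClassVar


class DemoBackend(Enum):
    CUSTOM = "custom"
    THREADS = "threads"


class DemoRegistry:
    """Maps backend kinds to builder callables, mirroring the registry above."""

    registry: ClassVar[dict[DemoBackend, Callable[..., Executor]]] = {}

    def register_parallel_backend(
        self, kind: DemoBackend, builder: Callable[..., Executor]
    ) -> None:
        self.registry[kind] = builder

    def get_parallel_backend(self, kind: DemoBackend, n_workers: int) -> Executor:
        try:
            # Builders currently receive only ``n_workers``.
            return self.registry[kind](n_workers=n_workers)
        except KeyError:
            msg = f"No registered parallel backend found for kind {kind}."
            raise ValueError(msg) from None


registry = DemoRegistry()
registry.register_parallel_backend(
    DemoBackend.THREADS, lambda n_workers: ThreadPoolExecutor(max_workers=n_workers)
)

executor = registry.get_parallel_backend(DemoBackend.THREADS, n_workers=2)
future = executor.submit(sum, [1, 2, 3])
print(future.result())  # 6
executor.shutdown()
```

Requesting an unregistered kind (here `DemoBackend.CUSTOM`) raises a `ValueError`, matching the behavior of `get_parallel_backend` in the diff.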

src/pytask_parallel/config.py

Lines changed: 10 additions & 2 deletions

```diff
@@ -7,6 +7,7 @@
 
 from pytask import hookimpl
 
+from pytask_parallel import custom
 from pytask_parallel import execute
 from pytask_parallel import processes
 from pytask_parallel import threads
@@ -33,15 +34,22 @@ def pytask_parse_config(config: dict[str, Any]) -> None:
     config["delay"] = 0.1
 
 
-@hookimpl
+@hookimpl(trylast=True)
 def pytask_post_parse(config: dict[str, Any]) -> None:
     """Register the parallel backend if debugging is not enabled."""
     if config["pdb"] or config["trace"] or config["dry_run"]:
         config["n_workers"] = 1
 
-    if config["n_workers"] > 1:
+    # Register parallel execute hook.
+    if config["n_workers"] > 1 or config["parallel_backend"] == ParallelBackend.CUSTOM:
         config["pm"].register(execute)
+
+    # Register parallel backends.
+    if config["n_workers"] > 1:
         if config["parallel_backend"] == ParallelBackend.THREADS:
             config["pm"].register(threads)
         else:
             config["pm"].register(processes)
+
+    if config["parallel_backend"] == ParallelBackend.CUSTOM:
+        config["pm"].register(custom)
```
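To exercise this code path from a user's project, the configuration would select the custom backend and point pytask at the module holding the hook implementation. A rough sketch, where the module path `my_project.hooks` is a made-up example and the `hook_module` key follows the pytask documentation linked in the README:

```toml
[tool.pytask.ini_options]
parallel_backend = "custom"
hook_module = ["my_project.hooks"]
```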

src/pytask_parallel/custom.py

Lines changed: 27 additions & 0 deletions

```diff
@@ -0,0 +1,27 @@
+"""Contains functions for the custom backend."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+from typing import Any
+
+from pytask import PTask
+from pytask import Session
+from pytask import hookimpl
+
+from pytask_parallel.utils import create_kwargs_for_task
+
+if TYPE_CHECKING:
+    from concurrent.futures import Future
+
+
+@hookimpl
+def pytask_execute_task(session: Session, task: PTask) -> Future[Any]:
+    """Execute a task.
+
+    The task function and its keyword arguments are submitted to the executor
+    directly; pickling, if needed, is left to the custom executor.
+
+    """
+    kwargs = create_kwargs_for_task(task)
+    return session.config["_parallel_executor"].submit(task.function, **kwargs)
```
