Enable @coiled.function. #97

Merged 7 commits on Apr 13, 2024
103 changes: 103 additions & 0 deletions docs/source/coiled.md
@@ -0,0 +1,103 @@
# coiled

```{caution}
Currently, the coiled backend can only be used if your workflow code is organized in a
package due to how pytask imports your code and dask serializes task functions
([issue](https://github.com/dask/distributed/issues/8607)).
```

[coiled](https://www.coiled.io/) is a product built on top of dask that eases the
deployment of your workflow to many cloud providers like AWS, GCP, and Azure.

Note that coiled is a paid service. They offer a
[free monthly tier](https://www.coiled.io/pricing) where you only pay the costs of your
cloud provider, and you can get started without a credit card.

They provide the following benefits, which are especially helpful to people who are not
familiar with cloud providers or remote computing.

- coiled manages your resources by spawning workers when you need them and shutting them
  down when they are idle.
- [Synchronization](https://docs.coiled.io/user_guide/software/sync.html) of your local
environment to remote workers.
- [Adaptive scaling](https://docs.dask.org/en/latest/adaptive.html) if your workflow
takes a long time to finish.

There are two ways to use coiled with pytask and pytask-parallel.

1. Run individual tasks in the cloud.
1. Run your whole workflow in the cloud.

Both approaches are explained below after the setup.

## Setup

Follow coiled's
[short four-step process](https://docs.coiled.io/user_guide/setup/index.html) to set up
your local environment and configure your cloud provider.

## Running individual tasks

In most projects, there are just a couple of tasks that require a lot of resources and
that you would like to run on a virtual machine in the cloud.

With coiled's
[serverless functions](https://docs.coiled.io/user_guide/usage/functions/index.html),
you can define the hardware and software environment for your task. Just apply the
{func}`@coiled.function <coiled.function>` decorator to the task function.

```{literalinclude} ../../docs_src/coiled/coiled_functions.py
```

To execute the workflow, you need to turn on parallelization by requesting two or more
workers or specifying one of the parallel backends. Otherwise, the decorated task is run
locally.

```console
pytask -n 2
pytask --parallel-backend loky
```

When you apply the {func}`@task <pytask.task>` decorator to the task, make sure the
{func}`@coiled.function <coiled.function>` decorator is applied first, i.e., is closer
to the function. Otherwise, it will be ignored. Add more arguments to the decorator to
configure the hardware and software environment.

```{literalinclude} ../../docs_src/coiled/coiled_functions_task.py
```
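
A minimal sketch of the ordering rule (hypothetical task names, no hardware arguments):

```python
import coiled
from pytask import task


# Works: @coiled.function is closer to the function, so pytask collects the
# coiled-wrapped function and executes it remotely.
@task
@coiled.function()
def task_remote() -> None: ...


# Ignored: @coiled.function wraps the object returned by @task, so the
# decorator has no effect and the task runs locally.
@coiled.function()
@task
def task_local() -> None: ...
```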

```{seealso}
Serverless functions are more thoroughly explained in
[coiled's guide](https://docs.coiled.io/user_guide/usage/functions/index.html).
```

(coiled-clusters)=

## Running a cluster

It is also possible to launch a cluster and run each task in a worker provided by
coiled. Usually, this is not necessary, and you are better off using coiled's serverless
functions.

If you want to launch a cluster managed by coiled, register a function that builds an
executor using {class}`coiled.Cluster`.

```python
import coiled
from pytask_parallel import ParallelBackend
from pytask_parallel import registry
from concurrent.futures import Executor


def _build_coiled_executor(n_workers: int) -> Executor:
    # Launch a coiled cluster with the requested number of workers and expose
    # it through the executor interface that pytask-parallel expects.
    return coiled.Cluster(n_workers=n_workers).get_client().get_executor()


registry.register_parallel_backend(ParallelBackend.CUSTOM, _build_coiled_executor)
```

Then, execute your workflow with

```console
pytask --parallel-backend custom
```
1 change: 1 addition & 0 deletions docs/source/conf.py
@@ -87,6 +87,7 @@
"coiled": ("https://docs.coiled.io/", None),
"dask": ("https://docs.dask.org/en/stable/", None),
"distributed": ("https://distributed.dask.org/en/stable/", None),
"pytask": ("https://pytask-dev.readthedocs.io/en/stable/", None),
"python": ("https://docs.python.org/3.10", None),
}

40 changes: 8 additions & 32 deletions docs/source/custom_executors.md
@@ -15,43 +15,19 @@
In some cases, adding a new backend can be as easy as registering a builder function
that receives some arguments (currently only `n_workers`) and returns the instantiated
executor.

```{literalinclude} ../../docs_src/custom_executors.py
```

Given the {class}`pytask_parallel.WorkerType`, pytask applies automatic wrappers around
the task function to collect tracebacks, capture stdout/stderr, and the like. The
`remote` keyword allows pytask to handle local paths automatically for remote clusters.

Now, build the project requesting your custom backend.

```console
pytask --parallel-backend custom
```

```{important}
pytask applies automatic wrappers around the task function, so you do not need to
implement them in your executor.
```
49 changes: 9 additions & 40 deletions docs/source/dask.md
@@ -81,48 +81,17 @@
You can find more information in the documentation for
[`dask.distributed`](https://distributed.dask.org/en/stable/).
```

## Remote

You can learn how to deploy your tasks to a remote dask cluster in [this
guide](https://docs.dask.org/en/stable/deploying.html), which recommends coiled for
deployment to cloud providers.

If you want to run the tasks in your project on a cluster managed by coiled, read
{ref}`this guide <coiled-clusters>`. Otherwise, follow the instructions in
[dask's guide](https://docs.dask.org/en/stable/deploying.html).
1 change: 1 addition & 0 deletions docs/source/index.md
@@ -23,6 +23,7 @@
pytask-parallel allows executing workflows defined with
maxdepth: 1
---
quickstart
coiled
dask
custom_executors
developers_guide
36 changes: 23 additions & 13 deletions docs/source/quickstart.md
@@ -18,7 +18,9 @@
$ conda install -c conda-forge pytask-parallel
When the plugin is only installed and pytask is executed, the tasks are not run in
parallel.

For parallelization with the default backend [loky](https://loky.readthedocs.io/), you
need to launch multiple workers or specify the parallel backend explicitly. Here is how
you launch multiple workers.

`````{tab-set}
````{tab-item} CLI
@@ -45,8 +47,8 @@
n_workers = "auto"
````
`````

To specify the parallel backend, pass the `--parallel-backend` option. The following
command will execute the workflow with one worker and the loky backend.

`````{tab-set}
````{tab-item} CLI
@@ -72,23 +74,32 @@
It is not possible to combine parallelization with debugging. That is why `--pdb` or
`--trace` deactivate parallelization.

If you parallelize the execution of your tasks, do not use `breakpoint()` or
`import pdb; pdb.set_trace()` since both will cause exceptions.
```

### loky

There are multiple backends available. The default is the backend provided by loky,
which is a more robust implementation of {class}`~multiprocessing.pool.Pool` and
{class}`~concurrent.futures.ProcessPoolExecutor`.

```console
pytask --parallel-backend loky
```

A parallel backend with processes is especially suited for CPU-bound tasks as it spawns
workers in new processes to run the tasks.
([Here](https://stackoverflow.com/a/868577/7523785) is an explanation of what CPU- or
IO-bound means.)
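
As a rough illustration (the task names and workloads are hypothetical), a CPU-bound
task keeps a core busy, while an IO-bound one mostly waits:

```python
import time


def task_cpu_bound() -> int:
    # CPU-bound: runtime is dominated by computation, so a process-based
    # backend such as loky speeds things up.
    return sum(i * i for i in range(10_000_000))


def task_io_bound() -> None:
    # IO-bound: runtime is dominated by waiting (network, disk), so a
    # thread-based backend is usually sufficient.
    time.sleep(5)
```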

### coiled

pytask-parallel integrates with coiled, allowing you to run tasks in virtual machines
on AWS, Azure, and GCP. You can decide whether to run only selected tasks or the whole
project in the cloud.
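
A minimal sketch of the per-task option (hypothetical task name, default hardware):

```python
import coiled
from pytask import task


@task
@coiled.function()  # Only this task is executed in the cloud.
def task_example() -> None:
    pass
```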

Read more about coiled in [this guide](coiled.md).

### `concurrent.futures`

@@ -141,11 +152,10 @@
Capturing warnings is not thread-safe. Therefore, warnings cannot be captured reliably
when tasks are parallelized with `--parallel-backend threads`.
```

### dask

dask allows you to run your workflows on many different kinds of clusters, like cloud
clusters and traditional HPC.

Using the default mode, dask will spawn multiple local workers to process the tasks.
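
For example, selecting the dask backend on the command line (assuming the backend name
`dask`, as used elsewhere in these docs):

```console
pytask --parallel-backend dask
```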

6 changes: 6 additions & 0 deletions docs_src/coiled/coiled_functions.py
@@ -0,0 +1,6 @@
import coiled


@coiled.function()
def task_example() -> None:
pass
12 changes: 12 additions & 0 deletions docs_src/coiled/coiled_functions_task.py
@@ -0,0 +1,12 @@
import coiled
from pytask import task


@task
@coiled.function(
region="eu-central-1", # Run the task close to you.
memory="512 GB", # Use a lot of memory.
cpu=128, # Use a lot of CPU.
vm_type="p3.2xlarge", # Run a GPU instance.
)
def task_example() -> None: ...
15 changes: 15 additions & 0 deletions docs_src/custom_executors.py
@@ -0,0 +1,15 @@
from concurrent.futures import Executor

from my_project.executor import CustomExecutor
from pytask_parallel import ParallelBackend
from pytask_parallel import WorkerType
from pytask_parallel import registry


def build_custom_executor(n_workers: int) -> Executor:
return CustomExecutor(
max_workers=n_workers, worker_type=WorkerType.PROCESSES, remote=False
)


registry.register_parallel_backend(ParallelBackend.CUSTOM, build_custom_executor)
3 changes: 2 additions & 1 deletion src/pytask_parallel/__init__.py
@@ -3,6 +3,7 @@
from __future__ import annotations

from pytask_parallel.backends import ParallelBackend
from pytask_parallel.backends import WorkerType
from pytask_parallel.backends import registry

try:
@@ -13,4 +14,4 @@
__version__ = "unknown"


__all__ = ["ParallelBackend", "__version__", "registry", "WorkerType"]
2 changes: 1 addition & 1 deletion src/pytask_parallel/config.py
@@ -47,5 +47,5 @@
def pytask_post_parse(config: dict[str, Any]) -> None:
return

# Register parallel execute and logging hook.
config["pm"].register(logging)
config["pm"].register(execute)
config["pm"].register(logging)