From 7014d920b7ef77c4c2d6b4ca2bfcded3a2fdcea4 Mon Sep 17 00:00:00 2001
From: Tobias Raabe <raabe@posteo.de>
Date: Sat, 13 Apr 2024 00:57:46 +0200
Subject: [PATCH 1/5] Fix.

---
 .pre-commit-config.yaml                  |  2 +-
 docs/source/coiled.md                    | 94 ++++++++++++++++++++++++
 docs/source/dask.md                      | 42 +----------
 docs/source/index.md                     |  1 +
 docs_src/coiled/coiled_functions.py      |  6 ++
 docs_src/coiled/coiled_functions_task.py |  7 ++
 pyproject.toml                           | 63 +++++++++-------
 src/pytask_parallel/config.py            |  2 +-
 src/pytask_parallel/execute.py           | 25 +++++++
 src/pytask_parallel/utils.py             | 12 +++
 src/pytask_parallel/wrappers.py          | 10 ++-
 11 files changed, 196 insertions(+), 68 deletions(-)
 create mode 100644 docs/source/coiled.md
 create mode 100644 docs_src/coiled/coiled_functions.py
 create mode 100644 docs_src/coiled/coiled_functions_task.py

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 986cb85..acca674 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -82,7 +82,7 @@ repos:
     hooks:
       - id: check-manifest
         args: [--no-build-isolation]
-        additional_dependencies: [setuptools-scm, toml, wheel]
+        additional_dependencies: [hatchling, hatch-vcs]
   - repo: meta
     hooks:
       - id: check-hooks-apply
diff --git a/docs/source/coiled.md b/docs/source/coiled.md
new file mode 100644
index 0000000..40404ed
--- /dev/null
+++ b/docs/source/coiled.md
@@ -0,0 +1,94 @@
+# coiled
+
+[coiled](https://www.coiled.io/) is a product built on top of dask that eases the
+deployment of your workflow to many cloud providers like AWS, GCP, and Azure.
+
+Note that coiled is a paid service. They offer a
+[free monthly tier](https://www.coiled.io/pricing) where you only need to pay the costs
+for your cloud provider, and you can get started without a credit card.
+
+They provide the following benefits, which are especially helpful to people who are not
+familiar with cloud providers or remote computing.
+
+- coiled manages your resources by spawning workers if you need them and shutting them
+  down if they are idle.
+- Synchronization of your local environment to remote workers.
+
+There are two ways to use coiled with pytask and pytask-parallel.
+
+1. Run individual tasks in the cloud.
+1. Run your whole workflow in the cloud.
+
+Both approaches are explained below after the setup.
+
+## Setup
+
+Follow coiled's
+[short four-step process](https://docs.coiled.io/user_guide/setup/index.html) to set up
+your local environment and configure your cloud provider.
+
+## Running individual tasks
+
+In most projects there are just a couple of tasks that require a lot of resources and
+that you would like to run in a virtual machine in the cloud.
+
+With coiled's
+[serverless functions](https://docs.coiled.io/user_guide/usage/functions/index.html),
+you can define the hardware and software environment for your task. Just decorate the
+task function with a {func}`@coiled.function <coiled.function>` decorator.
+
+```{literalinclude} ../../docs_src/coiled/coiled_functions.py
+```
+
+To execute the workflow, you need to turn on parallelization by requesting two or more
+workers or specifying one of the parallel backends. Otherwise, the decorated task is run
+locally.
+
+```console
+pytask -n 2
+pytask --parallel-backend loky
+```
+
+When you apply the {func}`@task <pytask.task>` decorator to the task, make sure the
+`@coiled.function` decorator is applied first, or is closer to the function. Otherwise,
+it will be ignored.
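+
+For contrast, here is a sketch of the ordering that does not work, with a hypothetical
+task body. With {func}`@coiled.function <coiled.function>` applied last, the bare
+function is registered with pytask before coiled can wrap it.
+
+```python
+import coiled
+from pytask import task
+
+
+@coiled.function()  # Applied last, not closest to the function, so it is ignored.
+@task
+def task_example() -> None: ...
+```
+
+The correct ordering looks like this.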
+
+```{literalinclude} ../../docs_src/coiled/coiled_functions_task.py
+```
+
+```{seealso}
+Serverless functions are more thoroughly explained in
+[coiled's guide](https://docs.coiled.io/user_guide/usage/functions/index.html).
+```
+
+(coiled-clusters)=
+
+## Running a cluster
+
+So, how can you run your pytask workflow on a cloud infrastructure with coiled?
+
+1. Follow their [guide on getting
+   started](https://docs.coiled.io/user_guide/setup/index.html) by creating a coiled
+   account and syncing it with your cloud provider.
+
+1. Register a function that builds an executor using {class}`coiled.Cluster`.
+
+   ```python
+   import coiled
+   from pytask_parallel import ParallelBackend
+   from pytask_parallel import registry
+   from concurrent.futures import Executor
+
+
+   def _build_coiled_executor(n_workers: int) -> Executor:
+       return coiled.Cluster(n_workers=n_workers).get_client().get_executor()
+
+
+   registry.register_parallel_backend(ParallelBackend.CUSTOM, _build_coiled_executor)
+   ```
+
+1. Execute your workflow with
+
+   ```console
+   pytask --parallel-backend custom
+   ```
diff --git a/docs/source/dask.md b/docs/source/dask.md
index e0b2ac1..26f0a98 100644
--- a/docs/source/dask.md
+++ b/docs/source/dask.md
@@ -86,43 +86,5 @@ You can find more information in the documentation for
 [`dask.distributed`](https://distributed.dask.org/en/stable/).
 ```
 
 [coiled](https://www.coiled.io/) is a product built on top of dask that eases the
 deployment of your workflow to many cloud providers like AWS, GCP, and Azure.
 
-They offer a [free monthly tier](https://www.coiled.io/pricing) where you only
-need to pay the costs for your cloud provider and you can get started without a credit
-card.
-
-Furthermore, they offer the following benefits which are especially helpful to people
-who are not familiar with cloud providers or remote computing.
-
-- A [four step short process](https://docs.coiled.io/user_guide/setup/index.html) to set
-  up your local environment and configure your cloud provider.
-- coiled manages your resources by spawning workers if you need them and shutting them
-  down if they are idle.
-- Synchronization of your local environment to remote workers.
-
-So, how can you run your pytask workflow on a cloud infrastructure with coiled?
-
-1. Follow their [guide on getting
-   started](https://docs.coiled.io/user_guide/setup/index.html) by creating a coiled
-   account and syncing it with your cloud provider.
-
-1. Register a function that builds an executor using {class}`coiled.Cluster`.
-
-   ```python
-   import coiled
-   from pytask_parallel import ParallelBackend
-   from pytask_parallel import registry
-   from concurrent.futures import Executor
-
-
-   def _build_coiled_executor(n_workers: int) -> Executor:
-       return coiled.Cluster(n_workers=n_workers).get_client().get_executor()
-
-
-   registry.register_parallel_backend(ParallelBackend.CUSTOM, _build_coiled_executor)
-   ```
-
-1. Execute your workflow with
-
-   ```console
-   pytask --parallel-backend custom
-   ```
+If you want to run the tasks in your project on a cluster managed by coiled, read
+{ref}`this guide <coiled-clusters>`.
diff --git a/docs/source/index.md b/docs/source/index.md
index 6209512..d362a0f 100644
--- a/docs/source/index.md
+++ b/docs/source/index.md
@@ -23,6 +23,7 @@ pytask-parallel allows to execute workflows defined with
 maxdepth: 1
 ---
 quickstart
+coiled
 dask
 custom_executors
 developers_guide
diff --git a/docs_src/coiled/coiled_functions.py b/docs_src/coiled/coiled_functions.py
new file mode 100644
index 0000000..ca1a375
--- /dev/null
+++ b/docs_src/coiled/coiled_functions.py
@@ -0,0 +1,6 @@
+import coiled
+
+
+@coiled.function()
+def task_example() -> None:
+    pass
diff --git a/docs_src/coiled/coiled_functions_task.py b/docs_src/coiled/coiled_functions_task.py
new file mode 100644
index 0000000..cc4096a
--- /dev/null
+++ b/docs_src/coiled/coiled_functions_task.py
@@ -0,0 +1,7 @@
+import coiled
+from pytask import task
+
+
+@task
+@coiled.function()
+def task_example() -> None: ...
diff --git a/pyproject.toml b/pyproject.toml
index d28f4ea..121ceb3 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,7 +1,3 @@
-[build-system]
-build-backend = "setuptools.build_meta"
-requires = ["setuptools>=64", "setuptools_scm[toml]>=8"]
-
 [project]
 name = "pytask_parallel"
 description = "Parallelize the execution of tasks with pytask."
@@ -20,7 +16,8 @@ dependencies = [
     "loky",
     "pluggy>=1.0.0",
     "pytask>=0.4.5",
-    "rich"
+    "rich",
+    "project @ file:///home/tobia/git/pytask-workspace/pytask-parallel/example",
 ]
 dynamic = ["version"]
 
@@ -29,19 +26,20 @@ name = "Tobias Raabe"
 email = "raabe@posteo.de"
 
 [project.optional-dependencies]
+coiled = ["coiled>=0.9.4"]
 dask = ["dask[complete]", "distributed"]
 docs = [
-    "furo",
-    "ipython",
-    "matplotlib",
-    "myst-parser",
-    "nbsphinx",
-    "sphinx",
-    "sphinx-click",
-    "sphinx-copybutton",
-    "sphinx-design>=0.3",
-    "sphinx-toolbox",
-    "sphinxext-opengraph",
+  "furo",
+  "ipython",
+  "matplotlib",
+  "myst-parser",
+  "nbsphinx",
+  "sphinx",
+  "sphinx-click",
+  "sphinx-copybutton",
+  "sphinx-design>=0.3",
+  "sphinx-toolbox",
+  "sphinxext-opengraph",
 ]
 test = [
     "pytask-parallel[all]",
@@ -49,6 +47,9 @@ test = [
     "pytest",
     "pytest-cov",
 ]
+aws = [
+  "s3fs>=2024.3.1"
+]
 
 [project.readme]
 file = "README.md"
@@ -76,15 +77,28 @@ ignore = ["src/pytask_parallel/_version.py"]
 [project.entry-points.pytask]
 pytask_parallel = "pytask_parallel.plugin"
 
-[tool.setuptools.package-dir]
-"" = "src"
+[build-system]
+requires = ["hatchling", "hatch_vcs"]
+build-backend = "hatchling.build"
+
+[tool.rye]
+managed = true
+
+[tool.rye.scripts]
+clean-docs = { cmd = "rm -rf docs/build" }
+build-docs = { cmd = "sphinx-build -b html docs/source docs/build" }
+
+[tool.hatch.metadata]
+allow-direct-references = true
+
+[tool.hatch.build.hooks.vcs]
+version-file = "src/pytask_parallel/_version.py"
 
-[tool.setuptools.packages.find]
-where = ["src"]
-namespaces = false
+[tool.hatch.build.targets.wheel]
+packages = ["src/pytask_parallel"]
 
-[tool.setuptools_scm]
-version_file = "src/pytask_parallel/_version.py"
+[tool.hatch.version]
+source = "vcs"
 
 [tool.mypy]
 files = ["src", "tests"]
@@ -108,9 +122,7 @@ unsafe-fixes = true
 
 [tool.ruff.lint]
 extend-ignore = [
-    # Others.
     "ANN101",  # type annotating self
-    "ANN102",  # type annotating cls
     "ANN401",  # flake8-annotate typing.Any
     "COM812",  # Comply with ruff-format.
     "ISC001",  # Comply with ruff-format.
@@ -120,6 +132,7 @@ select = ["ALL"]
 
 [tool.ruff.lint.per-file-ignores]
 "tests/*" = ["D", "ANN", "PLR2004", "S101"]
 "docs/source/conf.py" = ["INP001"]
+"docs_src/*" = ["ARG001", "D", "INP001", "S301"]
 
 [tool.ruff.lint.isort]
 force-single-line = true
diff --git a/src/pytask_parallel/config.py b/src/pytask_parallel/config.py
index 618cc38..0b6daf6 100644
--- a/src/pytask_parallel/config.py
+++ b/src/pytask_parallel/config.py
@@ -47,5 +47,5 @@ def pytask_post_parse(config: dict[str, Any]) -> None:
         return
 
     # Register parallel execute and logging hook.
-    config["pm"].register(logging)
     config["pm"].register(execute)
+    config["pm"].register(logging)
diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py
index b502755..f263061 100644
--- a/src/pytask_parallel/execute.py
+++ b/src/pytask_parallel/execute.py
@@ -26,6 +26,7 @@
 from pytask_parallel.backends import registry
 from pytask_parallel.utils import create_kwargs_for_task
 from pytask_parallel.utils import get_module
+from pytask_parallel.utils import is_coiled_function
 from pytask_parallel.utils import parse_future_result
 
 if TYPE_CHECKING:
@@ -176,6 +177,30 @@ def pytask_execute_task(session: Session, task: PTask) -> Future[WrapperResult]:
 
     kwargs = create_kwargs_for_task(task, remote=remote)
 
+    if is_coiled_function(task):
+        # Prevent circular import for coiled backend.
+        from pytask_parallel.wrappers import rewrap_task_with_coiled_function
+
+        wrapper_func = rewrap_task_with_coiled_function(task)
+
+        # Task modules are dynamically loaded and added to `sys.modules`. Thus,
+        # cloudpickle believes the module of the task function is also importable in the
+        # child process. We have to register the module as dynamic again, so that
+        # cloudpickle will pickle it with the function. See cloudpickle#417, pytask#373
+        # and pytask#374.
+        task_module = get_module(task.function, getattr(task, "path", None))
+        cloudpickle.register_pickle_by_value(task_module)
+
+        return wrapper_func.submit(
+            task=task,
+            console_options=console.options,
+            kwargs=kwargs,
+            remote=True,
+            session_filterwarnings=session.config["filterwarnings"],
+            show_locals=session.config["show_locals"],
+            task_filterwarnings=get_marks(task, "filterwarnings"),
+        )
+
     if worker_type == WorkerType.PROCESSES:
         # Prevent circular import for loky backend.
         from pytask_parallel.wrappers import wrap_task_in_process
diff --git a/src/pytask_parallel/utils.py b/src/pytask_parallel/utils.py
index 0d54d7a..6cb044c 100644
--- a/src/pytask_parallel/utils.py
+++ b/src/pytask_parallel/utils.py
@@ -28,12 +28,19 @@
     from pytask_parallel.wrappers import WrapperResult
 
 
+try:
+    from coiled.function import Function as CoiledFunction
+except ImportError:
+
+    class CoiledFunction: ...  # type: ignore[no-redef]
+
+
 __all__ = [
     "create_kwargs_for_task",
     "get_module",
     "parse_future_result",
     "is_local_path",
+    "is_coiled_function",
 ]
@@ -165,3 +172,8 @@ def get_module(func: Callable[..., Any], path: Path | None) -> ModuleType:
 def is_local_path(path: Path) -> bool:
     """Check if a path is local."""
     return isinstance(path, (FilePath, PosixPath, WindowsPath))
+
+
+def is_coiled_function(task: PTask) -> bool:
+    """Check if the task function is a coiled function."""
+    return "coiled_kwargs" in task.attributes
diff --git a/src/pytask_parallel/wrappers.py b/src/pytask_parallel/wrappers.py
index 185f819..b7f8a33 100644
--- a/src/pytask_parallel/wrappers.py
+++ b/src/pytask_parallel/wrappers.py
@@ -2,6 +2,7 @@
 
 from __future__ import annotations
 
+import functools
 import sys
 import warnings
 from contextlib import redirect_stderr
@@ -24,6 +25,7 @@
 from pytask.tree_util import tree_map_with_path
 from pytask.tree_util import tree_structure
 
+from pytask_parallel.utils import CoiledFunction
 from pytask_parallel.utils import is_local_path
 
 if TYPE_CHECKING:
@@ -80,7 +82,7 @@ def wrap_task_in_process(  # noqa: PLR0913
     show_locals: bool,
     task_filterwarnings: tuple[Mark, ...],
 ) -> WrapperResult:
-    """Unserialize and execute task.
+    """Execute a task in a spawned process.
 
     This function receives bytes and unpickles them to a task which is then executed in
     a spawned process or thread.
@@ -149,6 +151,12 @@ def wrap_task_in_process(  # noqa: PLR0913
     )
 
 
+def rewrap_task_with_coiled_function(task: PTask) -> CoiledFunction:
+    return functools.wraps(wrap_task_in_process)(
+        CoiledFunction(wrap_task_in_process, **task.attributes["coiled_kwargs"])
+    )
+
+
 def _raise_exception_on_breakpoint(*args: Any, **kwargs: Any) -> None:  # noqa: ARG001
     msg = (
         "You cannot use 'breakpoint()' or 'pdb.set_trace()' while parallelizing the

From d458d64be2cab38259acba976a40cef1744151ce Mon Sep 17 00:00:00 2001
From: Tobias Raabe <raabe@posteo.de>
Date: Sat, 13 Apr 2024 01:50:23 +0200
Subject: [PATCH 2/5] remove package.

---
 pyproject.toml | 1 -
 1 file changed, 1 deletion(-)

diff --git a/pyproject.toml b/pyproject.toml
index 121ceb3..6d5d14f 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -17,7 +17,6 @@ dependencies = [
     "pluggy>=1.0.0",
     "pytask>=0.4.5",
     "rich",
-    "project @ file:///home/tobia/git/pytask-workspace/pytask-parallel/example",
 ]
 dynamic = ["version"]

From 4bdc9d9f469afe267c0f3468c75e1851717b7d0b Mon Sep 17 00:00:00 2001
From: Tobias Raabe <raabe@posteo.de>
Date: Sat, 13 Apr 2024 08:59:05 +0200
Subject: [PATCH 3/5] fix.
---
 .pre-commit-config.yaml |  2 +-
 MANIFEST.in             |  1 +
 pyproject.toml          | 62 +++++++++++++++++------------------------
 3 files changed, 27 insertions(+), 38 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index acca674..986cb85 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -82,7 +82,7 @@ repos:
     hooks:
       - id: check-manifest
        args: [--no-build-isolation]
-        additional_dependencies: [hatchling, hatch-vcs]
+        additional_dependencies: [setuptools-scm, toml, wheel]
   - repo: meta
     hooks:
       - id: check-hooks-apply
diff --git a/MANIFEST.in b/MANIFEST.in
index b3ea241..55834ed 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -1,4 +1,5 @@
 prune docs
+prune docs_src
 prune tests
 
 exclude *.md
diff --git a/pyproject.toml b/pyproject.toml
index 6d5d14f..d28f4ea 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,3 +1,7 @@
+[build-system]
+build-backend = "setuptools.build_meta"
+requires = ["setuptools>=64", "setuptools_scm[toml]>=8"]
+
 [project]
 name = "pytask_parallel"
 description = "Parallelize the execution of tasks with pytask."
@@ -16,7 +20,7 @@ dependencies = [
     "loky",
     "pluggy>=1.0.0",
     "pytask>=0.4.5",
-    "rich",
+    "rich"
 ]
 dynamic = ["version"]
 
@@ -25,20 +29,19 @@ name = "Tobias Raabe"
 email = "raabe@posteo.de"
 
 [project.optional-dependencies]
-coiled = ["coiled>=0.9.4"]
 dask = ["dask[complete]", "distributed"]
 docs = [
-  "furo",
-  "ipython",
-  "matplotlib",
-  "myst-parser",
-  "nbsphinx",
-  "sphinx",
-  "sphinx-click",
-  "sphinx-copybutton",
-  "sphinx-design>=0.3",
-  "sphinx-toolbox",
-  "sphinxext-opengraph",
+    "furo",
+    "ipython",
+    "matplotlib",
+    "myst-parser",
+    "nbsphinx",
+    "sphinx",
+    "sphinx-click",
+    "sphinx-copybutton",
+    "sphinx-design>=0.3",
+    "sphinx-toolbox",
+    "sphinxext-opengraph",
 ]
 test = [
     "pytask-parallel[all]",
@@ -46,9 +49,6 @@ test = [
     "pytest",
     "pytest-cov",
 ]
-aws = [
-  "s3fs>=2024.3.1"
-]
 
 [project.readme]
 file = "README.md"
@@ -76,28 +76,15 @@ ignore = ["src/pytask_parallel/_version.py"]
 [project.entry-points.pytask]
 pytask_parallel = "pytask_parallel.plugin"
 
-[build-system]
-requires = ["hatchling", "hatch_vcs"]
-build-backend = "hatchling.build"
-
-[tool.rye]
-managed = true
-
-[tool.rye.scripts]
-clean-docs = { cmd = "rm -rf docs/build" }
-build-docs = { cmd = "sphinx-build -b html docs/source docs/build" }
-
-[tool.hatch.metadata]
-allow-direct-references = true
-
-[tool.hatch.build.hooks.vcs]
-version-file = "src/pytask_parallel/_version.py"
+[tool.setuptools.package-dir]
+"" = "src"
 
-[tool.hatch.build.targets.wheel]
-packages = ["src/pytask_parallel"]
+[tool.setuptools.packages.find]
+where = ["src"]
+namespaces = false
 
-[tool.hatch.version]
-source = "vcs"
+[tool.setuptools_scm]
+version_file = "src/pytask_parallel/_version.py"
 
 [tool.mypy]
 files = ["src", "tests"]
@@ -121,7 +108,9 @@ unsafe-fixes = true
 
 [tool.ruff.lint]
 extend-ignore = [
+    # Others.
     "ANN101",  # type annotating self
+    "ANN102",  # type annotating cls
     "ANN401",  # flake8-annotate typing.Any
     "COM812",  # Comply with ruff-format.
     "ISC001",  # Comply with ruff-format.
@@ -131,7 +120,6 @@ select = ["ALL"]
 
 [tool.ruff.lint.per-file-ignores]
 "tests/*" = ["D", "ANN", "PLR2004", "S101"]
 "docs/source/conf.py" = ["INP001"]
-"docs_src/*" = ["ARG001", "D", "INP001", "S301"]
 
 [tool.ruff.lint.isort]
 force-single-line = true

From f0e69359852ce188b3c77ae81a77af2297e2c506 Mon Sep 17 00:00:00 2001
From: Tobias Raabe <raabe@posteo.de>
Date: Sat, 13 Apr 2024 09:44:17 +0200
Subject: [PATCH 4/5] Correct docs.
---
 docs/source/coiled.md                    | 53 ++++++++++++++----------
 docs/source/conf.py                      |  1 +
 docs/source/custom_executors.md          | 40 ++++--------------
 docs/source/dask.md                      |  9 +++-
 docs/source/quickstart.md                | 36 ++++++++++------
 docs_src/coiled/coiled_functions_task.py |  7 +++-
 docs_src/custom_executors.py             | 15 +++++++
 src/pytask_parallel/__init__.py          |  3 +-
 8 files changed, 94 insertions(+), 70 deletions(-)
 create mode 100644 docs_src/custom_executors.py

diff --git a/docs/source/coiled.md b/docs/source/coiled.md
index 40404ed..d40b475 100644
--- a/docs/source/coiled.md
+++ b/docs/source/coiled.md
@@ -1,5 +1,11 @@
 # coiled
 
+```{caution}
+Currently, the coiled backend can only be used if your workflow code is organized in a
+package due to how pytask imports your code and dask serializes task functions
+([issue](https://github.com/dask/distributed/issues/8607)).
+```
+
 [coiled](https://www.coiled.io/) is a product built on top of dask that eases the
 deployment of your workflow to many cloud providers like AWS, GCP, and Azure.
 
@@ -12,7 +18,10 @@ familiar with cloud providers or remote computing.
 
 - coiled manages your resources by spawning workers if you need them and shutting them
   down if they are idle.
-- Synchronization of your local environment to remote workers.
+- [Synchronization](https://docs.coiled.io/user_guide/software/sync.html) of your local
+  environment to remote workers.
+- [Adaptive scaling](https://docs.dask.org/en/latest/adaptive.html) if your workflow
+  takes a long time to finish.
 
 There are two ways to use coiled with pytask and pytask-parallel.
 
@@ -50,8 +59,9 @@ pytask --parallel-backend loky
 ```
 
 When you apply the {func}`@task <pytask.task>` decorator to the task, make sure the
-`@coiled.function` decorator is applied first, or is closer to the function. Otherwise,
-it will be ignored.
+{func}`@coiled.function <coiled.function>` decorator is applied first, or is closer to
+the function. Otherwise, it will be ignored. Add more arguments to the decorator to
+configure the hardware and software environment.
 
 ```{literalinclude} ../../docs_src/coiled/coiled_functions_task.py
 ```
@@ -65,30 +75,29 @@ Serverless functions are more thoroughly explained in
 [coiled's guide](https://docs.coiled.io/user_guide/usage/functions/index.html).
 ```
 
 (coiled-clusters)=
 
 ## Running a cluster
 
-So, how can you run your pytask workflow on a cloud infrastructure with coiled?
-
-1. Follow their [guide on getting
-   started](https://docs.coiled.io/user_guide/setup/index.html) by creating a coiled
-   account and syncing it with your cloud provider.
+It is also possible to launch a cluster and run each task in a worker provided by
+coiled. Usually, this is not necessary, and you are better off using coiled's
+serverless functions.
 
-1. Register a function that builds an executor using {class}`coiled.Cluster`.
+If you want to launch a cluster managed by coiled, register a function that builds an
+executor using {class}`coiled.Cluster`.
+
+```python
+import coiled
+from pytask_parallel import ParallelBackend
+from pytask_parallel import registry
+from concurrent.futures import Executor
+
+
+def _build_coiled_executor(n_workers: int) -> Executor:
+    return coiled.Cluster(n_workers=n_workers).get_client().get_executor()
+
+
+registry.register_parallel_backend(ParallelBackend.CUSTOM, _build_coiled_executor)
+```
+
+Then, execute your workflow with
+
+```console
+pytask --parallel-backend custom
+```
diff --git a/docs/source/conf.py b/docs/source/conf.py
index f537001..c8d69ce 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -87,6 +87,7 @@
     "coiled": ("https://docs.coiled.io/", None),
     "dask": ("https://docs.dask.org/en/stable/", None),
     "distributed": ("https://distributed.dask.org/en/stable/", None),
+    "pytask": ("https://pytask-dev.readthedocs.io/en/stable/", None),
     "python": ("https://docs.python.org/3.10", None),
 }
diff --git a/docs/source/custom_executors.md b/docs/source/custom_executors.md
index f44b377..715da68 100644
--- a/docs/source/custom_executors.md
+++ b/docs/source/custom_executors.md
@@ -15,43 +15,19 @@ In some cases, adding a new backend can be as easy as registering a builder func
 that receives some arguments (currently only `n_workers`) and returns the instantiated
 executor.
 
-```python
-from concurrent.futures import Executor
-from my_project.executor import CustomExecutor
-
-from pytask_parallel import ParallelBackend, registry
-
-
-def build_custom_executor(n_workers: int) -> Executor:
-    return CustomExecutor(max_workers=n_workers)
-
-
-registry.register_parallel_backend(ParallelBackend.CUSTOM, build_custom_executor)
+```{literalinclude} ../../docs_src/custom_executors.py
 ```
 
+Given the {class}`pytask_parallel.WorkerType`, pytask applies automatic wrappers around
+the task function to collect tracebacks, capture stdout and stderr, and the like. The
+`remote` keyword allows pytask to handle local paths automatically for remote clusters.
+
 Now, build the project requesting your custom backend.
 
 ```console
 pytask --parallel-backend custom
 ```
 
-Realistically, it is not the only necessary adjustment for a nice user experience. There
-are two other important things. pytask-parallel does not implement them by default since
-it seems more tightly coupled to your backend.
-
-1. A wrapper for the executed function that captures warnings, catches exceptions and
-   saves products of the task (within the child process!).
-
-   As an example, see
-   [`def _execute_task()`](https://github.com/pytask-dev/pytask-parallel/blob/c441dbb75fa6ab3ab17d8ad5061840c802dc1c41/src/pytask_parallel/processes.py#L91-L155)
-   that does all that for the processes and loky backend.
-
-1. To apply the wrapper, you need to write a custom hook implementation for
-   `def pytask_execute_task()`. See
-   [`def pytask_execute_task()`](https://github.com/pytask-dev/pytask-parallel/blob/c441dbb75fa6ab3ab17d8ad5061840c802dc1c41/src/pytask_parallel/processes.py#L41-L65)
-   for an example. Use the
-   [`hook_module`](https://pytask-dev.readthedocs.io/en/stable/how_to_guides/extending_pytask.html#using-hook-module-and-hook-module)
-   configuration value to register your implementation.
-
-Another example of an implementation can be found as a
-[test](https://github.com/pytask-dev/pytask-parallel/blob/c441dbb75fa6ab3ab17d8ad5061840c802dc1c41/tests/test_backends.py#L35-L78).
+```{important}
+pytask applies automatic wrappers around the task function, so a custom executor does
+not need to capture warnings, catch exceptions, or save task products itself.
+```
diff --git a/docs/source/dask.md b/docs/source/dask.md
index 26f0a98..99eabea 100644
--- a/docs/source/dask.md
+++ b/docs/source/dask.md
@@ -81,10 +81,17 @@ You can find more information in the documentation for
 [`dask.distributed`](https://distributed.dask.org/en/stable/).
 ```
 
-## Remote - Using cloud providers with coiled
+## Remote
+
+You can learn how to deploy your tasks to a remote dask cluster in [this
+guide](https://docs.dask.org/en/stable/deploying.html). They recommend using coiled for
+deployment to cloud providers.
 
 [coiled](https://www.coiled.io/) is a product built on top of dask that eases the
 deployment of your workflow to many cloud providers like AWS, GCP, and Azure.
 
 If you want to run the tasks in your project on a cluster managed by coiled, read
 {ref}`this guide <coiled-clusters>`.
+
+Otherwise, follow the instructions in [dask's
+guide](https://docs.dask.org/en/stable/deploying.html).
diff --git a/docs/source/quickstart.md b/docs/source/quickstart.md
index b780fcc..dd583cb 100644
--- a/docs/source/quickstart.md
+++ b/docs/source/quickstart.md
@@ -18,7 +18,9 @@ $ conda install -c conda-forge pytask-parallel
 When the plugin is only installed and pytask executed, the tasks are not run in
 parallel.
 
-For parallelization with the default backend [loky](https://loky.readthedocs.io/), you need to launch multiple workers.
+For parallelization with the default backend [loky](https://loky.readthedocs.io/), you
+need to launch multiple workers or specify the parallel backend explicitly. Here is how
+you launch multiple workers.
 
 `````{tab-set}
 ````{tab-item} CLI
@@ -45,8 +47,8 @@ n_workers = "auto"
 ````
 `````
 
-To use a different backend, pass the `--parallel-backend` option. The following command
-will execute the workflow with one worker and the loky backend.
+To specify the parallel backend, pass the `--parallel-backend` option. The following
+command will execute the workflow with one worker and the loky backend.
 
 `````{tab-set}
 ````{tab-item} CLI
@@ -72,23 +74,32 @@ parallel_backend = "loky"
 It is not possible to combine parallelization with debugging. That is why `--pdb` or
 `--trace` deactivate parallelization.
 
-If you parallelize the execution of your tasks using two or more workers, do not use
-`breakpoint()` or `import pdb; pdb.set_trace()` since both will cause exceptions.
+If you parallelize the execution of your tasks, do not use `breakpoint()` or
+`import pdb; pdb.set_trace()` since both will cause exceptions.
 ```
 
 ### loky
 
 There are multiple backends available. The default is the backend provided by loky which
-aims to be a more robust implementation of {class}`~multiprocessing.pool.Pool` and in
+is a more robust implementation of {class}`~multiprocessing.pool.Pool` and
 {class}`~concurrent.futures.ProcessPoolExecutor`.
 
 ```console
 pytask --parallel-backend loky
 ```
 
-As it spawns workers in new processes to run the tasks, it is especially suited for
-CPU-bound tasks. ([Here](https://stackoverflow.com/a/868577/7523785) is an
-explanation of what CPU- or IO-bound means.)
+A parallel backend with processes is especially suited for CPU-bound tasks as it spawns
+workers in new processes to run the tasks.
+([Here](https://stackoverflow.com/a/868577/7523785) is an explanation of what CPU- or
+IO-bound means.)
+
+### coiled
+
+pytask-parallel integrates with coiled, allowing you to run tasks in virtual machines
+on AWS, Azure, and GCP. You can decide whether to run only some selected tasks or the
+whole project in the cloud.
+
+Read more about coiled in [this guide](coiled.md).
 
 ### `concurrent.futures`
 
@@ -141,11 +152,10 @@ Capturing warnings is not thread-safe. Therefore, warnings cannot be captured re
 when tasks are parallelized with `--parallel-backend threads`.
 ```
 
-### dask + coiled
+### dask
 
-dask and coiled together provide the option to execute your workflow on cloud providers
-like AWS, GCP or Azure. Check out the [dedicated guide](dask.md) if you are interested
-in that.
+dask allows you to run your workflows on many different kinds of clusters, like cloud
+clusters and traditional HPC.
 
 Using the default mode, dask will spawn multiple local workers to process the tasks.
diff --git a/docs_src/coiled/coiled_functions_task.py b/docs_src/coiled/coiled_functions_task.py
index cc4096a..51d2665 100644
--- a/docs_src/coiled/coiled_functions_task.py
+++ b/docs_src/coiled/coiled_functions_task.py
@@ -3,5 +3,10 @@
 
 
 @task
-@coiled.function()
+@coiled.function(
+    region="eu-central-1",  # Run the task close to you.
+    memory="512 GB",  # Use a lot of memory.
+    cpu=128,  # Use a lot of CPU.
+    vm_type="p3.2xlarge",  # Run a GPU instance.
+)
 def task_example() -> None: ...
diff --git a/docs_src/custom_executors.py b/docs_src/custom_executors.py
new file mode 100644
index 0000000..3e13527
--- /dev/null
+++ b/docs_src/custom_executors.py
@@ -0,0 +1,15 @@
+from concurrent.futures import Executor
+
+from my_project.executor import CustomExecutor
+from pytask_parallel import ParallelBackend
+from pytask_parallel import WorkerType
+from pytask_parallel import registry
+
+
+def build_custom_executor(n_workers: int) -> Executor:
+    return CustomExecutor(
+        max_workers=n_workers, worker_type=WorkerType.PROCESSES, remote=False
+    )
+
+
+registry.register_parallel_backend(ParallelBackend.CUSTOM, build_custom_executor)
diff --git a/src/pytask_parallel/__init__.py b/src/pytask_parallel/__init__.py
index 812a622..0e32f3b 100644
--- a/src/pytask_parallel/__init__.py
+++ b/src/pytask_parallel/__init__.py
@@ -3,6 +3,7 @@
 from __future__ import annotations
 
 from pytask_parallel.backends import ParallelBackend
+from pytask_parallel.backends import WorkerType
 from pytask_parallel.backends import registry
 
 try:
@@ -13,4 +14,4 @@
 
 __version__ = "unknown"
 
-__all__ = ["ParallelBackend", "__version__", "registry"]
+__all__ = ["ParallelBackend", "__version__", "registry", "WorkerType"]

From 3cfa86752843f99e115ede24092112c3c1eb1330 Mon Sep 17 00:00:00 2001
From: Tobias Raabe <raabe@posteo.de>
Date: Sat, 13 Apr 2024 11:39:09 +0200
Subject: [PATCH 5/5] Remove

---
 MANIFEST.in | 13 -------------
 1 file changed, 13 deletions(-)
 delete mode 100644 MANIFEST.in

diff --git a/MANIFEST.in b/MANIFEST.in
deleted file mode 100644
index 55834ed..0000000
--- a/MANIFEST.in
+++ /dev/null
@@ -1,13 +0,0 @@
-prune docs
-prune docs_src
-prune tests
-
-exclude *.md
-exclude *.yaml
-exclude *.yml
-exclude tox.ini
-
-include README.md
-include LICENSE
-
-recursive-include src py.typed
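
A note on `docs_src/custom_executors.py` above: `from my_project.executor import
CustomExecutor` imports a placeholder, not a real package. A minimal, hypothetical
stand-in that satisfies the `concurrent.futures.Executor` interface could look like the
sketch below. It assumes a synchronous, in-process executor is acceptable and that the
`worker_type` and `remote` arguments are simply stored to match the call in the docs.

```python
from __future__ import annotations

from concurrent.futures import Executor
from concurrent.futures import Future
from typing import Any
from typing import Callable

from pytask_parallel import WorkerType


class CustomExecutor(Executor):
    """A hypothetical executor that runs submitted callables synchronously."""

    def __init__(self, max_workers: int, worker_type: WorkerType, remote: bool) -> None:
        self.max_workers = max_workers
        self.worker_type = worker_type
        self.remote = remote

    def submit(self, fn: Callable[..., Any], /, *args: Any, **kwargs: Any) -> Future[Any]:
        # Run the callable immediately and store the outcome in a finished future.
        future: Future[Any] = Future()
        try:
            future.set_result(fn(*args, **kwargs))
        except BaseException as exc:  # noqa: BLE001
            future.set_exception(exc)
        return future
```

With such a class importable as `my_project.executor.CustomExecutor`, the registration
in `docs_src/custom_executors.py` runs as written, and `pytask --parallel-backend
custom` picks it up.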