From 2108f6142d72d21cf5367ca144a66e7e1f277fbc Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Thu, 20 Apr 2023 01:07:09 +0200 Subject: [PATCH 1/3] Fix. --- src/pytask_parallel/backends.py | 32 ++++++++++++++++++++++++++++++-- src/pytask_parallel/execute.py | 15 ++++----------- 2 files changed, 34 insertions(+), 13 deletions(-) diff --git a/src/pytask_parallel/backends.py b/src/pytask_parallel/backends.py index 6d81c3e..ec717e9 100644 --- a/src/pytask_parallel/backends.py +++ b/src/pytask_parallel/backends.py @@ -2,8 +2,36 @@ from __future__ import annotations import enum +from concurrent.futures import Future from concurrent.futures import ProcessPoolExecutor from concurrent.futures import ThreadPoolExecutor +from typing import Any +from typing import Callable + +import cloudpickle + + +def deserialize_and_run_with_cloudpickle( + fn: Callable[..., Any], /, kwargs: dict[str, Any] +) -> Any: + """Deserialize and execute a function and keyword arguments.""" + deserialized_fn = cloudpickle.loads(fn) + deserialized_kwargs = cloudpickle.loads(kwargs) + return deserialized_fn(**deserialized_kwargs) + + +class CloudpickleProcessPoolExecutor(ProcessPoolExecutor): + """Patches the standard executor to serialize functions with cloudpickle.""" + + def submit( + self, fn: Callable[..., Any], /, *args: Any, **kwargs: Any # noqa: ARG002 + ) -> Future[Any]: + """Submit a new task.""" + return super().submit( + deserialize_and_run_with_cloudpickle, + cloudpickle.dumps(fn), + kwargs=cloudpickle.dumps(kwargs), + ) try: @@ -20,7 +48,7 @@ class ParallelBackendChoices(enum.Enum): PARALLEL_BACKENDS_DEFAULT = ParallelBackendChoices.PROCESSES PARALLEL_BACKENDS = { - ParallelBackendChoices.PROCESSES: ProcessPoolExecutor, + ParallelBackendChoices.PROCESSES: CloudpickleProcessPoolExecutor, ParallelBackendChoices.THREADS: ThreadPoolExecutor, } @@ -36,7 +64,7 @@ class ParallelBackendChoices(enum.Enum): # type: ignore[no-redef] PARALLEL_BACKENDS_DEFAULT = ParallelBackendChoices.PROCESSES PARALLEL_BACKENDS = { - ParallelBackendChoices.PROCESSES: ProcessPoolExecutor, + ParallelBackendChoices.PROCESSES: CloudpickleProcessPoolExecutor, ParallelBackendChoices.THREADS: ThreadPoolExecutor, ParallelBackendChoices.LOKY: ( # type: ignore[attr-defined] get_reusable_executor diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index 72e7e0f..8f72785 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -12,7 +12,6 @@ from typing import List import attr -import cloudpickle from pybaum.tree_util import tree_map from pytask import console from pytask import ExecutionReport @@ -179,13 +178,10 @@ def pytask_execute_task(session: Session, task: Task) -> Future[Any] | None: if session.config["n_workers"] > 1: kwargs = _create_kwargs_for_task(task) - bytes_function = cloudpickle.dumps(task) - bytes_kwargs = cloudpickle.dumps(kwargs) - return session.config["_parallel_executor"].submit( _unserialize_and_execute_task, - bytes_function=bytes_function, - bytes_kwargs=bytes_kwargs, + task=task, + kwargs=kwargs, show_locals=session.config["show_locals"], console_options=console.options, session_filterwarnings=session.config["filterwarnings"], @@ -196,8 +192,8 @@ def pytask_execute_task(session: Session, task: Task) -> Future[Any] | None: def _unserialize_and_execute_task( # noqa: PLR0913 - bytes_function: bytes, - bytes_kwargs: bytes, + task: Task, + kwargs: dict[str, Any], show_locals: bool, console_options: ConsoleOptions, session_filterwarnings: tuple[str, ...], @@ -212,9 +208,6 @@ def _unserialize_and_execute_task( # noqa: PLR0913 """ __tracebackhide__ = True - task = cloudpickle.loads(bytes_function) - kwargs = cloudpickle.loads(bytes_kwargs) - with warnings.catch_warnings(record=True) as log: # mypy can't infer that record=True means log is not None; help it. assert log is not None From 4f539ae56160516f1001501dc6ecea51b053aa12 Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Thu, 20 Apr 2023 01:08:50 +0200 Subject: [PATCH 2/3] to changes. --- CHANGES.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index 86fd0bc..9ecfcd0 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -7,6 +7,8 @@ releases are available on [PyPI](https://pypi.org/project/pytask-parallel) and ## 0.3.1 - 2023-xx-xx +- {pull}`56` refactors the `ProcessPoolExecutor`. + ## 0.3.0 - 2023-01-23 - {pull}`50` deprecates INI configurations and aligns the package with pytask v0.3. From 4cefdfc3108c88656f8e980595abc6f3d49f0e46 Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Sat, 22 Apr 2023 02:16:09 +0200 Subject: [PATCH 3/3] remove position only args. --- src/pytask_parallel/backends.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/pytask_parallel/backends.py b/src/pytask_parallel/backends.py index ec717e9..3a4d9de 100644 --- a/src/pytask_parallel/backends.py +++ b/src/pytask_parallel/backends.py @@ -12,7 +12,7 @@ def deserialize_and_run_with_cloudpickle( - fn: Callable[..., Any], /, kwargs: dict[str, Any] + fn: Callable[..., Any], kwargs: dict[str, Any] ) -> Any: """Deserialize and execute a function and keyword arguments.""" deserialized_fn = cloudpickle.loads(fn) @@ -23,13 +23,14 @@ def deserialize_and_run_with_cloudpickle( class CloudpickleProcessPoolExecutor(ProcessPoolExecutor): """Patches the standard executor to serialize functions with cloudpickle.""" - def submit( - self, fn: Callable[..., Any], /, *args: Any, **kwargs: Any # noqa: ARG002 + # The type signature is wrong for version above Py3.7. Fix when 3.7 is deprecated. + def submit( # type: ignore[override] + self, fn: Callable[..., Any], *args: Any, **kwargs: Any # noqa: ARG002 ) -> Future[Any]: """Submit a new task.""" return super().submit( deserialize_and_run_with_cloudpickle, - cloudpickle.dumps(fn), + fn=cloudpickle.dumps(fn), kwargs=cloudpickle.dumps(kwargs), )