Skip to content

Commit 4e95d0c

Browse files
committed
better position.
1 parent 742b35b commit 4e95d0c

File tree

2 files changed

+10
-4
lines changed

2 files changed

+10
-4
lines changed

src/pytask_parallel/backends.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
from __future__ import annotations
33

44
import enum
5-
import inspect
65
from concurrent.futures import Future
76
from concurrent.futures import ProcessPoolExecutor
87
from concurrent.futures import ThreadPoolExecutor
@@ -27,8 +26,6 @@ def submit( # type: ignore[override]
2726
self, fn: Callable[..., Any], *args: Any, **kwargs: Any # noqa: ARG002
2827
) -> Future[Any]:
2928
"""Submit a new task."""
30-
task_module = inspect.getmodule(kwargs["task"].function)
31-
cloudpickle.register_pickle_by_value(task_module)
3229
return super().submit(
3330
deserialize_and_run_with_cloudpickle,
3431
fn=cloudpickle.dumps(fn),
@@ -68,5 +65,5 @@ class ParallelBackend(enum.Enum): # type: ignore[no-redef]
6865
PARALLEL_BACKENDS = {
6966
ParallelBackend.PROCESSES: CloudpickleProcessPoolExecutor,
7067
ParallelBackend.THREADS: ThreadPoolExecutor,
71-
ParallelBackend.LOKY: (get_reusable_executor), # type: ignore[attr-defined]
68+
ParallelBackend.LOKY: get_reusable_executor, # type: ignore[attr-defined]
7269
}

src/pytask_parallel/execute.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from typing import List
1313

1414
import attr
15+
import cloudpickle
1516
from pytask import console
1617
from pytask import ExecutionReport
1718
from pytask import get_marks
@@ -185,6 +186,14 @@ def pytask_execute_task(session: Session, task: PTask) -> Future[Any] | None:
185186
if session.config["n_workers"] > 1:
186187
kwargs = _create_kwargs_for_task(task)
187188

189+
# Task modules are dynamically loaded and added to `sys.modules`. Thus,
190+
# cloudpickle believes the module of the task function is also importable in
191+
# the child process. We have to register the module as dynamic again, so
192+
# that cloudpickle will pickle it with the function. See cloudpickle#417,
193+
# pytask#373 and pytask#374.
194+
task_module = inspect.getmodule(task.function)
195+
cloudpickle.register_pickle_by_value(task_module)
196+
188197
return session.config["_parallel_executor"].submit(
189198
_execute_task,
190199
task=task,

0 commit comments

Comments
 (0)