Skip to content

Commit 742b35b

Browse files
committed
Fix import issues.
1 parent 6234784 commit 742b35b

File tree

6 files changed

+34
-56
lines changed

6 files changed

+34
-56
lines changed

src/pytask_parallel/backends.py

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,18 @@
22
from __future__ import annotations
33

44
import enum
5+
import inspect
56
from concurrent.futures import Future
67
from concurrent.futures import ProcessPoolExecutor
78
from concurrent.futures import ThreadPoolExecutor
89
from typing import Any
910
from typing import Callable
1011

1112
import cloudpickle
12-
from _pytask.path import import_path
1313

1414

15-
def deserialize_and_run_with_cloudpickle(
16-
fn: bytes, kwargs: bytes, kwargs_import_path: bytes
17-
) -> Any:
15+
def deserialize_and_run_with_cloudpickle(fn: bytes, kwargs: bytes) -> Any:
1816
"""Deserialize and execute a function and keyword arguments."""
19-
deserialized_kwargs_import_path = cloudpickle.loads(kwargs_import_path)
20-
if deserialized_kwargs_import_path:
21-
import_path(**deserialized_kwargs_import_path)
22-
2317
deserialized_fn = cloudpickle.loads(fn)
2418
deserialized_kwargs = cloudpickle.loads(kwargs)
2519
return deserialized_fn(**deserialized_kwargs)
@@ -33,10 +27,11 @@ def submit( # type: ignore[override]
3327
self, fn: Callable[..., Any], *args: Any, **kwargs: Any # noqa: ARG002
3428
) -> Future[Any]:
3529
"""Submit a new task."""
30+
task_module = inspect.getmodule(kwargs["task"].function)
31+
cloudpickle.register_pickle_by_value(task_module)
3632
return super().submit(
3733
deserialize_and_run_with_cloudpickle,
3834
fn=cloudpickle.dumps(fn),
39-
kwargs_import_path=cloudpickle.dumps(kwargs.pop("kwargs_import_path")),
4035
kwargs=cloudpickle.dumps(kwargs),
4136
)
4237

@@ -46,34 +41,32 @@ def submit( # type: ignore[override]
4641

4742
except ImportError:
4843

49-
class ParallelBackendChoices(enum.Enum):
44+
class ParallelBackend(enum.Enum):
5045
"""Choices for parallel backends."""
5146

5247
PROCESSES = "processes"
5348
THREADS = "threads"
5449

50+
PARALLEL_BACKENDS_DEFAULT = ParallelBackend.PROCESSES
51+
5552
PARALLEL_BACKENDS = {
56-
ParallelBackendChoices.PROCESSES: CloudpickleProcessPoolExecutor,
57-
ParallelBackendChoices.THREADS: ThreadPoolExecutor,
53+
ParallelBackend.PROCESSES: CloudpickleProcessPoolExecutor,
54+
ParallelBackend.THREADS: ThreadPoolExecutor,
5855
}
5956

6057
else:
6158

62-
class ParallelBackendChoices(enum.Enum): # type: ignore[no-redef]
59+
class ParallelBackend(enum.Enum): # type: ignore[no-redef]
6360
"""Choices for parallel backends."""
6461

6562
PROCESSES = "processes"
6663
THREADS = "threads"
6764
LOKY = "loky"
6865

69-
PARALLEL_BACKENDS_DEFAULT = ParallelBackendChoices.PROCESSES
66+
PARALLEL_BACKENDS_DEFAULT = ParallelBackend.LOKY # type: ignore[attr-defined]
7067

7168
PARALLEL_BACKENDS = {
72-
ParallelBackendChoices.PROCESSES: CloudpickleProcessPoolExecutor,
73-
ParallelBackendChoices.THREADS: ThreadPoolExecutor,
74-
ParallelBackendChoices.LOKY: ( # type: ignore[attr-defined]
75-
get_reusable_executor
76-
),
69+
ParallelBackend.PROCESSES: CloudpickleProcessPoolExecutor,
70+
ParallelBackend.THREADS: ThreadPoolExecutor,
71+
ParallelBackend.LOKY: (get_reusable_executor), # type: ignore[attr-defined]
7772
}
78-
79-
PARALLEL_BACKENDS_DEFAULT = ParallelBackendChoices.PROCESSES

src/pytask_parallel/build.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from pytask import EnumChoice
66
from pytask import hookimpl
77
from pytask_parallel.backends import PARALLEL_BACKENDS_DEFAULT
8-
from pytask_parallel.backends import ParallelBackendChoices
8+
from pytask_parallel.backends import ParallelBackend
99

1010

1111
@hookimpl
@@ -23,7 +23,7 @@ def pytask_extend_command_line_interface(cli: click.Group) -> None:
2323
),
2424
click.Option(
2525
["--parallel-backend"],
26-
type=EnumChoice(ParallelBackendChoices),
26+
type=EnumChoice(ParallelBackend),
2727
help="Backend for the parallelization.",
2828
default=PARALLEL_BACKENDS_DEFAULT,
2929
),

src/pytask_parallel/config.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from typing import Any
77

88
from pytask import hookimpl
9-
from pytask_parallel.backends import ParallelBackendChoices
9+
from pytask_parallel.backends import ParallelBackend
1010

1111

1212
@hookimpl
@@ -17,12 +17,12 @@ def pytask_parse_config(config: dict[str, Any]) -> None:
1717

1818
if (
1919
isinstance(config["parallel_backend"], str)
20-
and config["parallel_backend"] in ParallelBackendChoices._value2member_map_
20+
and config["parallel_backend"] in ParallelBackend._value2member_map_
2121
):
22-
config["parallel_backend"] = ParallelBackendChoices(config["parallel_backend"])
22+
config["parallel_backend"] = ParallelBackend(config["parallel_backend"])
2323
elif (
2424
isinstance(config["parallel_backend"], enum.Enum)
25-
and config["parallel_backend"] in ParallelBackendChoices
25+
and config["parallel_backend"] in ParallelBackend
2626
):
2727
pass
2828
else:

src/pytask_parallel/execute.py

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
from pytask import Mark
2020
from pytask import parse_warning_filter
2121
from pytask import PTask
22-
from pytask import PTaskWithPath
2322
from pytask import remove_internal_traceback_frames_from_exc_info
2423
from pytask import Session
2524
from pytask import Task
@@ -30,15 +29,15 @@
3029
from pytask.tree_util import tree_map
3130
from pytask.tree_util import tree_structure
3231
from pytask_parallel.backends import PARALLEL_BACKENDS
33-
from pytask_parallel.backends import ParallelBackendChoices
32+
from pytask_parallel.backends import ParallelBackend
3433
from rich.console import ConsoleOptions
3534
from rich.traceback import Traceback
3635

3736

3837
@hookimpl
3938
def pytask_post_parse(config: dict[str, Any]) -> None:
4039
"""Register the parallel backend."""
41-
if config["parallel_backend"] == ParallelBackendChoices.THREADS:
40+
if config["parallel_backend"] == ParallelBackend.THREADS:
4241
config["pm"].register(DefaultBackendNameSpace)
4342
else:
4443
config["pm"].register(ProcessesNameSpace)
@@ -186,11 +185,6 @@ def pytask_execute_task(session: Session, task: PTask) -> Future[Any] | None:
186185
if session.config["n_workers"] > 1:
187186
kwargs = _create_kwargs_for_task(task)
188187

189-
if sys.platform == "win32" and isinstance(task, PTaskWithPath):
190-
kwargs_import_path = {"path": task.path, "root": session.config["root"]}
191-
else:
192-
kwargs_import_path = None
193-
194188
return session.config["_parallel_executor"].submit(
195189
_execute_task,
196190
task=task,
@@ -199,7 +193,6 @@ def pytask_execute_task(session: Session, task: PTask) -> Future[Any] | None:
199193
console_options=console.options,
200194
session_filterwarnings=session.config["filterwarnings"],
201195
task_filterwarnings=get_marks(task, "filterwarnings"),
202-
kwargs_import_path=kwargs_import_path,
203196
)
204197
return None
205198

tests/test_config.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import pytest
77
from pytask import build
88
from pytask import ExitCode
9-
from pytask_parallel.backends import ParallelBackendChoices
9+
from pytask_parallel.backends import ParallelBackend
1010

1111

1212
@pytest.mark.end_to_end()
@@ -36,13 +36,13 @@ def test_interplay_between_debugging_and_parallel(tmp_path, pdb, n_workers, expe
3636
]
3737
+ [
3838
("parallel_backend", parallel_backend, ExitCode.OK)
39-
for parallel_backend in ParallelBackendChoices
39+
for parallel_backend in ParallelBackend
4040
],
4141
)
4242
def test_reading_values_from_config_file(
4343
tmp_path, configuration_option, value, exit_code
4444
):
45-
config_value = value.value if isinstance(value, ParallelBackendChoices) else value
45+
config_value = value.value if isinstance(value, ParallelBackend) else value
4646
config = f"""
4747
[tool.pytask.ini_options]
4848
{configuration_option} = {config_value!r}

tests/test_execute.py

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,26 +8,18 @@
88
from pytask import cli
99
from pytask import ExitCode
1010
from pytask_parallel.backends import PARALLEL_BACKENDS
11-
from pytask_parallel.backends import ParallelBackendChoices
11+
from pytask_parallel.backends import ParallelBackend
1212
from pytask_parallel.execute import _Sleeper
1313

1414
from tests.conftest import restore_sys_path_and_module_after_test_execution
1515

1616

17-
_PARALLEL_BACKENDS_PARAMETRIZATION = [
18-
pytest.param(i, marks=pytest.mark.xfail(reason="loky fails"))
19-
if i.value == "loky"
20-
else pytest.param(i)
21-
for i in PARALLEL_BACKENDS
22-
]
23-
24-
2517
class Session:
2618
pass
2719

2820

2921
@pytest.mark.end_to_end()
30-
@pytest.mark.parametrize("parallel_backend", _PARALLEL_BACKENDS_PARAMETRIZATION)
22+
@pytest.mark.parametrize("parallel_backend", PARALLEL_BACKENDS)
3123
def test_parallel_execution_speedup(tmp_path, parallel_backend):
3224
source = """
3325
import pytask
@@ -61,7 +53,7 @@ def task_2(produces):
6153

6254

6355
@pytest.mark.end_to_end()
64-
@pytest.mark.parametrize("parallel_backend", _PARALLEL_BACKENDS_PARAMETRIZATION)
56+
@pytest.mark.parametrize("parallel_backend", PARALLEL_BACKENDS)
6557
def test_parallel_execution_speedup_w_cli(runner, tmp_path, parallel_backend):
6658
source = """
6759
import pytask
@@ -108,7 +100,7 @@ def task_2(produces):
108100

109101

110102
@pytest.mark.end_to_end()
111-
@pytest.mark.parametrize("parallel_backend", _PARALLEL_BACKENDS_PARAMETRIZATION)
103+
@pytest.mark.parametrize("parallel_backend", PARALLEL_BACKENDS)
112104
def test_stop_execution_when_max_failures_is_reached(tmp_path, parallel_backend):
113105
source = """
114106
import time
@@ -136,7 +128,7 @@ def task_3(): time.sleep(3)
136128

137129

138130
@pytest.mark.end_to_end()
139-
@pytest.mark.parametrize("parallel_backend", _PARALLEL_BACKENDS_PARAMETRIZATION)
131+
@pytest.mark.parametrize("parallel_backend", PARALLEL_BACKENDS)
140132
def test_task_priorities(tmp_path, parallel_backend):
141133
source = """
142134
import pytask
@@ -177,7 +169,7 @@ def task_5():
177169

178170

179171
@pytest.mark.end_to_end()
180-
@pytest.mark.parametrize("parallel_backend", _PARALLEL_BACKENDS_PARAMETRIZATION)
172+
@pytest.mark.parametrize("parallel_backend", PARALLEL_BACKENDS)
181173
@pytest.mark.parametrize("show_locals", [True, False])
182174
def test_rendering_of_tracebacks_with_rich(
183175
runner, tmp_path, parallel_backend, show_locals
@@ -206,7 +198,7 @@ def task_raising_error():
206198
@pytest.mark.parametrize(
207199
"parallel_backend",
208200
# Capturing warnings is not thread-safe.
209-
[ParallelBackendChoices.PROCESSES],
201+
[ParallelBackend.PROCESSES],
210202
)
211203
def test_collect_warnings_from_parallelized_tasks(runner, tmp_path, parallel_backend):
212204
source = """
@@ -259,7 +251,7 @@ def test_sleeper():
259251

260252

261253
@pytest.mark.end_to_end()
262-
@pytest.mark.parametrize("parallel_backend", _PARALLEL_BACKENDS_PARAMETRIZATION)
254+
@pytest.mark.parametrize("parallel_backend", PARALLEL_BACKENDS)
263255
def test_task_that_return(runner, tmp_path, parallel_backend):
264256
source = """
265257
from pathlib import Path
@@ -277,7 +269,7 @@ def task_example() -> Annotated[str, Path("file.txt")]:
277269

278270

279271
@pytest.mark.end_to_end()
280-
@pytest.mark.parametrize("parallel_backend", _PARALLEL_BACKENDS_PARAMETRIZATION)
272+
@pytest.mark.parametrize("parallel_backend", PARALLEL_BACKENDS)
281273
def test_task_without_path_that_return(runner, tmp_path, parallel_backend):
282274
source = """
283275
from pathlib import Path

0 commit comments

Comments
 (0)