Skip to content

Align with v0.4.0rc2. #64

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ jobs:
shell: bash -l {0}
run: bash <(curl -s https://codecov.io/bash) -F unit -c

- name: Run integration tests.
shell: bash -l {0}
run: tox -e pytest -- tests -m integration --cov=./ --cov-report=xml -n auto

- name: Upload coverage reports of integration tests.
if: runner.os == 'Linux' && matrix.python-version == '3.9'
shell: bash -l {0}
run: bash <(curl -s https://codecov.io/bash) -F integration -c
# - name: Run integration tests.
# shell: bash -l {0}
# run: tox -e pytest -- tests -m integration --cov=./ --cov-report=xml -n auto

# - name: Upload coverage reports of integration tests.
# if: runner.os == 'Linux' && matrix.python-version == '3.9'
# shell: bash -l {0}
# run: bash <(curl -s https://codecov.io/bash) -F integration -c

- name: Run end-to-end tests.
shell: bash -l {0}
Expand Down
6 changes: 5 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,13 @@ repos:
--ignore-missing-imports,
]
additional_dependencies: [
cloudpickle,
optree,
pytask==0.4.0rc2,
rich,
types-attrs,
types-click,
types-setuptools
types-setuptools,
]
pass_filenames: false
- repo: https://github.com/mgedmin/check-manifest
Expand Down
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ releases are available on [PyPI](https://pypi.org/project/pytask-parallel) and
## 0.4.0 - 2023-xx-xx

- {pull}`62` deprecates Python 3.7.
- {pull}`64` aligns pytask-parallel with pytask v0.4.0rc2.

## 0.3.1 - 2023-05-27

Expand Down
10 changes: 3 additions & 7 deletions environment.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
name: pytask-parallel

channels:
- conda-forge/label/pytask_rc
- conda-forge
- nodefaults

Expand All @@ -10,16 +11,11 @@ dependencies:
- setuptools_scm
- toml

# Conda
- anaconda-client
- conda-build
- conda-verify

# Package dependencies
- pytask >=0.3
- pytask>=0.4.0rc2
- cloudpickle
- loky
- pybaum >=0.1.1
- optree

# Misc
- black
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ convention = "numpy"

[tool.pytest.ini_options]
# Do not add src since it messes with the loading of pytask-parallel as a plugin.
testpaths = ["test"]
testpaths = ["tests"]
markers = [
"wip: Tests that are work-in-progress.",
"unit: Flag for unit tests which target mainly a single function.",
Expand Down
4 changes: 2 additions & 2 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ install_requires =
click
cloudpickle
loky
pybaum>=0.1.1
pytask>=0.3
optree>=0.9.0
pytask>=0.4.0rc2
python_requires = >=3.8
include_package_data = True
package_dir = =src
Expand Down
26 changes: 11 additions & 15 deletions src/pytask_parallel/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@
import cloudpickle


def deserialize_and_run_with_cloudpickle(
fn: Callable[..., Any], kwargs: dict[str, Any]
) -> Any:
def deserialize_and_run_with_cloudpickle(fn: bytes, kwargs: bytes) -> Any:
"""Deserialize and execute a function and keyword arguments."""
deserialized_fn = cloudpickle.loads(fn)
deserialized_kwargs = cloudpickle.loads(kwargs)
Expand All @@ -40,34 +38,32 @@ def submit( # type: ignore[override]

except ImportError:

class ParallelBackendChoices(enum.Enum):
class ParallelBackend(enum.Enum):
"""Choices for parallel backends."""

PROCESSES = "processes"
THREADS = "threads"

PARALLEL_BACKENDS_DEFAULT = ParallelBackend.PROCESSES

PARALLEL_BACKENDS = {
ParallelBackendChoices.PROCESSES: CloudpickleProcessPoolExecutor,
ParallelBackendChoices.THREADS: ThreadPoolExecutor,
ParallelBackend.PROCESSES: CloudpickleProcessPoolExecutor,
ParallelBackend.THREADS: ThreadPoolExecutor,
}

else:

class ParallelBackendChoices(enum.Enum): # type: ignore[no-redef]
class ParallelBackend(enum.Enum): # type: ignore[no-redef]
"""Choices for parallel backends."""

PROCESSES = "processes"
THREADS = "threads"
LOKY = "loky"

PARALLEL_BACKENDS_DEFAULT = ParallelBackendChoices.PROCESSES
PARALLEL_BACKENDS_DEFAULT = ParallelBackend.LOKY # type: ignore[attr-defined]

PARALLEL_BACKENDS = {
ParallelBackendChoices.PROCESSES: CloudpickleProcessPoolExecutor,
ParallelBackendChoices.THREADS: ThreadPoolExecutor,
ParallelBackendChoices.LOKY: ( # type: ignore[attr-defined]
get_reusable_executor
),
ParallelBackend.PROCESSES: CloudpickleProcessPoolExecutor,
ParallelBackend.THREADS: ThreadPoolExecutor,
ParallelBackend.LOKY: get_reusable_executor, # type: ignore[attr-defined]
}

PARALLEL_BACKENDS_DEFAULT = ParallelBackendChoices.PROCESSES
4 changes: 2 additions & 2 deletions src/pytask_parallel/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from pytask import EnumChoice
from pytask import hookimpl
from pytask_parallel.backends import PARALLEL_BACKENDS_DEFAULT
from pytask_parallel.backends import ParallelBackendChoices
from pytask_parallel.backends import ParallelBackend


@hookimpl
Expand All @@ -23,7 +23,7 @@ def pytask_extend_command_line_interface(cli: click.Group) -> None:
),
click.Option(
["--parallel-backend"],
type=EnumChoice(ParallelBackendChoices),
type=EnumChoice(ParallelBackend),
help="Backend for the parallelization.",
default=PARALLEL_BACKENDS_DEFAULT,
),
Expand Down
8 changes: 4 additions & 4 deletions src/pytask_parallel/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from typing import Any

from pytask import hookimpl
from pytask_parallel.backends import ParallelBackendChoices
from pytask_parallel.backends import ParallelBackend


@hookimpl
Expand All @@ -17,12 +17,12 @@ def pytask_parse_config(config: dict[str, Any]) -> None:

if (
isinstance(config["parallel_backend"], str)
and config["parallel_backend"] in ParallelBackendChoices._value2member_map_
and config["parallel_backend"] in ParallelBackend._value2member_map_
):
config["parallel_backend"] = ParallelBackendChoices(config["parallel_backend"])
config["parallel_backend"] = ParallelBackend(config["parallel_backend"])
elif (
isinstance(config["parallel_backend"], enum.Enum)
and config["parallel_backend"] in ParallelBackendChoices
and config["parallel_backend"] in ParallelBackend
):
pass
else:
Expand Down
86 changes: 62 additions & 24 deletions src/pytask_parallel/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,33 @@
from typing import List

import attr
from pybaum.tree_util import tree_map
import cloudpickle
from pytask import console
from pytask import ExecutionReport
from pytask import get_marks
from pytask import hookimpl
from pytask import Mark
from pytask import parse_warning_filter
from pytask import PTask
from pytask import remove_internal_traceback_frames_from_exc_info
from pytask import Session
from pytask import Task
from pytask import warning_record_to_str
from pytask import WarningReport
from pytask.tree_util import PyTree
from pytask.tree_util import tree_leaves
from pytask.tree_util import tree_map
from pytask.tree_util import tree_structure
from pytask_parallel.backends import PARALLEL_BACKENDS
from pytask_parallel.backends import ParallelBackendChoices
from pytask_parallel.backends import ParallelBackend
from rich.console import ConsoleOptions
from rich.traceback import Traceback


@hookimpl
def pytask_post_parse(config: dict[str, Any]) -> None:
"""Register the parallel backend."""
if config["parallel_backend"] == ParallelBackendChoices.THREADS:
if config["parallel_backend"] == ParallelBackend.THREADS:
config["pm"].register(DefaultBackendNameSpace)
else:
config["pm"].register(ProcessesNameSpace)
Expand Down Expand Up @@ -99,12 +104,19 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091
for task_name in list(running_tasks):
future = running_tasks[task_name]
if future.done():
warning_reports, task_exception = future.result()
session.warnings.extend(warning_reports)
exc_info = (
_parse_future_exception(future.exception())
or task_exception
)
# An exception was thrown before the task was executed.
if future.exception() is not None:
exc_info = _parse_future_exception(future.exception())
warning_reports = []
# A task raised an exception.
else:
warning_reports, task_exception = future.result()
session.warnings.extend(warning_reports)
exc_info = (
_parse_future_exception(future.exception())
or task_exception
)

if exc_info is not None:
task = session.dag.nodes[task_name]["task"]
newly_collected_reports.append(
Expand Down Expand Up @@ -165,7 +177,7 @@ class ProcessesNameSpace:

@staticmethod
@hookimpl(tryfirst=True)
def pytask_execute_task(session: Session, task: Task) -> Future[Any] | None:
def pytask_execute_task(session: Session, task: PTask) -> Future[Any] | None:
"""Execute a task.

Take a task, pickle it and send the bytes over to another process.
Expand All @@ -174,27 +186,33 @@ def pytask_execute_task(session: Session, task: Task) -> Future[Any] | None:
if session.config["n_workers"] > 1:
kwargs = _create_kwargs_for_task(task)

# Task modules are dynamically loaded and added to `sys.modules`. Thus,
# cloudpickle believes the module of the task function is also importable in
# the child process. We have to register the module as dynamic again, so
# that cloudpickle will pickle it with the function. See cloudpickle#417,
# pytask#373 and pytask#374.
task_module = inspect.getmodule(task.function)
cloudpickle.register_pickle_by_value(task_module)

return session.config["_parallel_executor"].submit(
_unserialize_and_execute_task,
_execute_task,
task=task,
kwargs=kwargs,
show_locals=session.config["show_locals"],
console_options=console.options,
session_filterwarnings=session.config["filterwarnings"],
task_filterwarnings=get_marks(task, "filterwarnings"),
task_short_name=task.short_name,
)
return None


def _unserialize_and_execute_task( # noqa: PLR0913
task: Task,
def _execute_task( # noqa: PLR0913
task: PTask,
kwargs: dict[str, Any],
show_locals: bool,
console_options: ConsoleOptions,
session_filterwarnings: tuple[str, ...],
task_filterwarnings: tuple[Mark, ...],
task_short_name: str,
) -> tuple[list[WarningReport], tuple[type[BaseException], BaseException, str] | None]:
"""Unserialize and execute task.

Expand All @@ -217,23 +235,41 @@ def _unserialize_and_execute_task( # noqa: PLR0913
warnings.filterwarnings(*parse_warning_filter(arg, escape=False))

try:
task.execute(**kwargs)
out = task.execute(**kwargs)
except Exception: # noqa: BLE001
exc_info = sys.exc_info()
processed_exc_info = _process_exception(
exc_info, show_locals, console_options
)
else:
if "return" in task.produces:
structure_out = tree_structure(out)
structure_return = tree_structure(task.produces["return"])
                # strict must be False when None is a leaf.
if not structure_return.is_prefix(structure_out, strict=False):
msg = (
"The structure of the return annotation is not a subtree of "
"the structure of the function return.\n\nFunction return: "
f"{structure_out}\n\nReturn annotation: {structure_return}"
)
raise ValueError(msg)

nodes = tree_leaves(task.produces["return"])
values = structure_return.flatten_up_to(out)
for node, value in zip(nodes, values):
node.save(value) # type: ignore[attr-defined]

processed_exc_info = None

task_display_name = getattr(task, "display_name", task.name)
warning_reports = []
for warning_message in log:
fs_location = warning_message.filename, warning_message.lineno
warning_reports.append(
WarningReport(
message=warning_record_to_str(warning_message),
fs_location=fs_location,
id_=task_short_name,
id_=task_display_name,
)
)

Expand Down Expand Up @@ -293,15 +329,17 @@ def _mock_processes_for_threads(
return [], exc_info


def _create_kwargs_for_task(task: Task) -> dict[Any, Any]:
def _create_kwargs_for_task(task: PTask) -> dict[str, PyTree[Any]]:
"""Create kwargs for task function."""
kwargs = {**task.kwargs}
parameters = inspect.signature(task.function).parameters

kwargs = {}
for name, value in task.depends_on.items():
kwargs[name] = tree_map(lambda x: x.load(), value)

func_arg_names = set(inspect.signature(task.function).parameters)
for arg_name in ("depends_on", "produces"):
if arg_name in func_arg_names:
attribute = getattr(task, arg_name)
kwargs[arg_name] = tree_map(lambda x: x.value, attribute)
for name, value in task.produces.items():
if name in parameters:
kwargs[name] = tree_map(lambda x: x.load(), value)

return kwargs

Expand Down
Loading