From b3011f119279cfb9175582e5bf83ed7c6a9d7b74 Mon Sep 17 00:00:00 2001
From: Tobias Raabe
Date: Sun, 7 Apr 2024 19:54:42 +0200
Subject: [PATCH 1/7] Formalize backends.

---
 docs/source/changes.md          |  2 ++
 src/pytask_parallel/backends.py | 55 ++++++++++++++++++++++++++++-----
 2 files changed, 49 insertions(+), 8 deletions(-)

diff --git a/docs/source/changes.md b/docs/source/changes.md
index ec9cb68..1c2f25a 100644
--- a/docs/source/changes.md
+++ b/docs/source/changes.md
@@ -15,6 +15,8 @@ releases are available on [PyPI](https://pypi.org/project/pytask-parallel) and
   reports.
 - {pull}`93` adds documentation on readthedocs.
 - {pull}`94` implements `ParallelBackend.NONE` as the default backend.
+- {pull}`95` formalizes parallel backends and applies wrappers for backends with
+  threads or processes automatically.
 
 ## 0.4.1 - 2024-01-12
 
diff --git a/src/pytask_parallel/backends.py b/src/pytask_parallel/backends.py
index 26f56e2..e45aa63 100644
--- a/src/pytask_parallel/backends.py
+++ b/src/pytask_parallel/backends.py
@@ -93,23 +93,44 @@ class ParallelBackend(Enum):
     THREADS = "threads"
 
 
+class WorkerType(Enum):
+    """A type for workers that are either spawned as threads or processes."""
+
+    THREADS = "threads"
+    PROCESSES = "processes"
+
+
+@define
+class _ParallelBackend:
+    builder: Callable[..., Executor]
+    worker_type: WorkerType
+    remote: bool
+
+
 @define
 class ParallelBackendRegistry:
     """Registry for parallel backends."""
 
-    registry: ClassVar[dict[ParallelBackend, Callable[..., Executor]]] = {}
+    registry: ClassVar[dict[ParallelBackend, _ParallelBackend]] = {}
 
     def register_parallel_backend(
-        self, kind: ParallelBackend, builder: Callable[..., Executor]
+        self,
+        kind: ParallelBackend,
+        builder: Callable[..., Executor],
+        *,
+        worker_type: WorkerType | str = WorkerType.PROCESSES,
+        remote: bool = False,
     ) -> None:
         """Register a parallel backend."""
-        self.registry[kind] = builder
+        self.registry[kind] = _ParallelBackend(
+            builder=builder, worker_type=WorkerType(worker_type), remote=remote
+        )
 
     def get_parallel_backend(self, kind: ParallelBackend, n_workers: int) -> Executor:
         """Get a parallel backend."""
         __tracebackhide__ = True
         try:
-            return self.registry[kind](n_workers=n_workers)
+            return self.registry[kind].builder(n_workers=n_workers)
         except KeyError:
             msg = f"No registered parallel backend found for kind {kind.value!r}."
raise ValueError(msg) from None @@ -121,9 +142,27 @@ def get_parallel_backend(self, kind: ParallelBackend, n_workers: int) -> Executo registry = ParallelBackendRegistry() -registry.register_parallel_backend(ParallelBackend.DASK, _get_dask_executor) -registry.register_parallel_backend(ParallelBackend.LOKY, _get_loky_executor) registry.register_parallel_backend( - ParallelBackend.PROCESSES, _get_process_pool_executor + ParallelBackend.DASK, + _get_dask_executor, + worker_type=WorkerType.PROCESSES, + remote=False, +) +registry.register_parallel_backend( + ParallelBackend.LOKY, + _get_loky_executor, + worker_type=WorkerType.PROCESSES, + remote=False, +) +registry.register_parallel_backend( + ParallelBackend.PROCESSES, + _get_process_pool_executor, + worker_type=WorkerType.PROCESSES, + remote=False, +) +registry.register_parallel_backend( + ParallelBackend.THREADS, + _get_thread_pool_executor, + worker_type=WorkerType.THREADS, + remote=False, ) -registry.register_parallel_backend(ParallelBackend.THREADS, _get_thread_pool_executor) From d5132c420da679b1d469094d30af21c4f60a54a6 Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Sun, 7 Apr 2024 22:19:11 +0200 Subject: [PATCH 2/7] Finish formalization. --- src/pytask_parallel/backends.py | 41 ++-- src/pytask_parallel/config.py | 14 -- src/pytask_parallel/custom.py | 27 --- src/pytask_parallel/dask.py | 208 ------------------ src/pytask_parallel/execute.py | 56 +++++ src/pytask_parallel/threads.py | 59 ----- src/pytask_parallel/utils.py | 31 +++ .../{processes.py => wrappers.py} | 129 +++++------ tests/conftest.py | 8 +- tests/test_backends.py | 36 +-- tests/test_config.py | 14 +- 11 files changed, 172 insertions(+), 451 deletions(-) delete mode 100644 src/pytask_parallel/custom.py delete mode 100644 src/pytask_parallel/dask.py delete mode 100644 src/pytask_parallel/threads.py rename src/pytask_parallel/{processes.py => wrappers.py} (69%) diff --git a/src/pytask_parallel/backends.py b/src/pytask_parallel/backends.py index e45aa63..fc862e5 100644 --- a/src/pytask_parallel/backends.py +++ b/src/pytask_parallel/backends.py @@ -16,7 +16,7 @@ from attrs import define from loky import get_reusable_executor -__all__ = ["ParallelBackend", "ParallelBackendRegistry", "registry"] +__all__ = ["ParallelBackend", "ParallelBackendRegistry", "WorkerType", "registry"] def _deserialize_and_run_with_cloudpickle(fn: bytes, kwargs: bytes) -> Any: @@ -138,31 +138,18 @@ def get_parallel_backend(self, kind: ParallelBackend, n_workers: int) -> Executo msg = f"Could not instantiate parallel backend {kind.value!r}." 
raise ValueError(msg) from e - -registry = ParallelBackendRegistry() + def reset(self) -> None: + """Register the default backends.""" + for parallel_backend, builder, worker_type, remote in ( + (ParallelBackend.DASK, _get_dask_executor, "processes", False), + (ParallelBackend.LOKY, _get_loky_executor, "processes", False), + (ParallelBackend.PROCESSES, _get_process_pool_executor, "processes", False), + (ParallelBackend.THREADS, _get_thread_pool_executor, "threads", False), + ): + self.register_parallel_backend( + parallel_backend, builder, worker_type=worker_type, remote=remote + ) -registry.register_parallel_backend( - ParallelBackend.DASK, - _get_dask_executor, - worker_type=WorkerType.PROCESSES, - remote=False, -) -registry.register_parallel_backend( - ParallelBackend.LOKY, - _get_loky_executor, - worker_type=WorkerType.PROCESSES, - remote=False, -) -registry.register_parallel_backend( - ParallelBackend.PROCESSES, - _get_process_pool_executor, - worker_type=WorkerType.PROCESSES, - remote=False, -) -registry.register_parallel_backend( - ParallelBackend.THREADS, - _get_thread_pool_executor, - worker_type=WorkerType.THREADS, - remote=False, -) +registry = ParallelBackendRegistry() +registry.reset() diff --git a/src/pytask_parallel/config.py b/src/pytask_parallel/config.py index ffd70a6..618cc38 100644 --- a/src/pytask_parallel/config.py +++ b/src/pytask_parallel/config.py @@ -7,12 +7,8 @@ from pytask import hookimpl -from pytask_parallel import custom -from pytask_parallel import dask from pytask_parallel import execute from pytask_parallel import logging -from pytask_parallel import processes -from pytask_parallel import threads from pytask_parallel.backends import ParallelBackend @@ -53,13 +49,3 @@ def pytask_post_parse(config: dict[str, Any]) -> None: # Register parallel execute and logging hook. config["pm"].register(logging) config["pm"].register(execute) - - # Register parallel backends. - if config["parallel_backend"] == ParallelBackend.THREADS: - config["pm"].register(threads) - elif config["parallel_backend"] == ParallelBackend.DASK: - config["pm"].register(dask) - elif config["parallel_backend"] == ParallelBackend.CUSTOM: - config["pm"].register(custom) - else: - config["pm"].register(processes) diff --git a/src/pytask_parallel/custom.py b/src/pytask_parallel/custom.py deleted file mode 100644 index 3ed2377..0000000 --- a/src/pytask_parallel/custom.py +++ /dev/null @@ -1,27 +0,0 @@ -"""Contains functions for the threads backend.""" - -from __future__ import annotations - -from typing import TYPE_CHECKING -from typing import Any - -from pytask import PTask -from pytask import Session -from pytask import hookimpl - -from pytask_parallel.utils import create_kwargs_for_task - -if TYPE_CHECKING: - from concurrent.futures import Future - - -@hookimpl -def pytask_execute_task(session: Session, task: PTask) -> Future[Any]: - """Execute a task. - - Since threads have shared memory, it is not necessary to pickle and unpickle the - task. 
- - """ - kwargs = create_kwargs_for_task(task) - return session.config["_parallel_executor"].submit(task.function, **kwargs) diff --git a/src/pytask_parallel/dask.py b/src/pytask_parallel/dask.py deleted file mode 100644 index f71cf7d..0000000 --- a/src/pytask_parallel/dask.py +++ /dev/null @@ -1,208 +0,0 @@ -"""Contains functions for the dask backend.""" - -from __future__ import annotations - -import inspect -import sys -import warnings -from contextlib import redirect_stderr -from contextlib import redirect_stdout -from functools import partial -from io import StringIO -from typing import TYPE_CHECKING -from typing import Any -from typing import Callable - -import cloudpickle -from pytask import Mark -from pytask import PTask -from pytask import PythonNode -from pytask import Session -from pytask import Traceback -from pytask import WarningReport -from pytask import console -from pytask import get_marks -from pytask import hookimpl -from pytask import parse_warning_filter -from pytask import warning_record_to_str -from pytask.tree_util import PyTree -from pytask.tree_util import tree_map - -from pytask_parallel.utils import create_kwargs_for_task -from pytask_parallel.utils import handle_task_function_return - -if TYPE_CHECKING: - from concurrent.futures import Future - from pathlib import Path - from types import ModuleType - from types import TracebackType - - from rich.console import ConsoleOptions - - -@hookimpl -def pytask_execute_task(session: Session, task: PTask) -> Future[Any]: - """Execute a task. - - Since threads have shared memory, it is not necessary to pickle and unpickle the - task. - - """ - # Task modules are dynamically loaded and added to `sys.modules`. Thus, cloudpickle - # believes the module of the task function is also importable in the child process. - # We have to register the module as dynamic again, so that cloudpickle will pickle - # it with the function. See cloudpickle#417, pytask#373 and pytask#374. - task_module = _get_module(task.function, getattr(task, "path", None)) - cloudpickle.register_pickle_by_value(task_module) - - kwargs = create_kwargs_for_task(task) - return session.config["_parallel_executor"].submit( - _execute_task, - task=task, - kwargs=kwargs, - show_locals=session.config["show_locals"], - console_options=console.options, - session_filterwarnings=session.config["filterwarnings"], - task_filterwarnings=get_marks(task, "filterwarnings"), - ) - - -def _raise_exception_on_breakpoint(*args: Any, **kwargs: Any) -> None: # noqa: ARG001 - msg = ( - "You cannot use 'breakpoint()' or 'pdb.set_trace()' while parallelizing the " - "execution of tasks with pytask-parallel. Please, remove the breakpoint or run " - "the task without parallelization to debug it." - ) - raise RuntimeError(msg) - - -def _patch_set_trace_and_breakpoint() -> None: - """Patch :func:`pdb.set_trace` and :func:`breakpoint`. - - Patch sys.breakpointhook to intercept any call of breakpoint() and pdb.set_trace in - a subprocess and print a better exception message. 
- - """ - import pdb # noqa: T100 - import sys - - pdb.set_trace = _raise_exception_on_breakpoint - sys.breakpointhook = _raise_exception_on_breakpoint - - -def _execute_task( # noqa: PLR0913 - task: PTask, - kwargs: dict[str, Any], - show_locals: bool, # noqa: FBT001 - console_options: ConsoleOptions, - session_filterwarnings: tuple[str, ...], - task_filterwarnings: tuple[Mark, ...], -) -> tuple[ - PyTree[PythonNode | None], - list[WarningReport], - tuple[type[BaseException], BaseException, str] | None, - str, - str, -]: - """Unserialize and execute task. - - This function receives bytes and unpickles them to a task which is them execute in a - spawned process or thread. - - """ - # Hide this function from tracebacks. - __tracebackhide__ = True - - # Patch set_trace and breakpoint to show a better error message. - _patch_set_trace_and_breakpoint() - - captured_stdout_buffer = StringIO() - captured_stderr_buffer = StringIO() - - # Catch warnings and store them in a list. - with warnings.catch_warnings(record=True) as log, redirect_stdout( - captured_stdout_buffer - ), redirect_stderr(captured_stderr_buffer): - # Apply global filterwarnings. - for arg in session_filterwarnings: - warnings.filterwarnings(*parse_warning_filter(arg, escape=False)) - - # Apply filters from "filterwarnings" marks - for mark in task_filterwarnings: - for arg in mark.args: - warnings.filterwarnings(*parse_warning_filter(arg, escape=False)) - - try: - out = task.execute(**kwargs) - except Exception: # noqa: BLE001 - exc_info = sys.exc_info() - processed_exc_info = _process_exception( - exc_info, show_locals, console_options - ) - else: - # Save products. - handle_task_function_return(task, out) - processed_exc_info = None - - task_display_name = getattr(task, "display_name", task.name) - warning_reports = [] - for warning_message in log: - fs_location = warning_message.filename, warning_message.lineno - warning_reports.append( - WarningReport( - message=warning_record_to_str(warning_message), - fs_location=fs_location, - id_=task_display_name, - ) - ) - - captured_stdout_buffer.seek(0) - captured_stderr_buffer.seek(0) - captured_stdout = captured_stdout_buffer.read() - captured_stderr = captured_stderr_buffer.read() - captured_stdout_buffer.close() - captured_stderr_buffer.close() - - # Collect all PythonNodes that are products to pass values back to the main process. - python_nodes = tree_map( - lambda x: x if isinstance(x, PythonNode) else None, task.produces - ) - - return ( - python_nodes, - warning_reports, - processed_exc_info, - captured_stdout, - captured_stderr, - ) - - -def _process_exception( - exc_info: tuple[type[BaseException], BaseException, TracebackType | None], - show_locals: bool, # noqa: FBT001 - console_options: ConsoleOptions, -) -> tuple[type[BaseException], BaseException, str]: - """Process the exception and convert the traceback to a string.""" - traceback = Traceback(exc_info, show_locals=show_locals) - segments = console.render(traceback, options=console_options) - text = "".join(segment.text for segment in segments) - return (*exc_info[:2], text) - - -def _get_module(func: Callable[..., Any], path: Path | None) -> ModuleType: - """Get the module of a python function. - - ``functools.partial`` obfuscates the module of the function and - ``inspect.getmodule`` returns :mod`functools`. Therefore, we recover the original - function. - - We use the path from the task module to aid the search although it is not clear - whether it helps. 
- - """ - if isinstance(func, partial): - func = func.func - - if path: - return inspect.getmodule(func, path.as_posix()) - return inspect.getmodule(func) diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index e4ce8e0..e2deab0 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -7,6 +7,7 @@ from typing import TYPE_CHECKING from typing import Any +import cloudpickle from attrs import define from attrs import field from pytask import ExecutionReport @@ -14,12 +15,17 @@ from pytask import PTask from pytask import PythonNode from pytask import Session +from pytask import console +from pytask import get_marks from pytask import hookimpl from pytask.tree_util import PyTree from pytask.tree_util import tree_map from pytask.tree_util import tree_structure +from pytask_parallel.backends import WorkerType from pytask_parallel.backends import registry +from pytask_parallel.utils import create_kwargs_for_task +from pytask_parallel.utils import get_module from pytask_parallel.utils import parse_future_result if TYPE_CHECKING: @@ -156,6 +162,56 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 return True +@hookimpl +def pytask_execute_task(session: Session, task: PTask) -> Future[Any]: + """Execute a task. + + The task function is wrapped according to the worker type and submitted to the + executor. + + """ + worker_type = registry.registry[session.config["parallel_backend"]].worker_type + + kwargs = create_kwargs_for_task(task) + + if worker_type == WorkerType.PROCESSES: + # Prevent circular import for loky backend. + from pytask_parallel.wrappers import wrap_task_in_process + + # Task modules are dynamically loaded and added to `sys.modules`. Thus, + # cloudpickle believes the module of the task function is also importable in the + # child process. We have to register the module as dynamic again, so that + # cloudpickle will pickle it with the function. See cloudpickle#417, pytask#373 + # and pytask#374. + task_module = get_module(task.function, getattr(task, "path", None)) + cloudpickle.register_pickle_by_value(task_module) + + return session.config["_parallel_executor"].submit( + wrap_task_in_process, + task=task, + kwargs=kwargs, + show_locals=session.config["show_locals"], + console_options=console.options, + session_filterwarnings=session.config["filterwarnings"], + task_filterwarnings=get_marks(task, "filterwarnings"), + ) + if worker_type == WorkerType.THREADS: + # Prevent circular import for loky backend. 
+        from pytask_parallel.wrappers import wrap_task_in_thread
+
+        return session.config["_parallel_executor"].submit(
+            wrap_task_in_thread, task=task, **kwargs
+        )
+    msg = f"Unknown worker type {worker_type}"
+    raise ValueError(msg)
+
+
+@hookimpl
+def pytask_unconfigure() -> None:
+    """Clean up the parallel executor."""
+    registry.reset()
+
+
 def _update_python_nodes(
     task: PTask, python_nodes: dict[str, PyTree[PythonNode | None]] | None
 ) -> None:
diff --git a/src/pytask_parallel/threads.py b/src/pytask_parallel/threads.py
deleted file mode 100644
index 936d804..0000000
--- a/src/pytask_parallel/threads.py
+++ /dev/null
@@ -1,59 +0,0 @@
-"""Contains functions for the threads backend."""
-
-from __future__ import annotations
-
-import sys
-from typing import TYPE_CHECKING
-from typing import Any
-
-from pytask import PTask
-from pytask import Session
-from pytask import hookimpl
-
-from pytask_parallel.utils import create_kwargs_for_task
-from pytask_parallel.utils import handle_task_function_return
-
-if TYPE_CHECKING:
-    from concurrent.futures import Future
-    from types import TracebackType
-
-
-@hookimpl
-def pytask_execute_task(session: Session, task: PTask) -> Future[Any]:
-    """Execute a task.
-
-    Since threads have shared memory, it is not necessary to pickle and unpickle the
-    task.
-
-    """
-    kwargs = create_kwargs_for_task(task)
-    return session.config["_parallel_executor"].submit(
-        _mock_processes_for_threads, task=task, **kwargs
-    )
-
-
-def _mock_processes_for_threads(
-    task: PTask, **kwargs: Any
-) -> tuple[
-    None,
-    list[Any],
-    tuple[type[BaseException], BaseException, TracebackType] | None,
-    str,
-    str,
-]:
-    """Mock execution function such that it returns the same as for processes.
-
-    The function for processes returns ``warning_reports`` and an ``exception``. With
-    threads, these object are collected by the main and not the subprocess. So, we just
-    return placeholders.
-
-    """
-    __tracebackhide__ = True
-    try:
-        out = task.function(**kwargs)
-    except Exception:  # noqa: BLE001
-        exc_info = sys.exc_info()
-    else:
-        handle_task_function_return(task, out)
-        exc_info = None
-    return None, [], exc_info, "", ""
diff --git a/src/pytask_parallel/utils.py b/src/pytask_parallel/utils.py
index 06acbe8..268cfc7 100644
--- a/src/pytask_parallel/utils.py
+++ b/src/pytask_parallel/utils.py
@@ -3,8 +3,10 @@
 from __future__ import annotations
 
 import inspect
+from functools import partial
 from typing import TYPE_CHECKING
 from typing import Any
+from typing import Callable
 
 from pytask.tree_util import PyTree
 from pytask.tree_util import tree_leaves
@@ -13,6 +15,8 @@
 
 if TYPE_CHECKING:
     from concurrent.futures import Future
+    from pathlib import Path
+    from types import ModuleType
     from types import TracebackType
 
     from pytask import PTask
@@ -20,6 +24,14 @@
     from pytask import WarningReport
 
 
+__all__ = [
+    "create_kwargs_for_task",
+    "get_module",
+    "handle_task_function_return",
+    "parse_future_result",
+]
+
+
 def parse_future_result(
     future: Future[Any],
 ) -> tuple[
@@ -94,3 +106,22 @@ def _parse_future_exception(
 ) -> tuple[type[BaseException], BaseException, TracebackType] | None:
     """Parse a future exception into the format of ``sys.exc_info``."""
     return None if exc is None else (type(exc), exc, exc.__traceback__)
+
+
+def get_module(func: Callable[..., Any], path: Path | None) -> ModuleType:
+    """Get the module of a python function.
+
+    ``functools.partial`` obfuscates the module of the function and
+    ``inspect.getmodule`` returns :mod:`functools`.
Therefore, we recover the original + function. + + We use the path from the task module to aid the search although it is not clear + whether it helps. + + """ + if isinstance(func, partial): + func = func.func + + if path: + return inspect.getmodule(func, path.as_posix()) + return inspect.getmodule(func) diff --git a/src/pytask_parallel/processes.py b/src/pytask_parallel/wrappers.py similarity index 69% rename from src/pytask_parallel/processes.py rename to src/pytask_parallel/wrappers.py index c00aa2e..ff75aee 100644 --- a/src/pytask_parallel/processes.py +++ b/src/pytask_parallel/wrappers.py @@ -2,95 +2,64 @@ from __future__ import annotations -import inspect import sys import warnings from contextlib import redirect_stderr from contextlib import redirect_stdout -from functools import partial from io import StringIO from typing import TYPE_CHECKING from typing import Any -from typing import Callable -import cloudpickle -from pytask import Mark -from pytask import PTask from pytask import PythonNode -from pytask import Session from pytask import Traceback from pytask import WarningReport from pytask import console -from pytask import get_marks -from pytask import hookimpl from pytask import parse_warning_filter from pytask import warning_record_to_str from pytask.tree_util import PyTree from pytask.tree_util import tree_map -from pytask_parallel.utils import create_kwargs_for_task from pytask_parallel.utils import handle_task_function_return if TYPE_CHECKING: - from concurrent.futures import Future - from pathlib import Path - from types import ModuleType from types import TracebackType + from pytask import Mark + from pytask import PTask from rich.console import ConsoleOptions -@hookimpl -def pytask_execute_task(session: Session, task: PTask) -> Future[Any]: - """Execute a task. +__all__ = ["wrap_task_in_process", "wrap_task_in_thread"] - Take a task, pickle it and send the bytes over to another process. - """ - kwargs = create_kwargs_for_task(task) - - # Task modules are dynamically loaded and added to `sys.modules`. Thus, cloudpickle - # believes the module of the task function is also importable in the child process. - # We have to register the module as dynamic again, so that cloudpickle will pickle - # it with the function. See cloudpickle#417, pytask#373 and pytask#374. - task_module = _get_module(task.function, getattr(task, "path", None)) - cloudpickle.register_pickle_by_value(task_module) - - return session.config["_parallel_executor"].submit( - _execute_task, - task=task, - kwargs=kwargs, - show_locals=session.config["show_locals"], - console_options=console.options, - session_filterwarnings=session.config["filterwarnings"], - task_filterwarnings=get_marks(task, "filterwarnings"), - ) - - -def _raise_exception_on_breakpoint(*args: Any, **kwargs: Any) -> None: # noqa: ARG001 - msg = ( - "You cannot use 'breakpoint()' or 'pdb.set_trace()' while parallelizing the " - "execution of tasks with pytask-parallel. Please, remove the breakpoint or run " - "the task without parallelization to debug it." - ) - raise RuntimeError(msg) - - -def _patch_set_trace_and_breakpoint() -> None: - """Patch :func:`pdb.set_trace` and :func:`breakpoint`. +def wrap_task_in_thread( + task: PTask, **kwargs: Any +) -> tuple[ + None, + list[Any], + tuple[type[BaseException], BaseException, TracebackType] | None, + str, + str, +]: + """Mock execution function such that it returns the same as for processes. 
-
-    Patch sys.breakpointhook to intercept any call of breakpoint() and pdb.set_trace in
-    a subprocess and print a better exception message.
+
+    The function for processes returns ``warning_reports`` and an ``exception``. With
+    threads, these objects are collected by the main process and not the subprocess.
+    So, we just return placeholders.
 
     """
-    import pdb  # noqa: T100
-    import sys
-
-    pdb.set_trace = _raise_exception_on_breakpoint
-    sys.breakpointhook = _raise_exception_on_breakpoint
+    __tracebackhide__ = True
+    try:
+        out = task.function(**kwargs)
+    except Exception:  # noqa: BLE001
+        exc_info = sys.exc_info()
+    else:
+        handle_task_function_return(task, out)
+        exc_info = None
+    return None, [], exc_info, "", ""
 
 
-def _execute_task(  # noqa: PLR0913
+def wrap_task_in_process(  # noqa: PLR0913
     task: PTask,
     kwargs: dict[str, Any],
     show_locals: bool,  # noqa: FBT001
@@ -136,7 +105,7 @@ def _execute_task(  # noqa: PLR0913
         out = task.execute(**kwargs)
     except Exception:  # noqa: BLE001
         exc_info = sys.exc_info()
-        processed_exc_info = _process_exception(
+        processed_exc_info = _render_traceback_to_string(
             exc_info, show_locals, console_options
         )
     else:
@@ -177,7 +146,30 @@ def _execute_task(  # noqa: PLR0913
     )
 
 
-def _process_exception(
+def _raise_exception_on_breakpoint(*args: Any, **kwargs: Any) -> None:  # noqa: ARG001
+    msg = (
+        "You cannot use 'breakpoint()' or 'pdb.set_trace()' while parallelizing the "
+        "execution of tasks with pytask-parallel. Please, remove the breakpoint or run "
+        "the task without parallelization to debug it."
+    )
+    raise RuntimeError(msg)
+
+
+def _patch_set_trace_and_breakpoint() -> None:
+    """Patch :func:`pdb.set_trace` and :func:`breakpoint`.
+
+    Patch sys.breakpointhook to intercept any call of breakpoint() and pdb.set_trace in
+    a subprocess and print a better exception message.
+
+    """
+    import pdb  # noqa: T100
+    import sys
+
+    pdb.set_trace = _raise_exception_on_breakpoint
+    sys.breakpointhook = _raise_exception_on_breakpoint
+
+
+def _render_traceback_to_string(
     exc_info: tuple[type[BaseException], BaseException, TracebackType | None],
     show_locals: bool,  # noqa: FBT001
     console_options: ConsoleOptions,
@@ -187,22 +179,3 @@
     segments = console.render(traceback, options=console_options)
     text = "".join(segment.text for segment in segments)
     return (*exc_info[:2], text)
-
-
-def _get_module(func: Callable[..., Any], path: Path | None) -> ModuleType:
-    """Get the module of a python function.
-
-    ``functools.partial`` obfuscates the module of the function and
-    ``inspect.getmodule`` returns :mod`functools`. Therefore, we recover the original
-    function.
-
-    We use the path from the task module to aid the search although it is not clear
-    whether it helps.
- - """ - if isinstance(func, partial): - func = func.func - - if path: - return inspect.getmodule(func, path.as_posix()) - return inspect.getmodule(func) diff --git a/tests/conftest.py b/tests/conftest.py index cedd5f8..57004c3 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,6 +1,5 @@ from __future__ import annotations -import os import sys from contextlib import contextmanager from typing import Callable @@ -77,7 +76,6 @@ def runner(): def pytest_collection_modifyitems(session, config, items) -> None: # noqa: ARG001 """Add markers to Jupyter notebook tests.""" - if sys.platform == "darwin" and "CI" in os.environ: # pragma: no cover - for item in items: - if isinstance(item, NotebookItem): - item.add_marker(pytest.mark.xfail(reason="Fails regularly on MacOS")) + for item in items: + if isinstance(item, NotebookItem): + item.add_marker(pytest.mark.xfail(reason="The tests are flaky.")) diff --git a/tests/test_backends.py b/tests/test_backends.py index 92ecc6b..61168ae 100644 --- a/tests/test_backends.py +++ b/tests/test_backends.py @@ -34,45 +34,23 @@ def task_example(): pass @pytest.mark.end_to_end() def test_register_custom_backend(runner, tmp_path): - hook_source = """ - import cloudpickle + source = """ from loky import get_reusable_executor - from pytask import hookimpl from pytask_parallel import ParallelBackend from pytask_parallel import registry - from pytask_parallel.processes import _get_module - from pytask_parallel.utils import create_kwargs_for_task - - - @hookimpl(tryfirst=True) - def pytask_execute_task(session, task): - kwargs = create_kwargs_for_task(task) - - task_module = _get_module(task.function, getattr(task, "path", None)) - cloudpickle.register_pickle_by_value(task_module) - - return session.config["_parallel_executor"].submit(task.function, **kwargs) - def custom_builder(n_workers): print("Build custom executor.") return get_reusable_executor(max_workers=n_workers) + registry.register_parallel_backend( + ParallelBackend.CUSTOM, custom_builder, worker_type="processes", remote=False + ) - registry.register_parallel_backend(ParallelBackend.CUSTOM, custom_builder) + def task_example(): pass """ - tmp_path.joinpath("hook.py").write_text(textwrap.dedent(hook_source)) - tmp_path.joinpath("task_example.py").write_text("def task_example(): pass") - result = runner.invoke( - cli, - [ - tmp_path.as_posix(), - "--parallel-backend", - "custom", - "--hook-module", - tmp_path.joinpath("hook.py").as_posix(), - ], - ) + tmp_path.joinpath("task_example.py").write_text(textwrap.dedent(source)) + result = runner.invoke(cli, [tmp_path.as_posix(), "--parallel-backend", "custom"]) assert result.exit_code == ExitCode.OK assert "Build custom executor." 
in result.output assert "1 Succeeded" in result.output diff --git a/tests/test_config.py b/tests/test_config.py index 9e2ce2c..e9e261b 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -36,14 +36,20 @@ def test_interplay_between_debugging_and_parallel(tmp_path, pdb, n_workers, expe ] + [ ("parallel_backend", parallel_backend, ExitCode.OK) - if parallel_backend != ParallelBackend.DASK - else pytest.param( + for parallel_backend in ( + ParallelBackend.LOKY, + ParallelBackend.PROCESSES, + ParallelBackend.THREADS, + ) + ] + + [ + pytest.param( "parallel_backend", "dask", ExitCode.CONFIGURATION_FAILED, marks=pytest.mark.skip(reason="Dask is not yet supported"), - ) - for parallel_backend in ParallelBackend + ), + ("parallel_backend", ParallelBackend.CUSTOM, ExitCode.FAILED), ], ) def test_reading_values_from_config_file( From a4744cbd42f837a70c15782697747b4b9f12e4fd Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Sun, 7 Apr 2024 22:30:23 +0200 Subject: [PATCH 3/7] Fix. --- src/pytask_parallel/backends.py | 1 + tests/test_backends.py | 4 ++++ tests/test_config.py | 2 +- tests/test_execute.py | 2 +- 4 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/pytask_parallel/backends.py b/src/pytask_parallel/backends.py index fc862e5..e64489a 100644 --- a/src/pytask_parallel/backends.py +++ b/src/pytask_parallel/backends.py @@ -140,6 +140,7 @@ def get_parallel_backend(self, kind: ParallelBackend, n_workers: int) -> Executo def reset(self) -> None: """Register the default backends.""" + self.registry.clear() for parallel_backend, builder, worker_type, remote in ( (ParallelBackend.DASK, _get_dask_executor, "processes", False), (ParallelBackend.LOKY, _get_loky_executor, "processes", False), diff --git a/tests/test_backends.py b/tests/test_backends.py index 61168ae..ba771aa 100644 --- a/tests/test_backends.py +++ b/tests/test_backends.py @@ -1,5 +1,6 @@ import textwrap +from pytask_parallel import ParallelBackend, registry import pytest from pytask import ExitCode from pytask import cli @@ -54,3 +55,6 @@ def task_example(): pass assert result.exit_code == ExitCode.OK assert "Build custom executor." in result.output assert "1 Succeeded" in result.output + + # Assert that the backend registry has been reset. 
+ assert ParallelBackend.CUSTOM not in registry.registry diff --git a/tests/test_config.py b/tests/test_config.py index e9e261b..5918045 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -6,7 +6,7 @@ import pytest from pytask import ExitCode from pytask import build -from pytask_parallel.backends import ParallelBackend +from pytask_parallel import ParallelBackend @pytest.mark.end_to_end() diff --git a/tests/test_execute.py b/tests/test_execute.py index 275e502..516b35a 100644 --- a/tests/test_execute.py +++ b/tests/test_execute.py @@ -7,7 +7,7 @@ from pytask import ExitCode from pytask import build from pytask import cli -from pytask_parallel.backends import ParallelBackend +from pytask_parallel import ParallelBackend from pytask_parallel.execute import _Sleeper from tests.conftest import restore_sys_path_and_module_after_test_execution From 6a727d3732cf03b9ce90817c091b2f44afc6c4fe Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 7 Apr 2024 20:30:47 +0000 Subject: [PATCH 4/7] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- tests/test_backends.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_backends.py b/tests/test_backends.py index ba771aa..0c50da2 100644 --- a/tests/test_backends.py +++ b/tests/test_backends.py @@ -1,9 +1,10 @@ import textwrap -from pytask_parallel import ParallelBackend, registry import pytest from pytask import ExitCode from pytask import cli +from pytask_parallel import ParallelBackend +from pytask_parallel import registry @pytest.mark.end_to_end() From 1f7d7263dc6080822dcf20576ad4f0a4625786e0 Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Sun, 7 Apr 2024 23:34:38 +0200 Subject: [PATCH 5/7] Formalize wrapper result. --- .pre-commit-config.yaml | 2 +- pyproject.toml | 57 ++++++++++++++----------- src/pytask_parallel/execute.py | 35 ++++++++------- src/pytask_parallel/utils.py | 69 ++++++------------------------ src/pytask_parallel/wrappers.py | 76 +++++++++++++++++++++------------ 5 files changed, 112 insertions(+), 127 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 986cb85..acca674 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -82,7 +82,7 @@ repos: hooks: - id: check-manifest args: [--no-build-isolation] - additional_dependencies: [setuptools-scm, toml, wheel] + additional_dependencies: [hatchling, hatch-vcs] - repo: meta hooks: - id: check-hooks-apply diff --git a/pyproject.toml b/pyproject.toml index d28f4ea..f3ca37a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,3 @@ -[build-system] -build-backend = "setuptools.build_meta" -requires = ["setuptools>=64", "setuptools_scm[toml]>=8"] - [project] name = "pytask_parallel" description = "Parallelize the execution of tasks with pytask." 
@@ -20,7 +16,7 @@ dependencies = [ "loky", "pluggy>=1.0.0", "pytask>=0.4.5", - "rich" + "rich", ] dynamic = ["version"] @@ -31,17 +27,17 @@ email = "raabe@posteo.de" [project.optional-dependencies] dask = ["dask[complete]", "distributed"] docs = [ - "furo", - "ipython", - "matplotlib", - "myst-parser", - "nbsphinx", - "sphinx", - "sphinx-click", - "sphinx-copybutton", - "sphinx-design>=0.3", - "sphinx-toolbox", - "sphinxext-opengraph", + "furo", + "ipython", + "matplotlib", + "myst-parser", + "nbsphinx", + "sphinx", + "sphinx-click", + "sphinx-copybutton", + "sphinx-design>=0.3", + "sphinx-toolbox", + "sphinxext-opengraph", ] test = [ "pytask-parallel[all]", @@ -76,15 +72,28 @@ ignore = ["src/pytask_parallel/_version.py"] [project.entry-points.pytask] pytask_parallel = "pytask_parallel.plugin" -[tool.setuptools.package-dir] -"" = "src" +[build-system] +requires = ["hatchling", "hatch_vcs"] +build-backend = "hatchling.build" + +[tool.rye] +managed = true + +[tool.rye.scripts] +clean-docs = { cmd = "rm -rf docs/build" } +build-docs = { cmd = "sphinx-build -b html docs/source docs/build" } + +[tool.hatch.metadata] +allow-direct-references = true + +[tool.hatch.build.hooks.vcs] +version-file = "src/pytask_parallel/_version.py" -[tool.setuptools.packages.find] -where = ["src"] -namespaces = false +[tool.hatch.build.targets.wheel] +packages = ["src/pytask_parallel"] -[tool.setuptools_scm] -version_file = "src/pytask_parallel/_version.py" +[tool.hatch.version] +source = "vcs" [tool.mypy] files = ["src", "tests"] @@ -108,9 +117,7 @@ unsafe-fixes = true [tool.ruff.lint] extend-ignore = [ - # Others. "ANN101", # type annotating self - "ANN102", # type annotating cls "ANN401", # flake8-annotate typing.Any "COM812", # Comply with ruff-format. "ISC001", # Comply with ruff-format. 
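The diffs below carry the substance of this patch: the positional five-tuple that the
task wrappers returned is replaced by a single attrs container. For orientation, a
minimal sketch of its shape; the field types are simplified here to keep the snippet
self-contained, and the authoritative annotations are in the `WrapperResult` class
added to `src/pytask_parallel/wrappers.py` further down:

    from __future__ import annotations

    from types import TracebackType
    from typing import Any

    from attrs import define


    @define(kw_only=True)
    class WrapperResult:
        python_nodes: Any  # PythonNode products carried back to the main process
        warning_reports: list[Any]  # warnings captured inside the worker
        exc_info: tuple[type[BaseException], BaseException, TracebackType | str] | None
        stdout: str  # stdout captured while the task ran
        stderr: str  # stderr captured while the task ran


    # A successful task without captured output comes back as:
    result = WrapperResult(
        python_nodes=None, warning_reports=[], exc_info=None, stdout="", stderr=""
    )

Grouping the fields this way lets the scheduler in execute.py read named attributes
(`wrapper_result.stdout` and friends) instead of unpacking positions, as the first
diff below shows.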
diff --git a/src/pytask_parallel/execute.py b/src/pytask_parallel/execute.py index e2deab0..cf880d3 100644 --- a/src/pytask_parallel/execute.py +++ b/src/pytask_parallel/execute.py @@ -31,6 +31,8 @@ if TYPE_CHECKING: from concurrent.futures import Future + from pytask_parallel.wrappers import WrapperResult + @hookimpl def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR0915 @@ -97,34 +99,31 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 future = running_tasks[task_name] if future.done(): - ( - python_nodes, - warnings_reports, - exc_info, - captured_stdout, - captured_stderr, - ) = parse_future_result(future) - session.warnings.extend(warnings_reports) - - if captured_stdout: + wrapper_result = parse_future_result(future) + session.warnings.extend(wrapper_result.warning_reports) + + if wrapper_result.stdout: task.report_sections.append( - ("call", "stdout", captured_stdout) + ("call", "stdout", wrapper_result.stdout) ) - if captured_stderr: + if wrapper_result.stderr: task.report_sections.append( - ("call", "stderr", captured_stderr) + ("call", "stderr", wrapper_result.stderr) ) - if exc_info is not None: + if wrapper_result.exc_info is not None: task = session.dag.nodes[task_name]["task"] newly_collected_reports.append( - ExecutionReport.from_task_and_exception(task, exc_info) + ExecutionReport.from_task_and_exception( + task, + wrapper_result.exc_info, # type: ignore[arg-type] + ) ) running_tasks.pop(task_name) session.scheduler.done(task_name) else: task = session.dag.nodes[task_name]["task"] - _update_python_nodes(task, python_nodes) + _update_python_nodes(task, wrapper_result.python_nodes) try: session.hook.pytask_execute_task_teardown( @@ -163,7 +162,7 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091 @hookimpl -def pytask_execute_task(session: Session, task: PTask) -> Future[Any]: +def pytask_execute_task(session: Session, task: PTask) -> Future[WrapperResult]: """Execute a task. 
The task function is wrapped according to the worker type and submitted to the @@ -213,7 +212,7 @@ def pytask_unconfigure() -> None: def _update_python_nodes( - task: PTask, python_nodes: dict[str, PyTree[PythonNode | None]] | None + task: PTask, python_nodes: PyTree[PythonNode | None] | None ) -> None: """Update the python nodes of a task with the python nodes from the future.""" diff --git a/src/pytask_parallel/utils.py b/src/pytask_parallel/utils.py index 268cfc7..7460c87 100644 --- a/src/pytask_parallel/utils.py +++ b/src/pytask_parallel/utils.py @@ -9,81 +9,40 @@ from typing import Callable from pytask.tree_util import PyTree -from pytask.tree_util import tree_leaves from pytask.tree_util import tree_map -from pytask.tree_util import tree_structure + if TYPE_CHECKING: + from pytask_parallel.wrappers import WrapperResult from concurrent.futures import Future from pathlib import Path from types import ModuleType from types import TracebackType from pytask import PTask - from pytask import PythonNode - from pytask import WarningReport -__all__ = [ - "create_kwargs_for_task", - "get_module", - "handle_task_function_return", - "parse_future_result", -] +__all__ = ["create_kwargs_for_task", "get_module", "parse_future_result"] def parse_future_result( - future: Future[Any], -) -> tuple[ - dict[str, PyTree[PythonNode | None]] | None, - list[WarningReport], - tuple[type[BaseException], BaseException, TracebackType] | None, - str, - str, -]: + future: Future[WrapperResult], +) -> WrapperResult: """Parse the result of a future.""" # An exception was raised before the task was executed. future_exception = future.exception() if future_exception is not None: + from pytask_parallel.wrappers import WrapperResult + exc_info = _parse_future_exception(future_exception) - return None, [], exc_info, "", "" - - out = future.result() - if isinstance(out, tuple) and len(out) == 5: # noqa: PLR2004 - return out - - if out is None: - return None, [], None, "", "" - - # What to do when the output does not match? - msg = ( - "The task function returns an unknown output format. Either return a tuple " - "with three elements, python nodes, warning reports and exception or only " - "return." - ) - raise Exception(msg) # noqa: TRY002 - - -def handle_task_function_return(task: PTask, out: Any) -> None: - """Handle the return value of a task function.""" - if "return" not in task.produces: - return - - structure_out = tree_structure(out) - structure_return = tree_structure(task.produces["return"]) - # strict must be false when none is leaf. 
- if not structure_return.is_prefix(structure_out, strict=False): - msg = ( - "The structure of the return annotation is not a subtree of " - "the structure of the function return.\n\nFunction return: " - f"{structure_out}\n\nReturn annotation: {structure_return}" + return WrapperResult( + python_nodes=None, + warning_reports=[], + exc_info=exc_info, + stdout="", + stderr="", ) - raise ValueError(msg) - - nodes = tree_leaves(task.produces["return"]) - values = structure_return.flatten_up_to(out) - for node, value in zip(nodes, values): - node.save(value) + return future.result() def create_kwargs_for_task(task: PTask) -> dict[str, PyTree[Any]]: diff --git a/src/pytask_parallel/wrappers.py b/src/pytask_parallel/wrappers.py index ff75aee..3e2f42f 100644 --- a/src/pytask_parallel/wrappers.py +++ b/src/pytask_parallel/wrappers.py @@ -10,6 +10,8 @@ from typing import TYPE_CHECKING from typing import Any +from attrs import define +from pytask import PTask from pytask import PythonNode from pytask import Traceback from pytask import WarningReport @@ -17,30 +19,52 @@ from pytask import parse_warning_filter from pytask import warning_record_to_str from pytask.tree_util import PyTree +from pytask.tree_util import tree_leaves from pytask.tree_util import tree_map - -from pytask_parallel.utils import handle_task_function_return +from pytask.tree_util import tree_structure if TYPE_CHECKING: from types import TracebackType from pytask import Mark - from pytask import PTask from rich.console import ConsoleOptions __all__ = ["wrap_task_in_process", "wrap_task_in_thread"] -def wrap_task_in_thread( - task: PTask, **kwargs: Any -) -> tuple[ - None, - list[Any], - tuple[type[BaseException], BaseException, TracebackType] | None, - str, - str, -]: +@define(kw_only=True) +class WrapperResult: + python_nodes: PyTree[PythonNode | None] + warning_reports: list[WarningReport] + exc_info: tuple[type[BaseException], BaseException, TracebackType | str] | None + stdout: str + stderr: str + + +def _handle_task_function_return(task: PTask, out: Any) -> None: + """Handle the return value of a task function.""" + if "return" not in task.produces: + return + + structure_out = tree_structure(out) + structure_return = tree_structure(task.produces["return"]) + # strict must be false when none is leaf. + if not structure_return.is_prefix(structure_out, strict=False): + msg = ( + "The structure of the return annotation is not a subtree of " + "the structure of the function return.\n\nFunction return: " + f"{structure_out}\n\nReturn annotation: {structure_return}" + ) + raise ValueError(msg) + + nodes = tree_leaves(task.produces["return"]) + values = structure_return.flatten_up_to(out) + for node, value in zip(nodes, values): + node.save(value) + + +def wrap_task_in_thread(task: PTask, **kwargs: Any) -> WrapperResult: """Mock execution function such that it returns the same as for processes. The function for processes returns ``warning_reports`` and an ``exception``. 
With
     threads, these objects are collected by the main process and not the subprocess.
     So, we just return placeholders.
 
     """
@@ -54,9 +78,11 @@ def wrap_task_in_thread(
     except Exception:  # noqa: BLE001
         exc_info = sys.exc_info()
     else:
-        handle_task_function_return(task, out)
+        _handle_task_function_return(task, out)
         exc_info = None
-    return None, [], exc_info, "", ""
+    return WrapperResult(
+        python_nodes=None, warning_reports=[], exc_info=exc_info, stdout="", stderr=""
+    )
 
 
 def wrap_task_in_process(  # noqa: PLR0913
@@ -66,13 +92,7 @@ def wrap_task_in_process(  # noqa: PLR0913
     console_options: ConsoleOptions,
     session_filterwarnings: tuple[str, ...],
     task_filterwarnings: tuple[Mark, ...],
-) -> tuple[
-    PyTree[PythonNode | None],
-    list[WarningReport],
-    tuple[type[BaseException], BaseException, str] | None,
-    str,
-    str,
-]:
+) -> WrapperResult:
     """Unserialize and execute task.
 
     This function receives bytes and unpickles them to a task which is then executed in
     a spawned process or thread.
@@ -110,7 +130,7 @@ def wrap_task_in_process(  # noqa: PLR0913
         )
     else:
         # Save products.
-        handle_task_function_return(task, out)
+        _handle_task_function_return(task, out)
         processed_exc_info = None
@@ -137,12 +157,12 @@ def wrap_task_in_process(  # noqa: PLR0913
         lambda x: x if isinstance(x, PythonNode) else None, task.produces
     )
 
-    return (
-        python_nodes,
-        warning_reports,
-        processed_exc_info,
-        captured_stdout,
-        captured_stderr,
+    return WrapperResult(
+        python_nodes=python_nodes,
+        warning_reports=warning_reports,
+        exc_info=processed_exc_info,
+        stdout=captured_stdout,
+        stderr=captured_stderr,
     )

From 76403d16acda6ff13a50e8e894e1a69227c4dadc Mon Sep 17 00:00:00 2001
From: Tobias Raabe
Date: Sun, 7 Apr 2024 23:34:49 +0200
Subject: [PATCH 6/7] Formalize wrapper result.

---
 src/pytask_parallel/utils.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/pytask_parallel/utils.py b/src/pytask_parallel/utils.py
index 7460c87..b7c3764 100644
--- a/src/pytask_parallel/utils.py
+++ b/src/pytask_parallel/utils.py
@@ -11,9 +11,7 @@
 from pytask.tree_util import PyTree
 from pytask.tree_util import tree_map
 
-
 if TYPE_CHECKING:
-    from pytask_parallel.wrappers import WrapperResult
     from concurrent.futures import Future
     from pathlib import Path
     from types import ModuleType
@@ -21,6 +19,8 @@
 
     from pytask import PTask
 
+    from pytask_parallel.wrappers import WrapperResult
+
 
 __all__ = ["create_kwargs_for_task", "get_module", "parse_future_result"]

From 4b66ae1d5aceaca4d4065148e2899f343e939825 Mon Sep 17 00:00:00 2001
From: Tobias Raabe
Date: Sun, 7 Apr 2024 23:48:58 +0200
Subject: [PATCH 7/7] remove rye.

---
 .pre-commit-config.yaml |  2 +-
 pyproject.toml          | 57 ++++++++++++++++++-----------------------
 2 files changed, 26 insertions(+), 33 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index acca674..986cb85 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -82,7 +82,7 @@ repos:
       hooks:
         - id: check-manifest
           args: [--no-build-isolation]
-          additional_dependencies: [hatchling, hatch-vcs]
+          additional_dependencies: [setuptools-scm, toml, wheel]
   - repo: meta
     hooks:
       - id: check-hooks-apply
diff --git a/pyproject.toml b/pyproject.toml
index f3ca37a..d28f4ea 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,3 +1,7 @@
+[build-system]
+build-backend = "setuptools.build_meta"
+requires = ["setuptools>=64", "setuptools_scm[toml]>=8"]
+
 [project]
 name = "pytask_parallel"
 description = "Parallelize the execution of tasks with pytask."
@@ -16,7 +20,7 @@ dependencies = [ "loky", "pluggy>=1.0.0", "pytask>=0.4.5", - "rich", + "rich" ] dynamic = ["version"] @@ -27,17 +31,17 @@ email = "raabe@posteo.de" [project.optional-dependencies] dask = ["dask[complete]", "distributed"] docs = [ - "furo", - "ipython", - "matplotlib", - "myst-parser", - "nbsphinx", - "sphinx", - "sphinx-click", - "sphinx-copybutton", - "sphinx-design>=0.3", - "sphinx-toolbox", - "sphinxext-opengraph", + "furo", + "ipython", + "matplotlib", + "myst-parser", + "nbsphinx", + "sphinx", + "sphinx-click", + "sphinx-copybutton", + "sphinx-design>=0.3", + "sphinx-toolbox", + "sphinxext-opengraph", ] test = [ "pytask-parallel[all]", @@ -72,28 +76,15 @@ ignore = ["src/pytask_parallel/_version.py"] [project.entry-points.pytask] pytask_parallel = "pytask_parallel.plugin" -[build-system] -requires = ["hatchling", "hatch_vcs"] -build-backend = "hatchling.build" - -[tool.rye] -managed = true - -[tool.rye.scripts] -clean-docs = { cmd = "rm -rf docs/build" } -build-docs = { cmd = "sphinx-build -b html docs/source docs/build" } - -[tool.hatch.metadata] -allow-direct-references = true - -[tool.hatch.build.hooks.vcs] -version-file = "src/pytask_parallel/_version.py" +[tool.setuptools.package-dir] +"" = "src" -[tool.hatch.build.targets.wheel] -packages = ["src/pytask_parallel"] +[tool.setuptools.packages.find] +where = ["src"] +namespaces = false -[tool.hatch.version] -source = "vcs" +[tool.setuptools_scm] +version_file = "src/pytask_parallel/_version.py" [tool.mypy] files = ["src", "tests"] @@ -117,7 +108,9 @@ unsafe-fixes = true [tool.ruff.lint] extend-ignore = [ + # Others. "ANN101", # type annotating self + "ANN102", # type annotating cls "ANN401", # flake8-annotate typing.Any "COM812", # Comply with ruff-format. "ISC001", # Comply with ruff-format.
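With the series applied, registering a custom backend works end to end the way
`test_register_custom_backend` above exercises it. A minimal sketch mirroring that
test (the builder name `custom_builder` is illustrative, not part of the API; `loky`
is a declared dependency of pytask-parallel):

    from loky import get_reusable_executor

    from pytask_parallel import ParallelBackend
    from pytask_parallel import registry


    def custom_builder(n_workers):
        # Builders receive ``n_workers`` and must return an Executor instance.
        return get_reusable_executor(max_workers=n_workers)


    # ``worker_type`` selects the wrapper pytask-parallel applies ("threads" or
    # "processes"); the built-in backends all register with ``remote=False``.
    registry.register_parallel_backend(
        ParallelBackend.CUSTOM, custom_builder, worker_type="processes", remote=False
    )

Because `pytask_unconfigure` now calls `registry.reset()` after every session, the
registration has to run on each invocation (for example, by living in the task module
as the test does) before starting pytask with `--parallel-backend custom`.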