Skip to content

Add tests for Jupyter and fix parallelization with PythonNodes. #79

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ jobs:

- name: Run unit tests and doctests.
shell: bash -l {0}
run: tox -e pytest -- tests -m "unit or (not integration and not end_to_end)" --cov=./ --cov-report=xml -n auto
run: tox -e test -- tests -m "unit or (not integration and not end_to_end)" --cov=./ --cov-report=xml

- name: Upload coverage report for unit tests and doctests.
if: runner.os == 'Linux' && matrix.python-version == '3.10'
Expand All @@ -51,7 +51,7 @@ jobs:

- name: Run end-to-end tests.
shell: bash -l {0}
run: tox -e pytest -- tests -m end_to_end --cov=./ --cov-report=xml -n auto
run: tox -e test -- tests -m end_to_end --cov=./ --cov-report=xml

- name: Upload coverage reports of end-to-end tests.
if: runner.os == 'Linux' && matrix.python-version == '3.10'
Expand Down
3 changes: 1 addition & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ __pycache__

*.egg-info

.pytask.sqlite3

.pytask
build
dist
src/pytask_parallel/_version.py
4 changes: 4 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ repos:
rev: v1.25.0
hooks:
- id: refurb
- repo: https://github.com/kynan/nbstripout
rev: 0.6.1
hooks:
- id: nbstripout
- repo: https://github.com/executablebooks/mdformat
rev: 0.7.17
hooks:
Expand Down
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ releases are available on [PyPI](https://pypi.org/project/pytask-parallel) and

- {pull}`72` moves the project to `pyproject.toml`.
- {pull}`75` updates the release strategy.
- {pull}`79` add tests for Jupyter and fix parallelization with `PythonNode`s.

## 0.4.0 - 2023-10-07

Expand Down
4 changes: 3 additions & 1 deletion environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ dependencies:
- optree

# Misc
- black
- tox
- ipywidgets
- nbmake
- pre-commit
- pytest-cov

Expand Down
11 changes: 9 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[build-system]
requires = ["setuptools>=45", "wheel", "setuptools_scm[toml]>=6.0"]
build-backend = "setuptools.build_meta"
requires = ["setuptools>=64", "setuptools_scm[toml]>=8"]

[project]
name = "pytask_parallel"
Expand Down Expand Up @@ -28,6 +28,13 @@ dynamic = ["version"]
name = "Tobias Raabe"
email = "[email protected]"

[project.optional-dependencies]
test = [
"nbmake",
"pytest",
"pytest-cov",
]

[project.readme]
file = "README.md"
content-type = "text/markdown"
Expand Down Expand Up @@ -62,7 +69,7 @@ where = ["src"]
namespaces = false

[tool.setuptools_scm]
write_to = "src/pytask_parallel/_version.py"
version_file = "src/pytask_parallel/_version.py"

[tool.mypy]
files = ["src", "tests"]
Expand Down
54 changes: 44 additions & 10 deletions src/pytask_parallel/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
from pytask import hookimpl
from pytask import Mark
from pytask import parse_warning_filter
from pytask import PNode
from pytask import PTask
from pytask import PythonNode
from pytask import remove_internal_traceback_frames_from_exc_info
from pytask import Session
from pytask import Task
Expand Down Expand Up @@ -114,7 +116,11 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091
warning_reports = []
# A task raised an exception.
else:
warning_reports, task_exception = future.result()
(
python_nodes,
warning_reports,
task_exception,
) = future.result()
session.warnings.extend(warning_reports)
exc_info = (
_parse_future_exception(future.exception())
Expand All @@ -132,6 +138,19 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091
session.scheduler.done(task_name)
else:
task = session.dag.nodes[task_name]["task"]

# Update PythonNodes with the values from the future if
# not threads.
if (
session.config["parallel_backend"]
!= ParallelBackend.THREADS
):
task.produces = tree_map(
_update_python_node,
task.produces,
python_nodes,
)

try:
session.hook.pytask_execute_task_teardown(
session=session, task=task
Expand Down Expand Up @@ -169,6 +188,12 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091
return None


def _update_python_node(x: PNode, y: PythonNode | None) -> PNode:
if y:
x.save(y.load())
return x


def _parse_future_exception(
exc: BaseException | None,
) -> tuple[type[BaseException], BaseException, TracebackType] | None:
Expand Down Expand Up @@ -240,7 +265,11 @@ def _execute_task( # noqa: PLR0913
console_options: ConsoleOptions,
session_filterwarnings: tuple[str, ...],
task_filterwarnings: tuple[Mark, ...],
) -> tuple[list[WarningReport], tuple[type[BaseException], BaseException, str] | None]:
) -> tuple[
PyTree[PythonNode | None],
list[WarningReport],
tuple[type[BaseException], BaseException, str] | None,
]:
"""Unserialize and execute task.

This function receives bytes and unpickles them to a task which is them execute in a
Expand All @@ -251,9 +280,6 @@ def _execute_task( # noqa: PLR0913
_patch_set_trace_and_breakpoint()

with warnings.catch_warnings(record=True) as log:
# mypy can't infer that record=True means log is not None; help it.
assert log is not None # noqa: S101

for arg in session_filterwarnings:
warnings.filterwarnings(*parse_warning_filter(arg, escape=False))

Expand Down Expand Up @@ -301,7 +327,11 @@ def _execute_task( # noqa: PLR0913
)
)

return warning_reports, processed_exc_info
python_nodes = tree_map(
lambda x: x if isinstance(x, PythonNode) else None, task.produces
)

return python_nodes, warning_reports, processed_exc_info


def _process_exception(
Expand Down Expand Up @@ -339,7 +369,9 @@ def pytask_execute_task(session: Session, task: Task) -> Future[Any] | None:

def _mock_processes_for_threads(
func: Callable[..., Any], **kwargs: Any
) -> tuple[list[Any], tuple[type[BaseException], BaseException, TracebackType] | None]:
) -> tuple[
None, list[Any], tuple[type[BaseException], BaseException, TracebackType] | None
]:
"""Mock execution function such that it returns the same as for processes.

The function for processes returns ``warning_reports`` and an ``exception``. With
Expand All @@ -354,7 +386,7 @@ def _mock_processes_for_threads(
exc_info = sys.exc_info()
else:
exc_info = None
return [], exc_info
return None, [], exc_info


def _create_kwargs_for_task(task: PTask) -> dict[str, PyTree[Any]]:
Expand Down Expand Up @@ -395,7 +427,7 @@ def sleep(self) -> None:
time.sleep(self.timings[self.timing_idx])


def _get_module(func: Callable[..., Any], path: Path) -> ModuleType:
def _get_module(func: Callable[..., Any], path: Path | None) -> ModuleType:
"""Get the module of a python function.

For Python <3.10, functools.partial does not set a `__module__` attribute which is
Expand All @@ -410,4 +442,6 @@ def _get_module(func: Callable[..., Any], path: Path) -> ModuleType:
does not really support ``functools.partial``. Instead, use ``@task(kwargs=...)``.

"""
return inspect.getmodule(func, path.as_posix())
if path:
return inspect.getmodule(func, path.as_posix())
return inspect.getmodule(func)
72 changes: 72 additions & 0 deletions tests/test_jupyter/test_functional_interface.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "12bc75b1",
"metadata": {},
"outputs": [],
"source": [
"from pathlib import Path\n",
"\n",
"from typing_extensions import Annotated\n",
"\n",
"import pytask\n",
"from pytask import ExitCode, PathNode, PythonNode"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "29ac7311",
"metadata": {},
"outputs": [],
"source": [
"node_text = PythonNode(name=\"text\", hash=True)\n",
"\n",
"\n",
"def create_text() -> Annotated[int, node_text]:\n",
" return \"This is the text.\"\n",
"\n",
"\n",
"node_file = PathNode.from_path(Path(\"file.txt\").resolve())\n",
"\n",
"\n",
"def create_file(text: Annotated[int, node_text]) -> Annotated[str, node_file]:\n",
" return text"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "738c9418",
"metadata": {},
"outputs": [],
"source": [
"session = pytask.build(tasks=[create_file, create_text], n_workers=2)\n",
"assert session.exit_code == ExitCode.OK"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.6"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
72 changes: 72 additions & 0 deletions tests/test_jupyter/test_functional_interface_w_relative_path.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "12bc75b1",
"metadata": {},
"outputs": [],
"source": [
"from pathlib import Path\n",
"\n",
"from typing_extensions import Annotated\n",
"\n",
"import pytask\n",
"from pytask import ExitCode, PathNode, PythonNode"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "29ac7311",
"metadata": {},
"outputs": [],
"source": [
"node_text = PythonNode(name=\"text\", hash=True)\n",
"\n",
"\n",
"def create_text() -> Annotated[int, node_text]:\n",
" return \"This is the text.\"\n",
"\n",
"\n",
"node_file = PathNode(name=\"product\", path=Path(\"file.txt\"))\n",
"\n",
"\n",
"def create_file(text: Annotated[int, node_text]) -> Annotated[str, node_file]:\n",
" return text"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "738c9418",
"metadata": {},
"outputs": [],
"source": [
"session = pytask.build(tasks=[create_file, create_text], n_workers=2)\n",
"assert session.exit_code == ExitCode.OK"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.6"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
22 changes: 6 additions & 16 deletions tox.ini
Original file line number Diff line number Diff line change
@@ -1,21 +1,11 @@
[tox]
envlist = pytest
requires = tox>=4
envlist = test

[testenv]
usedevelop = true
package = wheel

[testenv:pytest]
deps =
# pytest
pytest
pytest-cov
pytest-xdist
setuptools_scm
toml

# Package
pytask >=0.4.0
cloudpickle
loky
[testenv:test]
extras = test
commands =
pytest {posargs}
pytest --nbmake {posargs}