Skip to content

Remove hooks related to the DAG. #569

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/source/changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ releases are available on [PyPI](https://pypi.org/project/pytask) and
- {pull}`557` fixes an issue with `@task(after=...)` in notebooks and terminals.
- {pull}`566` makes universal-pathlib an official dependency.
- {pull}`568` restricts `task_files` to a list of patterns and raises a better error.
- {pull}`569` removes the hooks related to the creation of the DAG.

## 0.4.5 - 2024-01-09

Expand Down
19 changes: 0 additions & 19 deletions docs/source/reference_guides/hookspecs.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,25 +62,6 @@ The following hooks traverse directories and collect tasks from files.
.. autofunction:: pytask_collect_log
```

## Resolving Dependencies

The following hooks are designed to build a DAG from tasks and dependencies and to
determine which tasks need to be re-run because their files have changed.

```{warning}
This step is still experimental and likely to change in the future. If you are planning
to write a plugin that extends pytask in this dimension, please start a discussion
before writing it. It may make your life easier if changes in pytask anticipate
your plugin.
```

```{eval-rst}
.. autofunction:: pytask_dag
.. autofunction:: pytask_dag_create_dag
.. autofunction:: pytask_dag_log

```

## Execution

The following hooks execute the tasks and log information on the result in the terminal.
Expand Down
3 changes: 2 additions & 1 deletion src/_pytask/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from _pytask.config_utils import find_project_root_and_config
from _pytask.config_utils import read_config
from _pytask.console import console
from _pytask.dag import create_dag
from _pytask.exceptions import CollectionError
from _pytask.exceptions import ConfigurationError
from _pytask.exceptions import ExecutionError
Expand Down Expand Up @@ -265,7 +266,7 @@ def build( # noqa: C901, PLR0912, PLR0913, PLR0915
try:
session.hook.pytask_log_session_header(session=session)
session.hook.pytask_collect(session=session)
session.hook.pytask_dag(session=session)
session.dag = create_dag(session=session)
session.hook.pytask_execute(session=session)

except CollectionError:
Expand Down
3 changes: 2 additions & 1 deletion src/_pytask/collect_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from _pytask.console import create_url_style_for_path
from _pytask.console import format_node_name
from _pytask.console import format_task_name
from _pytask.dag import create_dag
from _pytask.exceptions import CollectionError
from _pytask.exceptions import ConfigurationError
from _pytask.exceptions import ResolvingDependenciesError
Expand Down Expand Up @@ -70,7 +71,7 @@ def collect(**raw_config: Any | None) -> NoReturn:
try:
session.hook.pytask_log_session_header(session=session)
session.hook.pytask_collect(session=session)
session.hook.pytask_dag(session=session)
session.dag = create_dag(session=session)

tasks = _select_tasks_by_expressions_and_marker(session)
task_with_path = [t for t in tasks if isinstance(t, PTaskWithPath)]
Expand Down
36 changes: 15 additions & 21 deletions src/_pytask/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
from _pytask.console import render_to_string
from _pytask.exceptions import ResolvingDependenciesError
from _pytask.mark import select_by_after_keyword
from _pytask.mark import select_tasks_by_marks_and_expressions
from _pytask.node_protocols import PNode
from _pytask.node_protocols import PTask
from _pytask.nodes import PythonNode
from _pytask.pluginmanager import hookimpl
from _pytask.reports import DagReport
from _pytask.shared import reduce_names_of_multiple_nodes
from _pytask.tree_util import tree_map
Expand All @@ -33,28 +33,28 @@
from _pytask.session import Session


@hookimpl
def pytask_dag(session: Session) -> bool | None:
__all__ = ["create_dag"]


def create_dag(session: Session) -> nx.DiGraph:
"""Create a directed acyclic graph (DAG) for the workflow."""
try:
session.dag = session.hook.pytask_dag_create_dag(
session=session, tasks=session.tasks
)
session.hook.pytask_dag_modify_dag(session=session, dag=session.dag)
dag = _create_dag(tasks=session.tasks)
_check_if_dag_has_cycles(dag)
_check_if_tasks_have_the_same_products(dag, session.config["paths"])
_modify_dag(session=session, dag=dag)
select_tasks_by_marks_and_expressions(session=session, dag=dag)

except Exception: # noqa: BLE001
report = DagReport.from_exception(sys.exc_info())
session.hook.pytask_dag_log(session=session, report=report)
_log_dag(report=report)
session.dag_report = report

raise ResolvingDependenciesError from None

else:
return True
return dag


@hookimpl
def pytask_dag_create_dag(session: Session, tasks: list[PTask]) -> nx.DiGraph:
def _create_dag(tasks: list[PTask]) -> nx.DiGraph:
"""Create the DAG from tasks, dependencies and products."""

def _add_dependency(dag: nx.DiGraph, task: PTask, node: PNode) -> None:
Expand Down Expand Up @@ -90,15 +90,10 @@ def _add_product(dag: nx.DiGraph, task: PTask, node: PNode) -> None:
else None,
task.depends_on,
)

_check_if_dag_has_cycles(dag)
_check_if_tasks_have_the_same_products(dag, session.config["paths"])

return dag


@hookimpl
def pytask_dag_modify_dag(session: Session, dag: nx.DiGraph) -> None:
def _modify_dag(session: Session, dag: nx.DiGraph) -> None:
"""Create dependencies between tasks when using ``@task(after=...)``."""
temporary_id_to_task = {
task.attributes["collection_id"]: task
Expand Down Expand Up @@ -194,8 +189,7 @@ def _check_if_tasks_have_the_same_products(dag: nx.DiGraph, paths: list[Path]) -
raise ResolvingDependenciesError(msg)


@hookimpl
def pytask_dag_log(report: DagReport) -> None:
def _log_dag(report: DagReport) -> None:
"""Log errors which happened while resolving dependencies."""
console.print()
console.rule(
Expand Down
5 changes: 3 additions & 2 deletions src/_pytask/dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from _pytask.config_utils import find_project_root_and_config
from _pytask.config_utils import read_config
from _pytask.console import console
from _pytask.dag import create_dag
from _pytask.exceptions import CollectionError
from _pytask.exceptions import ConfigurationError
from _pytask.exceptions import ResolvingDependenciesError
Expand Down Expand Up @@ -101,7 +102,7 @@ def dag(**raw_config: Any) -> int:
"can install with conda.",
)
session.hook.pytask_collect(session=session)
session.hook.pytask_dag(session=session)
session.dag = create_dag(session=session)
dag = _refine_dag(session)
_write_graph(dag, session.config["output_path"], session.config["layout"])

Expand Down Expand Up @@ -198,7 +199,7 @@ def build_dag(raw_config: dict[str, Any]) -> nx.DiGraph:
"can install with conda.",
)
session.hook.pytask_collect(session=session)
session.hook.pytask_dag(session=session)
session.dag = create_dag(session=session)
session.hook.pytask_unconfigure(session=session)
dag = _refine_dag(session)

Expand Down
40 changes: 0 additions & 40 deletions src/_pytask/hookspecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from pathlib import Path

import click
import networkx as nx
from pluggy import PluginManager

from _pytask.models import NodeInfo
Expand All @@ -25,7 +24,6 @@
from _pytask.outcomes import CollectionOutcome
from _pytask.outcomes import TaskOutcome
from _pytask.reports import CollectionReport
from _pytask.reports import DagReport
from _pytask.reports import ExecutionReport
from _pytask.session import Session

Expand Down Expand Up @@ -212,44 +210,6 @@ def pytask_collect_log(
"""


# Hooks for resolving dependencies.


@hookspec(firstresult=True)
def pytask_dag(session: Session) -> None:
"""Create a DAG.

The main hook implementation which controls the resolution of dependencies and calls
subordinated hooks.

"""


@hookspec(firstresult=True)
def pytask_dag_create_dag(session: Session, tasks: list[PTask]) -> nx.DiGraph:
"""Create the DAG.

This hook creates the DAG from tasks, dependencies and products. The DAG can be used
by a scheduler to find an execution order.

"""


@hookspec
def pytask_dag_modify_dag(session: Session, dag: nx.DiGraph) -> None:
"""Modify the DAG.

This hook allows making changes to the DAG before it is validated and tasks
are selected.

"""


@hookspec
def pytask_dag_log(session: Session, report: DagReport) -> None:
"""Log errors during resolving dependencies."""


# Hooks for running tasks.


Expand Down
4 changes: 2 additions & 2 deletions src/_pytask/mark/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
"select_by_after_keyword",
"select_by_keyword",
"select_by_mark",
"select_tasks_by_marks_and_expressions",
]


Expand Down Expand Up @@ -234,8 +235,7 @@ def _deselect_others_with_mark(
task.markers.append(mark)


@hookimpl
def pytask_dag_modify_dag(session: Session, dag: nx.DiGraph) -> None:
def select_tasks_by_marks_and_expressions(session: Session, dag: nx.DiGraph) -> None:
"""Modify the tasks which are executed with expressions and markers."""
remaining = select_by_keyword(session, dag)
if remaining is not None:
Expand Down
3 changes: 2 additions & 1 deletion src/_pytask/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from _pytask.click import EnumChoice
from _pytask.console import console
from _pytask.console import format_task_name
from _pytask.dag import create_dag
from _pytask.database_utils import BaseTable
from _pytask.database_utils import DatabaseSession
from _pytask.exceptions import CollectionError
Expand Down Expand Up @@ -128,7 +129,7 @@ def profile(**raw_config: Any) -> NoReturn:
try:
session.hook.pytask_log_session_header(session=session)
session.hook.pytask_collect(session=session)
session.hook.pytask_dag(session=session)
session.dag = create_dag(session=session)

profile: dict[str, dict[str, Any]] = {
task.name: {} for task in session.tasks
Expand Down
8 changes: 3 additions & 5 deletions tests/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,17 @@
from pathlib import Path

import pytest
from _pytask.dag import pytask_dag_create_dag
from _pytask.dag import _create_dag
from pytask import ExitCode
from pytask import PathNode
from pytask import Session
from pytask import Task
from pytask import build
from pytask import cli


@pytest.mark.unit()
@pytest.mark.skipif(sys.platform == "win32", reason="Hashes match only on unix.")
def test_pytask_dag_create_dag():
def test_create_dag():
root = Path("src")
task = Task(
base_name="task_dummy",
Expand All @@ -27,8 +26,7 @@ def test_pytask_dag_create_dag():
1: PathNode.from_path(root / "node_2"),
},
)
session = Session.from_config({"paths": (root,)})
dag = pytask_dag_create_dag(session=session, tasks=[task])
dag = _create_dag(tasks=[task])

for signature in (
"90bb899a1b60da28ff70352cfb9f34a8bed485597c7f40eed9bd4c6449147525",
Expand Down