Skip to content

Remove checks for missing root nodes. #480

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/source/changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ releases are available on [PyPI](https://pypi.org/project/pytask) and
- {pull}`477` updates the PyPI action.
- {pull}`478` replaces black with ruff-format.
- {pull}`479` gives skips a higher precedence as an outcome than ancestor failed.
- {pull}`480` removes the check for missing root nodes from the generation of the DAG.
  Missing dependencies are now detected by the check that runs during the execution.

## 0.4.1 - 2023-10-11

Expand Down
90 changes: 0 additions & 90 deletions docs/source/reference_guides/hookspecs.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,12 @@ hooks are allowed to raise exceptions which are handled and stored in a report.

```{eval-rst}
.. autofunction:: pytask_add_hooks

```

## Command Line Interface

```{eval-rst}
.. autofunction:: pytask_extend_command_line_interface

```

## Configuration
Expand All @@ -41,20 +39,9 @@ together.

```{eval-rst}
.. autofunction:: pytask_configure
```

```{eval-rst}
.. autofunction:: pytask_parse_config
```

```{eval-rst}
.. autofunction:: pytask_post_parse

```

```{eval-rst}
.. autofunction:: pytask_unconfigure

```

## Collection
Expand All @@ -63,47 +50,16 @@ The following hooks traverse directories and collect tasks from files.

```{eval-rst}
.. autofunction:: pytask_collect
```

```{eval-rst}
.. autofunction:: pytask_ignore_collect
```

```{eval-rst}
.. autofunction:: pytask_collect_modify_tasks
```

```{eval-rst}
.. autofunction:: pytask_collect_file_protocol
```

```{eval-rst}
.. autofunction:: pytask_collect_file
```

```{eval-rst}
.. autofunction:: pytask_collect_task_protocol
```

```{eval-rst}
.. autofunction:: pytask_collect_task_setup
```

```{eval-rst}
.. autofunction:: pytask_collect_task
```

```{eval-rst}
.. autofunction:: pytask_collect_task_teardown
```

```{eval-rst}
.. autofunction:: pytask_collect_node
```

```{eval-rst}
.. autofunction:: pytask_collect_log

```

## Resolving Dependencies
Expand All @@ -120,21 +76,8 @@ your plugin.

```{eval-rst}
.. autofunction:: pytask_dag
```

```{eval-rst}
.. autofunction:: pytask_dag_create_dag
```

```{eval-rst}
.. autofunction:: pytask_dag_validate_dag
```

```{eval-rst}
.. autofunction:: pytask_dag_select_execution_dag
```

```{eval-rst}
.. autofunction:: pytask_dag_log

```
Expand All @@ -145,48 +88,15 @@ The following hooks execute the tasks and log information on the result in the t

```{eval-rst}
.. autofunction:: pytask_execute
```

```{eval-rst}
.. autofunction:: pytask_execute_log_start
```

```{eval-rst}
.. autofunction:: pytask_execute_create_scheduler
```

```{eval-rst}
.. autofunction:: pytask_execute_build
```

```{eval-rst}
.. autofunction:: pytask_execute_task_protocol
```

```{eval-rst}
.. autofunction:: pytask_execute_task_log_start
```

```{eval-rst}
.. autofunction:: pytask_execute_task_setup
```

```{eval-rst}
.. autofunction:: pytask_execute_task
```

```{eval-rst}
.. autofunction:: pytask_execute_task_teardown
```

```{eval-rst}
.. autofunction:: pytask_execute_task_process_report
```

```{eval-rst}
.. autofunction:: pytask_execute_task_log_end
```

```{eval-rst}
.. autofunction:: pytask_execute_log_end
```
109 changes: 3 additions & 106 deletions src/_pytask/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@

import itertools
import sys
from typing import Sequence
from typing import TYPE_CHECKING

import networkx as nx
from _pytask.config import hookimpl
from _pytask.config import IS_FILE_SYSTEM_CASE_SENSITIVE
from _pytask.console import ARROW_DOWN_ICON
from _pytask.console import console
from _pytask.console import FILE_ICON
Expand All @@ -23,8 +21,6 @@
from _pytask.database_utils import State
from _pytask.exceptions import ResolvingDependenciesError
from _pytask.mark import Mark
from _pytask.mark_utils import get_marks
from _pytask.mark_utils import has_mark
from _pytask.node_protocols import PNode
from _pytask.node_protocols import PTask
from _pytask.nodes import PythonNode
Expand All @@ -48,13 +44,12 @@ def pytask_dag(session: Session) -> bool | None:
session=session, tasks=session.tasks
)
session.hook.pytask_dag_modify_dag(session=session, dag=session.dag)
session.hook.pytask_dag_validate_dag(session=session, dag=session.dag)
session.hook.pytask_dag_select_execution_dag(session=session, dag=session.dag)

except Exception: # noqa: BLE001
report = DagReport.from_exception(sys.exc_info())
session.hook.pytask_dag_log(session=session, report=report)
session.dag_reports = report
session.dag_report = report

raise ResolvingDependenciesError from None

Expand All @@ -63,7 +58,7 @@ def pytask_dag(session: Session) -> bool | None:


@hookimpl
def pytask_dag_create_dag(tasks: list[PTask]) -> nx.DiGraph:
def pytask_dag_create_dag(session: Session, tasks: list[PTask]) -> nx.DiGraph:
"""Create the DAG from tasks, dependencies and products."""

def _add_dependency(dag: nx.DiGraph, task: PTask, node: PNode) -> None:
Expand Down Expand Up @@ -101,6 +96,7 @@ def _add_product(dag: nx.DiGraph, task: PTask, node: PNode) -> None:
)

_check_if_dag_has_cycles(dag)
_check_if_tasks_have_the_same_products(dag, session.config["paths"])

return dag

Expand All @@ -123,13 +119,6 @@ def pytask_dag_select_execution_dag(session: Session, dag: nx.DiGraph) -> None:
)


@hookimpl
def pytask_dag_validate_dag(session: Session, dag: nx.DiGraph) -> None:
    """Run the validation checks on the DAG.

    The paths stored under ``session.config["paths"]`` are handed, together with
    the DAG, to ``_check_if_root_nodes_are_available`` and then to
    ``_check_if_tasks_have_the_same_products``.
    """
    paths = session.config["paths"]
    _check_if_root_nodes_are_available(dag, paths)
    _check_if_tasks_have_the_same_products(dag, paths)


def _have_task_or_neighbors_changed(
session: Session, dag: nx.DiGraph, task: PTask
) -> bool:
Expand Down Expand Up @@ -198,98 +187,6 @@ def _format_cycles(dag: nx.DiGraph, cycles: list[tuple[str, ...]]) -> str:
return "\n".join(lines[:-1])


# Error template for missing root nodes. The ``{}`` placeholder receives the
# rendered tree of missing dependencies; the capitalization hint is appended only
# when ``IS_FILE_SYSTEM_CASE_SENSITIVE`` is true.
_TEMPLATE_ERROR: str = "".join(
    (
        "Some dependencies do not exist or are not produced by any task. See the following ",
        "tree which shows which dependencies are missing for which tasks.\n\n{}",
        (
            "\n\n(Hint: Your file-system is case-sensitive. Check the paths' "
            "capitalization carefully.)"
        )
        if IS_FILE_SYSTEM_CASE_SENSITIVE
        else "",
    )
)


def _check_if_root_nodes_are_available(dag: nx.DiGraph, paths: Sequence[Path]) -> None:
    """Raise an error when a root node required by a non-skipped task is missing.

    A root node is a graph entry that carries a ``"node"`` attribute and has no
    predecessors. Root nodes whose successors are all skipped are ignored. For the
    remaining ones, a falsy ``state()`` marks the node as missing.

    Raises
    ------
    ResolvingDependenciesError
        If ``state()`` of a root node raises, or if at least one root node is
        missing. In the latter case the message contains a tree that maps each
        missing node to the non-skipped tasks depending on it.

    """
    __tracebackhide__ = True

    is_task_skipped: dict[str, bool] = {}
    missing_signatures = []

    for signature in dag.nodes:
        # Entries without a "node" attribute are not nodes and cannot be root nodes.
        if "node" not in dag.nodes[signature]:
            continue
        # Anything with a predecessor is not a root node.
        if list(dag.predecessors(signature)):
            continue

        are_all_tasks_skipped, is_task_skipped = _check_if_tasks_are_skipped(
            signature, dag, is_task_skipped
        )
        if are_all_tasks_skipped:
            continue

        try:
            node_exists = dag.nodes[signature]["node"].state()
        except Exception as e:  # noqa: BLE001
            msg = _format_exception_from_failed_node_state(signature, dag, paths)
            raise ResolvingDependenciesError(msg) from e

        if not node_exists:
            missing_signatures.append(signature)

    if not missing_signatures:
        return

    tasks_by_missing_node = {}
    for signature in missing_signatures:
        # Report only the tasks that would actually run.
        active_tasks = [
            task for task in dag.successors(signature) if not is_task_skipped[task]
        ]
        display_name = format_node_name(dag.nodes[signature]["node"], paths).plain
        tasks_by_missing_node[display_name] = reduce_names_of_multiple_nodes(
            active_tasks, dag, paths
        )

    text = _format_dictionary_to_tree(tasks_by_missing_node, "Missing dependencies:")
    raise ResolvingDependenciesError(_TEMPLATE_ERROR.format(text)) from None


def _format_exception_from_failed_node_state(
    node_signature: str, dag: nx.DiGraph, paths: Sequence[Path]
) -> str:
    """Build the error message used when ``node.state()`` raised an exception.

    The message names the dependency, formatted with ``format_node_name``, and
    lists the quoted names of all tasks that are successors of the node.
    """
    successors = ", ".join(
        repr(dag.nodes[successor]["task"].name)
        for successor in dag.successors(node_signature)
    )
    node_name = format_node_name(dag.nodes[node_signature]["node"], paths).plain
    return (
        f"While checking whether dependency {node_name!r} from task(s) "
        f"{successors} exists, an error was raised."
    )


def _check_if_tasks_are_skipped(
node: PNode, dag: nx.DiGraph, is_task_skipped: dict[str, bool]
) -> tuple[bool, dict[str, bool]]:
"""Check for a given node whether it is only used by skipped tasks."""
are_all_tasks_skipped = []
for successor in dag.successors(node):
if successor not in is_task_skipped:
is_task_skipped[successor] = _check_if_task_is_skipped(successor, dag)
are_all_tasks_skipped.append(is_task_skipped[successor])

return all(are_all_tasks_skipped), is_task_skipped


def _check_if_task_is_skipped(task_name: str, dag: nx.DiGraph) -> bool:
    """Tell whether the task stored under ``task_name`` in ``dag`` is skipped.

    A task counts as skipped if it has a ``skip`` mark or if the condition of at
    least one of its ``skipif`` marks is truthy.
    """
    task = dag.nodes[task_name]["task"]

    if has_mark(task, "skip"):
        return True

    for marker in get_marks(task, "skipif"):
        condition, _ = _skipif(*marker.args, **marker.kwargs)
        if condition:
            return True
    return False


def _skipif(condition: bool, *, reason: str) -> tuple[bool, str]:
"""Shameless copy to circumvent circular imports."""
return condition, reason


def _format_dictionary_to_tree(dict_: dict[str, list[str]], title: str) -> str:
"""Format missing root nodes."""
tree = Tree(title)
Expand Down
7 changes: 1 addition & 6 deletions src/_pytask/dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import sys
from pathlib import Path
from typing import Any
from typing import TYPE_CHECKING

import click
import networkx as nx
Expand All @@ -31,10 +30,6 @@
from rich.traceback import Traceback


if TYPE_CHECKING:
from typing import NoReturn


class _RankDirection(enum.Enum):
TB = "TB"
LR = "LR"
Expand Down Expand Up @@ -82,7 +77,7 @@ def pytask_extend_command_line_interface(cli: click.Group) -> None:
help=_HELP_TEXT_RANK_DIRECTION,
default=_RankDirection.TB,
)
def dag(**raw_config: Any) -> NoReturn:
def dag(**raw_config: Any) -> int:
"""Create a visualization of the project's directed acyclic graph."""
try:
pm = get_plugin_manager()
Expand Down
9 changes: 8 additions & 1 deletion src/_pytask/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from typing import TYPE_CHECKING

from _pytask.config import hookimpl
from _pytask.config import IS_FILE_SYSTEM_CASE_SENSITIVE
from _pytask.console import console
from _pytask.console import create_summary_panel
from _pytask.console import create_url_style_for_task
Expand Down Expand Up @@ -36,6 +37,7 @@
from _pytask.tree_util import tree_structure
from rich.text import Text


if TYPE_CHECKING:
from _pytask.session import Session

Expand Down Expand Up @@ -125,7 +127,12 @@ def pytask_execute_task_setup(session: Session, task: PTask) -> None:
for dependency in session.dag.predecessors(task.signature):
node = session.dag.nodes[dependency]["node"]
if not node.state():
msg = f"{node.name} is missing and required for {task.name}."
msg = f"{task.name} requires missing node {node.name}."
if IS_FILE_SYSTEM_CASE_SENSITIVE:
msg += (
"\n\n(Hint: Your file-system is case-sensitive. Check the paths' "
"capitalization carefully.)"
)
raise NodeNotFoundError(msg)

# Create directory for product if it does not exist. Maybe this should be a `setup`
Expand Down
Loading