|
| 1 | +"""This file contains the command and code for drawing the DAG.""" |
| 2 | +import shutil |
| 3 | +from pathlib import Path |
| 4 | +from typing import Any |
| 5 | +from typing import Dict |
| 6 | + |
| 7 | +import click |
| 8 | +import networkx as nx |
| 9 | +from _pytask.config import hookimpl |
| 10 | +from _pytask.console import console |
| 11 | +from _pytask.dag import descending_tasks |
| 12 | +from _pytask.enums import ColorCode |
| 13 | +from _pytask.enums import ExitCode |
| 14 | +from _pytask.exceptions import CollectionError |
| 15 | +from _pytask.exceptions import ConfigurationError |
| 16 | +from _pytask.exceptions import ResolvingDependenciesError |
| 17 | +from _pytask.nodes import reduce_names_of_multiple_nodes |
| 18 | +from _pytask.pluginmanager import get_plugin_manager |
| 19 | +from _pytask.session import Session |
| 20 | +from _pytask.shared import get_first_non_none_value |
| 21 | + |
| 22 | + |
@hookimpl(tryfirst=True)
def pytask_extend_command_line_interface(cli: click.Group) -> None:
    """Register the ``dag`` command on the given command line group.

    Parameters
    ----------
    cli : click.Group
        The command group to which the ``dag`` command is added.

    """
    cli.add_command(dag)
| 27 | + |
| 28 | + |
@hookimpl
def pytask_parse_config(config, config_from_cli, config_from_file):
    """Fill ``config`` with the options used by the ``dag`` command.

    Two keys are written to ``config``:

    - ``"output_path"``: falls back to ``dag.pdf`` in the current working directory
      and is passed through a callback which turns any value except ``None`` into a
      :class:`pathlib.Path`.
    - ``"layout"``: falls back to ``"dot"``.

    Both values are resolved by ``get_first_non_none_value`` which receives the
    command line configuration before the file configuration.

    """

    def _to_path(value):
        # ``None`` is passed through untouched; everything else becomes a Path.
        return None if value is None else Path(value)

    sources = (config_from_cli, config_from_file)

    config["output_path"] = get_first_non_none_value(
        *sources,
        key="output_path",
        default=Path.cwd() / "dag.pdf",
        callback=_to_path,
    )
    config["layout"] = get_first_non_none_value(*sources, key="layout", default="dot")
| 45 | + |
| 46 | + |
# Help text shown for the ``--layout`` option of the ``dag`` command.
_HELP_TEXT_LAYOUT = (
    "The layout determines the structure of the graph. "
    "Here you find an overview of all available layouts: "
    "https://graphviz.org/#roadmap."
)


# Help text shown for the ``--output-path`` option of the ``dag`` command.
_HELP_TEXT_OUTPUT = (
    "The output path of the visualization. "
    "The format is inferred from the file extension."
)
| 57 | + |
| 58 | + |
@click.command()
@click.option("-l", "--layout", type=str, default=None, help=_HELP_TEXT_LAYOUT)
@click.option("-o", "--output-path", type=str, default=None, help=_HELP_TEXT_OUTPUT)
def dag(**config_from_cli):
    """Create a visualization of the project's DAG."""
    # Build the session from the CLI options, then refine its DAG for plotting.
    # NOTE(review): the session's exit code is not inspected here - confirm that a
    # failed configuration or collection surfaces in a helpful way.
    session = _create_session(config_from_cli)
    refined_graph = _refine_dag(session)

    # Output location and layout program come from the parsed configuration.
    settings = session.config
    _write_graph(refined_graph, settings["output_path"], settings["layout"])
| 67 | + |
| 68 | + |
def build_dag(config_from_cli: Dict[str, Any]) -> nx.DiGraph:
    """Build the DAG.

    This function is the programmatic interface to ``pytask dag``. It creates a
    session from the configuration and returns the refined graph as a
    :class:`networkx.DiGraph`: node labels are shortened, a root node may be added,
    node attributes are reset to a ``shape`` attribute, and node names are wrapped in
    double quotes so that pydot can handle names containing colons.

    The graph is *not* converted to pydot here. To plot it, convert it with
    :func:`networkx.nx_pydot.to_pydot`, which is easier than plotting with
    matplotlib. To change the style of the graph, set attributes on the returned
    networkx graph before converting it to pydot or pygraphviz.

    Parameters
    ----------
    config_from_cli : Dict[str, Any]
        The configuration usually received from the CLI. For example, use ``{"paths":
        "example-directory/"}`` to collect tasks from a directory.

    Returns
    -------
    networkx.DiGraph
        A preprocessed graph which can be customized and exported.

    """
    session = _create_session(config_from_cli)
    return _refine_dag(session)
| 93 | + |
| 94 | + |
def _refine_dag(session):
    """Prepare the session's DAG for visualization.

    The node labels of ``session.dag`` are shortened with respect to
    ``session.config["paths"]``. Afterwards, the remaining refinement steps are
    applied one after another, each receiving the result of the previous step.

    """
    refined = _shorten_node_labels(session.dag, session.config["paths"])

    # The order matters: styling inspects the node names before they are quoted.
    pipeline = (
        _add_root_node,
        _clean_dag,
        _style_dag,
        _escape_node_names_with_colons,
    )
    for step in pipeline:
        refined = step(refined)

    return refined
| 103 | + |
| 104 | + |
def _create_session(config_from_cli: Dict[str, Any]) -> Session:
    """Create a session, collect the tasks and resolve their dependencies.

    Parameters
    ----------
    config_from_cli : Dict[str, Any]
        The configuration passed on to the ``pytask_configure`` hook.

    Returns
    -------
    Session
        The session. Errors are not propagated to the caller; instead
        ``session.exit_code`` is set:

        - ``ExitCode.CONFIGURATION_FAILED`` if setting up the plugin manager,
          configuring, or creating the session raised. The traceback is printed and
          a placeholder ``Session({}, None)`` is returned.
        - ``ExitCode.COLLECTION_FAILED`` if a ``CollectionError`` was raised.
        - ``ExitCode.RESOLVING_DEPENDENCIES_FAILED`` if a
          ``ResolvingDependenciesError`` was raised.
        - ``ExitCode.FAILED`` for any other exception while collecting or resolving;
          the traceback and a rule are printed.

    """
    try:
        pm = get_plugin_manager()
        # NOTE(review): imported inside the function, presumably to avoid a circular
        # import at module load time - confirm.
        from _pytask import cli

        pm.register(cli)
        pm.hook.pytask_add_hooks(pm=pm)

        config = pm.hook.pytask_configure(pm=pm, config_from_cli=config_from_cli)

        session = Session.from_config(config)

    except (ConfigurationError, Exception):
        console.print_exception()
        # Return a placeholder session so that callers always receive a session
        # object carrying an exit code.
        session = Session({}, None)
        session.exit_code = ExitCode.CONFIGURATION_FAILED

    else:
        try:
            session.hook.pytask_log_session_header(session=session)
            session.hook.pytask_collect(session=session)
            session.hook.pytask_resolve_dependencies(session=session)

        except CollectionError:
            session.exit_code = ExitCode.COLLECTION_FAILED

        except ResolvingDependenciesError:
            session.exit_code = ExitCode.RESOLVING_DEPENDENCIES_FAILED

        except Exception:
            session.exit_code = ExitCode.FAILED
            console.print_exception()
            console.rule(style=ColorCode.FAILED)

    return session
| 140 | + |
| 141 | + |
def _shorten_node_labels(dag, paths):
    """Return a copy of the DAG whose nodes carry shortened names.

    The new names are produced by ``reduce_names_of_multiple_nodes`` from the node
    names, the DAG and ``paths``. They are matched to the old names by position.

    """
    original_names = dag.nodes
    shortened_names = reduce_names_of_multiple_nodes(original_names, dag, paths)
    renaming = {old: new for old, new in zip(original_names, shortened_names)}
    return nx.relabel_nodes(dag, renaming)
| 148 | + |
| 149 | + |
def _add_root_node(dag):
    """Add a ``"root"`` node and connect it to selected task nodes.

    A node is selected if ``descending_tasks`` yields nothing for it and if its
    attributes contain the key ``"task"``. If at least one node is selected, the node
    ``"root"`` is added with an edge from ``"root"`` to every selected node. The DAG
    is modified in place and returned.

    """
    root_children = []
    for name in dag.nodes:
        has_no_descending_tasks = not list(descending_tasks(name, dag))
        if has_no_descending_tasks and "task" in dag.nodes[name]:
            root_children.append(name)

    # Nodes are only added after the iteration over ``dag.nodes`` has finished.
    if root_children:
        dag.add_node("root")
        for child in root_children:
            dag.add_edge("root", child)

    return dag
| 162 | + |
| 163 | + |
def _clean_dag(dag):
    """Remove all attributes from every node of the DAG.

    The attribute mappings are emptied in place and the same DAG is returned.

    """
    for attributes in dag.nodes.values():
        attributes.clear()
    return dag
| 169 | + |
| 170 | + |
def _style_dag(dag: nx.DiGraph) -> nx.DiGraph:
    """Assign a ``shape`` attribute to every node of the DAG.

    Nodes whose name contains ``"::task_"`` are drawn as hexagons, all other nodes as
    boxes. The attributes are set on the passed DAG, which is also returned.

    """
    shapes = {}
    for name in dag.nodes:
        if "::task_" in name:
            shapes[name] = "hexagon"
        else:
            shapes[name] = "box"

    nx.set_node_attributes(dag, shapes, "shape")
    return dag
| 175 | + |
| 176 | + |
def _escape_node_names_with_colons(dag: nx.DiGraph):
    """Wrap every node name in double quotes.

    Colons in node names interfere with the syntax used by pydot. Quoting the names
    works around the problem. Every node is renamed, not only those whose name
    contains a colon. Details can be found in
    https://github.com/pydot/pydot/issues/224.

    """
    quoted_names = {}
    for name in dag.nodes:
        quoted_names[name] = f'"{name}"'
    return nx.relabel_nodes(dag, quoted_names)
| 186 | + |
| 187 | + |
def _write_graph(dag: nx.DiGraph, path: Path, layout: str) -> None:
    """Convert the DAG to pydot and write it to ``path``.

    Parameters
    ----------
    dag : nx.DiGraph
        The graph which is converted with :func:`networkx.nx_pydot.to_pydot`.
    path : Path
        The output file. Missing parent directories are created. The output format
        is taken from the file extension without the leading dot.
    layout : str
        The name of the layout program, which must be an executable on the PATH.

    Raises
    ------
    ImportError
        If pydot is not installed.
    RuntimeError
        If the layout program cannot be found on the PATH.

    """
    try:
        import pydot  # noqa: F401
    except ImportError:
        raise ImportError(
            "To visualize the project's DAG you need to install pydot which is "
            "available with pip and conda."
        ) from None
    if shutil.which(layout) is None:
        # The message must be an f-string, otherwise the placeholder is printed
        # literally instead of the name of the missing program.
        raise RuntimeError(
            f"The layout program '{layout}' could not be found on your PATH. Please, "
            "install graphviz. It is, for example, available with conda."
        )

    path.parent.mkdir(exist_ok=True, parents=True)
    graph = nx.nx_pydot.to_pydot(dag)
    graph.write(path, prog=layout, format=path.suffix[1:])
0 commit comments