diff --git a/.ci_support/environment-mpich.yml b/.ci_support/environment-mpich.yml index 0dfa83f1..b1afac43 100644 --- a/.ci_support/environment-mpich.yml +++ b/.ci_support/environment-mpich.yml @@ -10,3 +10,7 @@ dependencies: - pyzmq =26.0.3 - h5py =3.11.0 - h5io =0.2.3 +- matplotlib =3.8.4 +- networkx =3.3 +- pygraphviz =1.13 +- ipython =8.25.0 diff --git a/.ci_support/environment-old.yml b/.ci_support/environment-old.yml index 9f077f21..9a0c84c3 100644 --- a/.ci_support/environment-old.yml +++ b/.ci_support/environment-old.yml @@ -9,4 +9,8 @@ dependencies: - tqdm =4.44.0 - pyzmq =25.0.0 - h5py =3.6.0 -- h5io =0.2.1 \ No newline at end of file +- h5io =0.2.1 +- matplotlib =3.5.3 +- networkx =2.8.8 +- ipython =7.33.0 +- pygraphviz =1.10 diff --git a/.ci_support/environment-openmpi.yml b/.ci_support/environment-openmpi.yml index 2f860fe9..5ef6b4fd 100644 --- a/.ci_support/environment-openmpi.yml +++ b/.ci_support/environment-openmpi.yml @@ -10,3 +10,7 @@ dependencies: - pyzmq =26.0.3 - h5py =3.11.0 - h5io =0.2.3 +- matplotlib =3.8.4 +- networkx =3.3 +- pygraphviz =1.13 +- ipython =8.25.0 diff --git a/.ci_support/environment-win.yml b/.ci_support/environment-win.yml index a9015a80..ee933bae 100644 --- a/.ci_support/environment-win.yml +++ b/.ci_support/environment-win.yml @@ -9,4 +9,8 @@ dependencies: - tqdm =4.66.4 - pyzmq =26.0.3 - h5py =3.11.0 -- h5io =0.2.3 \ No newline at end of file +- h5io =0.2.3 +- matplotlib =3.8.4 +- networkx =3.3 +- pygraphviz =1.13 +- ipython =8.25.0 \ No newline at end of file diff --git a/.github/workflows/unittest-mpich.yml b/.github/workflows/unittest-mpich.yml index fef312d3..ababf55f 100644 --- a/.github/workflows/unittest-mpich.yml +++ b/.github/workflows/unittest-mpich.yml @@ -34,11 +34,6 @@ jobs: label: linux-64-py-3-10-mpich prefix: /usr/share/miniconda3/envs/my-env - - operating-system: ubuntu-latest - python-version: 3.9 - label: linux-64-py-3-9-mpich - prefix: /usr/share/miniconda3/envs/my-env - steps: - uses: actions/checkout@v2 - uses: conda-incubator/setup-miniconda@v2.2.0 diff --git a/.github/workflows/unittest-openmpi.yml b/.github/workflows/unittest-openmpi.yml index a658ade2..89000613 100644 --- a/.github/workflows/unittest-openmpi.yml +++ b/.github/workflows/unittest-openmpi.yml @@ -34,11 +34,6 @@ jobs: label: linux-64-py-3-10-openmpi prefix: /usr/share/miniconda3/envs/my-env - - operating-system: ubuntu-latest - python-version: 3.9 - label: linux-64-py-3-9-openmpi - prefix: /usr/share/miniconda3/envs/my-env - steps: - uses: actions/checkout@v2 - uses: conda-incubator/setup-miniconda@v2.2.0 diff --git a/pympipool/__init__.py b/pympipool/__init__.py index 920b728a..34eb88a2 100644 --- a/pympipool/__init__.py +++ b/pympipool/__init__.py @@ -4,7 +4,10 @@ from pympipool.shell.executor import SubprocessExecutor from pympipool.shell.interactive import ShellExecutor from pympipool.interactive.dependencies import ExecutorWithDependencies -from pympipool.shared.inputcheck import check_refresh_rate as _check_refresh_rate +from pympipool.shared.inputcheck import ( + check_refresh_rate as _check_refresh_rate, + check_plot_dependency_graph as _check_plot_dependency_graph, +) __version__ = get_versions()["version"] @@ -57,6 +60,10 @@ class Executor: init_function (None): optional function to preset arguments for functions which are submitted later command_line_argument_lst (list): Additional command line arguments for the srun call (SLURM only) pmi (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only) + disable_dependencies (boolean): Disable resolving future objects during the submission. + refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked. + plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For + debugging purposes and to get an overview of the specified dependencies. Examples: ``` @@ -97,6 +104,7 @@ def __init__( pmi: Optional[str] = None, disable_dependencies: bool = False, refresh_rate: float = 0.01, + plot_dependency_graph: bool = False, ): # Use __new__() instead of __init__(). This function is only implemented to enable auto-completion. pass @@ -119,6 +127,7 @@ def __new__( pmi: Optional[str] = None, disable_dependencies: bool = False, refresh_rate: float = 0.01, + plot_dependency_graph: bool = False, ): """ Instead of returning a pympipool.Executor object this function returns either a pympipool.mpi.PyMPIExecutor, @@ -157,6 +166,8 @@ def __new__( pmi (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only) disable_dependencies (boolean): Disable resolving future objects during the submission. refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked. + plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For + debugging purposes and to get an overview of the specified dependencies. """ if not disable_dependencies: @@ -176,8 +187,10 @@ def __new__( command_line_argument_lst=command_line_argument_lst, pmi=pmi, refresh_rate=refresh_rate, + plot_dependency_graph=plot_dependency_graph, ) else: + _check_plot_dependency_graph(plot_dependency_graph=plot_dependency_graph) _check_refresh_rate(refresh_rate=refresh_rate) return create_executor( max_workers=max_workers, diff --git a/pympipool/interactive/dependencies.py b/pympipool/interactive/dependencies.py index 44220591..51831086 100644 --- a/pympipool/interactive/dependencies.py +++ b/pympipool/interactive/dependencies.py @@ -1,10 +1,19 @@ +from concurrent.futures import Future + from pympipool.interactive import create_executor from pympipool.shared.executor import ExecutorSteps, execute_tasks_with_dependencies from pympipool.shared.thread import RaisingThread +from pympipool.shared.plot import generate_task_hash, generate_nodes_and_edges, draw class ExecutorWithDependencies(ExecutorSteps): - def __init__(self, *args, refresh_rate: float = 0.01, **kwargs): + def __init__( + self, + *args, + refresh_rate: float = 0.01, + plot_dependency_graph: bool = False, + **kwargs, + ): super().__init__() executor = create_executor(*args, **kwargs) self._set_process( @@ -19,3 +28,40 @@ def __init__(self, *args, refresh_rate: float = 0.01, **kwargs): }, ) ) + self._future_hash_dict = {} + self._task_hash_dict = {} + self._generate_dependency_graph = plot_dependency_graph + + def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs): + if not self._generate_dependency_graph: + f = super().submit(fn, *args, resource_dict=resource_dict, **kwargs) + else: + f = Future() + f.set_result(None) + task_dict = { + "fn": fn, + "args": args, + "kwargs": kwargs, + "future": f, + "resource_dict": resource_dict, + } + task_hash = generate_task_hash( + task_dict=task_dict, + future_hash_inverse_dict={ + v: k for k, v in self._future_hash_dict.items() + }, + ) + self._future_hash_dict[task_hash] = f + self._task_hash_dict[task_hash] = task_dict + return f + + def __exit__(self, exc_type, exc_val, exc_tb): + super().__exit__(exc_type=exc_type, exc_val=exc_val, exc_tb=exc_tb) + if self._generate_dependency_graph: + node_lst, edge_lst = generate_nodes_and_edges( + task_hash_dict=self._task_hash_dict, + future_hash_inverse_dict={ + v: k for k, v in self._future_hash_dict.items() + }, + ) + return draw(node_lst=node_lst, edge_lst=edge_lst) diff --git a/pympipool/shared/inputcheck.py b/pympipool/shared/inputcheck.py index c19c076f..c5c391c3 100644 --- a/pympipool/shared/inputcheck.py +++ b/pympipool/shared/inputcheck.py @@ -67,6 +67,13 @@ def check_refresh_rate(refresh_rate: float): ) +def check_plot_dependency_graph(plot_dependency_graph: bool): + if plot_dependency_graph: + raise ValueError( + "The plot_dependency_graph parameter is only used when disable_dependencies=False." + ) + + def validate_backend( backend: str, flux_installed: bool = False, slurm_installed: bool = False ) -> str: diff --git a/pympipool/shared/plot.py b/pympipool/shared/plot.py new file mode 100644 index 00000000..8c9ccaa6 --- /dev/null +++ b/pympipool/shared/plot.py @@ -0,0 +1,73 @@ +from concurrent.futures import Future +from typing import Tuple + +import cloudpickle + + +def generate_nodes_and_edges( + task_hash_dict: dict, future_hash_inverse_dict: dict +) -> Tuple[list]: + node_lst, edge_lst = [], [] + hash_id_dict = {} + for k, v in task_hash_dict.items(): + hash_id_dict[k] = len(node_lst) + node_lst.append({"name": v["fn"].__name__, "id": hash_id_dict[k]}) + for k, task_dict in task_hash_dict.items(): + for arg in task_dict["args"]: + if not isinstance(arg, Future): + node_id = len(node_lst) + node_lst.append({"name": str(arg), "id": node_id}) + edge_lst.append({"start": node_id, "end": hash_id_dict[k], "label": ""}) + else: + edge_lst.append( + { + "start": hash_id_dict[future_hash_inverse_dict[arg]], + "end": hash_id_dict[k], + "label": "", + } + ) + for kw, v in task_dict["kwargs"].items(): + if not isinstance(v, Future): + node_id = len(node_lst) + node_lst.append({"name": str(v), "id": node_id}) + edge_lst.append( + {"start": node_id, "end": hash_id_dict[k], "label": str(kw)} + ) + else: + edge_lst.append( + { + "start": hash_id_dict[future_hash_inverse_dict[v]], + "end": hash_id_dict[k], + "label": str(kw), + } + ) + return node_lst, edge_lst + + +def generate_task_hash(task_dict: dict, future_hash_inverse_dict: dict) -> bytes: + args_for_hash = [ + arg if not isinstance(arg, Future) else future_hash_inverse_dict[arg] + for arg in task_dict["args"] + ] + kwargs_for_hash = { + k: v if not isinstance(v, Future) else future_hash_inverse_dict[v] + for k, v in task_dict["kwargs"].items() + } + return cloudpickle.dumps( + {"fn": task_dict["fn"], "args": args_for_hash, "kwargs": kwargs_for_hash} + ) + + +def draw(node_lst: list, edge_lst: list): + from IPython.display import SVG, display # noqa + import matplotlib.pyplot as plt # noqa + import networkx as nx # noqa + + graph = nx.DiGraph() + for node in node_lst: + graph.add_node(node["id"], label=node["name"]) + for edge in edge_lst: + graph.add_edge(edge["start"], edge["end"], label=edge["label"]) + svg = nx.nx_agraph.to_agraph(graph).draw(prog="dot", format="svg") + display(SVG(svg)) + plt.show() diff --git a/pyproject.toml b/pyproject.toml index 120a7cb5..d7cab3a9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,6 +41,12 @@ hdf = [ "h5py==3.11.0", "h5io==0.2.3", ] +graph = [ + "pygraphviz==1.13", + "matplotlib==3.8.4", + "networkx==3.3", + "ipython==8.25.0", +] [tool.setuptools.packages.find] include = ["pympipool*"] diff --git a/tests/test_dependencies_executor.py b/tests/test_dependencies_executor.py index d0b8d495..833a1425 100644 --- a/tests/test_dependencies_executor.py +++ b/tests/test_dependencies_executor.py @@ -8,6 +8,15 @@ from pympipool.interactive import create_executor from pympipool.shared.thread import RaisingThread from pympipool.shared.executor import execute_tasks_with_dependencies +from pympipool.shared.plot import generate_nodes_and_edges + + +try: + import pygraphviz + + skip_graphviz_test = False +except ImportError: + skip_graphviz_test = True def add_function(parameter_1, parameter_2): @@ -23,6 +32,33 @@ def test_executor(self): future_2 = exe.submit(add_function, 1, parameter_2=future_1) self.assertEqual(future_2.result(), 4) + @unittest.skipIf( + skip_graphviz_test, + "graphviz is not installed, so the plot_dependency_graph test is skipped.", + ) + def test_executor_dependency_plot(self): + with Executor( + max_cores=1, + backend="local", + hostname_localhost=True, + plot_dependency_graph=True, + ) as exe: + cloudpickle_register(ind=1) + future_1 = exe.submit(add_function, 1, parameter_2=2) + future_2 = exe.submit(add_function, 1, parameter_2=future_1) + self.assertTrue(future_1.done()) + self.assertTrue(future_2.done()) + self.assertEqual(len(exe._future_hash_dict), 2) + self.assertEqual(len(exe._task_hash_dict), 2) + nodes, edges = generate_nodes_and_edges( + task_hash_dict=exe._task_hash_dict, + future_hash_inverse_dict={ + v: k for k, v in exe._future_hash_dict.items() + }, + ) + self.assertEqual(len(nodes), 5) + self.assertEqual(len(edges), 4) + def test_dependency_steps(self): cloudpickle_register(ind=1) fs1 = Future()