Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .ci_support/environment-mpich.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,7 @@ dependencies:
- pyzmq =26.0.3
- h5py =3.11.0
- h5io =0.2.3
- matplotlib =3.8.4
- networkx =3.3
- pygraphviz =1.13
- ipython =8.25.0
6 changes: 5 additions & 1 deletion .ci_support/environment-old.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,8 @@ dependencies:
- tqdm =4.44.0
- pyzmq =25.0.0
- h5py =3.6.0
- h5io =0.2.1
- h5io =0.2.1
- matplotlib =3.5.3
- networkx =2.8.8
- ipython =7.33.0
- pygraphviz =1.10
4 changes: 4 additions & 0 deletions .ci_support/environment-openmpi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,7 @@ dependencies:
- pyzmq =26.0.3
- h5py =3.11.0
- h5io =0.2.3
- matplotlib =3.8.4
- networkx =3.3
- pygraphviz =1.13
- ipython =8.25.0
6 changes: 5 additions & 1 deletion .ci_support/environment-win.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,8 @@ dependencies:
- tqdm =4.66.4
- pyzmq =26.0.3
- h5py =3.11.0
- h5io =0.2.3
- h5io =0.2.3
- matplotlib =3.8.4
- networkx =3.3
- pygraphviz =1.13
- ipython =8.25.0
5 changes: 0 additions & 5 deletions .github/workflows/unittest-mpich.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@ jobs:
label: linux-64-py-3-10-mpich
prefix: /usr/share/miniconda3/envs/my-env

- operating-system: ubuntu-latest
python-version: 3.9
label: linux-64-py-3-9-mpich
prefix: /usr/share/miniconda3/envs/my-env

steps:
- uses: actions/checkout@v2
- uses: conda-incubator/setup-miniconda@v2.2.0
Expand Down
5 changes: 0 additions & 5 deletions .github/workflows/unittest-openmpi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@ jobs:
label: linux-64-py-3-10-openmpi
prefix: /usr/share/miniconda3/envs/my-env

- operating-system: ubuntu-latest
python-version: 3.9
label: linux-64-py-3-9-openmpi
prefix: /usr/share/miniconda3/envs/my-env

steps:
- uses: actions/checkout@v2
- uses: conda-incubator/setup-miniconda@v2.2.0
Expand Down
15 changes: 14 additions & 1 deletion pympipool/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
from pympipool.shell.executor import SubprocessExecutor
from pympipool.shell.interactive import ShellExecutor
from pympipool.interactive.dependencies import ExecutorWithDependencies
from pympipool.shared.inputcheck import check_refresh_rate as _check_refresh_rate
from pympipool.shared.inputcheck import (
check_refresh_rate as _check_refresh_rate,
check_plot_dependency_graph as _check_plot_dependency_graph,
)


__version__ = get_versions()["version"]
Expand Down Expand Up @@ -57,6 +60,10 @@ class Executor:
init_function (None): optional function to preset arguments for functions which are submitted later
command_line_argument_lst (list): Additional command line arguments for the srun call (SLURM only)
pmi (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
disable_dependencies (boolean): Disable resolving future objects during the submission.
refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked.
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
debugging purposes and to get an overview of the specified dependencies.

Examples:
```
Expand Down Expand Up @@ -97,6 +104,7 @@ def __init__(
pmi: Optional[str] = None,
disable_dependencies: bool = False,
refresh_rate: float = 0.01,
plot_dependency_graph: bool = False,
):
# Use __new__() instead of __init__(). This function is only implemented to enable auto-completion.
pass
Expand All @@ -119,6 +127,7 @@ def __new__(
pmi: Optional[str] = None,
disable_dependencies: bool = False,
refresh_rate: float = 0.01,
plot_dependency_graph: bool = False,
):
"""
Instead of returning a pympipool.Executor object this function returns either a pympipool.mpi.PyMPIExecutor,
Expand Down Expand Up @@ -157,6 +166,8 @@ def __new__(
pmi (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
disable_dependencies (boolean): Disable resolving future objects during the submission.
refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked.
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
debugging purposes and to get an overview of the specified dependencies.

"""
if not disable_dependencies:
Expand All @@ -176,8 +187,10 @@ def __new__(
command_line_argument_lst=command_line_argument_lst,
pmi=pmi,
refresh_rate=refresh_rate,
plot_dependency_graph=plot_dependency_graph,
)
else:
_check_plot_dependency_graph(plot_dependency_graph=plot_dependency_graph)
_check_refresh_rate(refresh_rate=refresh_rate)
return create_executor(
max_workers=max_workers,
Expand Down
48 changes: 47 additions & 1 deletion pympipool/interactive/dependencies.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
from concurrent.futures import Future

from pympipool.interactive import create_executor
from pympipool.shared.executor import ExecutorSteps, execute_tasks_with_dependencies
from pympipool.shared.thread import RaisingThread
from pympipool.shared.plot import generate_task_hash, generate_nodes_and_edges, draw


class ExecutorWithDependencies(ExecutorSteps):
def __init__(self, *args, refresh_rate: float = 0.01, **kwargs):
def __init__(
self,
*args,
refresh_rate: float = 0.01,
plot_dependency_graph: bool = False,
**kwargs,
):
super().__init__()
executor = create_executor(*args, **kwargs)
self._set_process(
Expand All @@ -19,3 +28,40 @@ def __init__(self, *args, refresh_rate: float = 0.01, **kwargs):
},
)
)
self._future_hash_dict = {}
self._task_hash_dict = {}
self._generate_dependency_graph = plot_dependency_graph

def submit(self, fn: callable, *args, resource_dict: dict = None, **kwargs):
    """
    Submit a function call, or only record it when the dependency graph is requested.

    When plot_dependency_graph was not enabled, the call is forwarded unchanged to the parent class. Otherwise
    the function is not executed: a Future which is already completed with the result None is returned, and the
    task is stored under the hash computed by generate_task_hash() so that __exit__() can draw the graph.

    Args:
        fn (callable): function to submit
        *args: positional arguments for fn, these may contain Future objects returned by earlier submit() calls
        resource_dict (dict): resources for this call, None (the default) is treated as an empty dictionary
        **kwargs: keyword arguments for fn, these may contain Future objects returned by earlier submit() calls

    Returns:
        concurrent.futures.Future: the future returned by the parent class, or the completed placeholder future
    """
    # A fresh dictionary per call replaces the former mutable default argument, which was one object shared by
    # all calls and stored by reference inside every recorded task.
    if resource_dict is None:
        resource_dict = {}
    if not self._generate_dependency_graph:
        return super().submit(fn, *args, resource_dict=resource_dict, **kwargs)
    # Plotting mode: hand out a finished placeholder so that code calling result() does not block.
    f = Future()
    f.set_result(None)
    task_dict = {
        "fn": fn,
        "args": args,
        "kwargs": kwargs,
        "future": f,
        "resource_dict": resource_dict,
    }
    # Futures among the arguments are replaced by the hash of their task, which requires the inverse mapping.
    task_hash = generate_task_hash(
        task_dict=task_dict,
        future_hash_inverse_dict={
            v: k for k, v in self._future_hash_dict.items()
        },
    )
    self._future_hash_dict[task_hash] = f
    self._task_hash_dict[task_hash] = task_dict
    return f

Comment on lines +35 to +57
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The submit method has been adapted to handle task graph generation when enabled. However, using a mutable default argument (resource_dict) is a common Python pitfall, which could lead to unexpected behavior if the dictionary is modified. This should be corrected as indicated by the static analysis tool.

- def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs):
+ def submit(self, fn: callable, *args, resource_dict: dict = None, **kwargs):
+     if resource_dict is None:
+         resource_dict = {}
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs):
if not self._generate_dependency_graph:
f = super().submit(fn, *args, resource_dict=resource_dict, **kwargs)
else:
f = Future()
f.set_result(None)
task_dict = {
"fn": fn,
"args": args,
"kwargs": kwargs,
"future": f,
"resource_dict": resource_dict,
}
task_hash = generate_task_hash(
task_dict=task_dict,
future_hash_inverse_dict={
v: k for k, v in self._future_hash_dict.items()
},
)
self._future_hash_dict[task_hash] = f
self._task_hash_dict[task_hash] = task_dict
return f
def submit(self, fn: callable, *args, resource_dict: dict = None, **kwargs):
if resource_dict is None:
resource_dict = {}
if not self._generate_dependency_graph:
f = super().submit(fn, *args, resource_dict=resource_dict, **kwargs)
else:
f = Future()
f.set_result(None)
task_dict = {
"fn": fn,
"args": args,
"kwargs": kwargs,
"future": f,
"resource_dict": resource_dict,
}
task_hash = generate_task_hash(
task_dict=task_dict,
future_hash_inverse_dict={
v: k for k, v in self._future_hash_dict.items()
},
)
self._future_hash_dict[task_hash] = f
self._task_hash_dict[task_hash] = task_dict
return f
Tools
Ruff

35-35: Do not use mutable data structures for argument defaults (B006)

Replace with None; initialize within function

def __exit__(self, exc_type, exc_val, exc_tb):
    """
    Leave the context of the parent class and, in plotting mode, draw the recorded dependency graph.

    Args:
        exc_type: exception type forwarded to the parent class
        exc_val: exception value forwarded to the parent class
        exc_tb: traceback forwarded to the parent class

    Returns:
        None when plotting is disabled, otherwise whatever draw() returns
    """
    super().__exit__(exc_type=exc_type, exc_val=exc_val, exc_tb=exc_tb)
    if not self._generate_dependency_graph:
        return None
    # generate_nodes_and_edges() looks tasks up by Future object, so invert the hash -> future mapping.
    hash_by_future = {
        future: task_hash for task_hash, future in self._future_hash_dict.items()
    }
    node_lst, edge_lst = generate_nodes_and_edges(
        task_hash_dict=self._task_hash_dict,
        future_hash_inverse_dict=hash_by_future,
    )
    return draw(node_lst=node_lst, edge_lst=edge_lst)
7 changes: 7 additions & 0 deletions pympipool/shared/inputcheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ def check_refresh_rate(refresh_rate: float):
)


def check_plot_dependency_graph(plot_dependency_graph: bool):
    """
    Reject a request for the dependency graph, for use on code paths which cannot plot it.

    Args:
        plot_dependency_graph (bool): value of the plot_dependency_graph parameter to validate

    Raises:
        ValueError: if plot_dependency_graph is truthy
    """
    if not plot_dependency_graph:
        return
    message = "The plot_dependency_graph parameter is only used when disable_dependencies=False."
    raise ValueError(message)
Comment on lines +70 to +74
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ensure correct usage of plot_dependency_graph parameter.
The exception message for plot_dependency_graph appears to be incorrect. It suggests that the parameter is only used when disable_dependencies=False, but this parameter should be usable regardless of the disable_dependencies setting, as it's meant for debugging and visualizing task dependencies.

- "The plot_dependency_graph parameter is only used when disable_dependencies=False."
+ "The plot_dependency_graph parameter should be set to True to enable task dependency visualization."
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def check_plot_dependency_graph(plot_dependency_graph: bool):
if plot_dependency_graph:
raise ValueError(
"The plot_dependency_graph parameter is only used when disable_dependencies=False."
)
def check_plot_dependency_graph(plot_dependency_graph: bool):
if plot_dependency_graph:
raise ValueError(
"The plot_dependency_graph parameter should be set to True to enable task dependency visualization."
)



def validate_backend(
backend: str, flux_installed: bool = False, slurm_installed: bool = False
) -> str:
Expand Down
73 changes: 73 additions & 0 deletions pympipool/shared/plot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
from concurrent.futures import Future
from typing import Tuple

import cloudpickle


def generate_nodes_and_edges(
    task_hash_dict: dict, future_hash_inverse_dict: dict
) -> Tuple[list, list]:
    """
    Translate the recorded tasks into the node and edge lists of the dependency graph.

    Every task becomes a node named after its function. Every positional and keyword argument of a task becomes
    an edge pointing to that task: a Future argument starts at the node of the task it belongs to, any other
    value gets a node of its own named str(value). Edges of keyword arguments are labelled with the keyword,
    edges of positional arguments carry an empty label.

    Args:
        task_hash_dict (dict): maps each task hash to a task dictionary with the keys "fn", "args" and "kwargs"
        future_hash_inverse_dict (dict): maps each Future object to the hash of the task it belongs to

    Returns:
        Tuple[list, list]: the node dictionaries (keys "name" and "id") and the edge dictionaries (keys "start",
            "end" and "label")

    Raises:
        KeyError: if an argument is a Future which is missing in future_hash_inverse_dict, or whose task hash is
            missing in task_hash_dict
    """
    node_lst, edge_lst = [], []
    hash_id_dict = {}
    # First pass: one node per task, so edges between tasks can be resolved regardless of the submission order.
    for task_hash, task_dict in task_hash_dict.items():
        hash_id_dict[task_hash] = len(node_lst)
        fn = task_dict["fn"]
        # Callables such as functools.partial objects have no __name__, fall back to their repr().
        node_lst.append(
            {"name": getattr(fn, "__name__", repr(fn)), "id": hash_id_dict[task_hash]}
        )

    def add_input(value, label: str, end_id: int):
        # Connect one argument to its task, creating a value node unless the argument is another task's future.
        if isinstance(value, Future):
            start_id = hash_id_dict[future_hash_inverse_dict[value]]
        else:
            start_id = len(node_lst)
            node_lst.append({"name": str(value), "id": start_id})
        edge_lst.append({"start": start_id, "end": end_id, "label": label})

    # Second pass: positional arguments first, then keyword arguments, task by task.
    for task_hash, task_dict in task_hash_dict.items():
        end_id = hash_id_dict[task_hash]
        for arg in task_dict["args"]:
            add_input(arg, "", end_id)
        for kw, value in task_dict["kwargs"].items():
            add_input(value, str(kw), end_id)
    return node_lst, edge_lst


def generate_task_hash(task_dict: dict, future_hash_inverse_dict: dict) -> bytes:
    """
    Serialize a task with cloudpickle to obtain the identifier used as its hash.

    Future objects among the arguments are replaced by the hash of the task they belong to before the function,
    the positional arguments and the keyword arguments are serialized.

    Args:
        task_dict (dict): task dictionary with the keys "fn", "args" and "kwargs"
        future_hash_inverse_dict (dict): maps each Future object to the hash of the task it belongs to

    Returns:
        bytes: cloudpickle serialization of the function and its resolved arguments
    """

    def resolve(value):
        # Substitute a future by the hash of its task, pass every other value through unchanged.
        if isinstance(value, Future):
            return future_hash_inverse_dict[value]
        return value

    resolved_args = []
    for positional in task_dict["args"]:
        resolved_args.append(resolve(positional))
    resolved_kwargs = {}
    for keyword, value in task_dict["kwargs"].items():
        resolved_kwargs[keyword] = resolve(value)
    payload = {
        "fn": task_dict["fn"],
        "args": resolved_args,
        "kwargs": resolved_kwargs,
    }
    return cloudpickle.dumps(payload)


def draw(node_lst: list, edge_lst: list):
    """
    Render the dependency graph as SVG with the graphviz "dot" layout and display it using IPython.

    IPython, matplotlib and networkx are imported inside the function, so they are only required when a graph is
    actually drawn. NOTE(review): networkx.nx_agraph presumably needs pygraphviz as well - confirm.

    Args:
        node_lst (list): node dictionaries with the keys "id" and "name"
        edge_lst (list): edge dictionaries with the keys "start", "end" and "label"
    """
    from IPython.display import SVG, display  # noqa
    import matplotlib.pyplot as plt  # noqa
    import networkx as nx  # noqa

    dependency_graph = nx.DiGraph()
    for node_entry in node_lst:
        dependency_graph.add_node(node_entry["id"], label=node_entry["name"])
    for edge_entry in edge_lst:
        dependency_graph.add_edge(
            edge_entry["start"], edge_entry["end"], label=edge_entry["label"]
        )
    # Convert to a graphviz graph, which performs the layout and produces the SVG data.
    graphviz_graph = nx.nx_agraph.to_agraph(dependency_graph)
    svg_data = graphviz_graph.draw(prog="dot", format="svg")
    display(SVG(svg_data))
    plt.show()
6 changes: 6 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ hdf = [
"h5py==3.11.0",
"h5io==0.2.3",
]
graph = [
"pygraphviz==1.13",
"matplotlib==3.8.4",
"networkx==3.3",
"ipython==8.25.0",
]

[tool.setuptools.packages.find]
include = ["pympipool*"]
Expand Down
36 changes: 36 additions & 0 deletions tests/test_dependencies_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@
from pympipool.interactive import create_executor
from pympipool.shared.thread import RaisingThread
from pympipool.shared.executor import execute_tasks_with_dependencies
from pympipool.shared.plot import generate_nodes_and_edges


try:
import pygraphviz

skip_graphviz_test = False
except ImportError:
skip_graphviz_test = True


def add_function(parameter_1, parameter_2):
Expand All @@ -23,6 +32,33 @@ def test_executor(self):
future_2 = exe.submit(add_function, 1, parameter_2=future_1)
self.assertEqual(future_2.result(), 4)

# Check the plotting mode of the Executor: with plot_dependency_graph=True the submitted calls are recorded
# for the dependency graph. The test is skipped when pygraphviz could not be imported (skip_graphviz_test).
@unittest.skipIf(
skip_graphviz_test,
"graphviz is not installed, so the plot_dependency_graph test is skipped.",
)
def test_executor_dependency_plot(self):
with Executor(
max_cores=1,
backend="local",
hostname_localhost=True,
plot_dependency_graph=True,
) as exe:
cloudpickle_register(ind=1)
# The second call depends on the first one through the future passed as parameter_2.
future_1 = exe.submit(add_function, 1, parameter_2=2)
future_2 = exe.submit(add_function, 1, parameter_2=future_1)
# Both futures are expected to be completed directly after submission.
self.assertTrue(future_1.done())
self.assertTrue(future_2.done())
# One entry per submitted call in each of the two bookkeeping dictionaries.
self.assertEqual(len(exe._future_hash_dict), 2)
self.assertEqual(len(exe._task_hash_dict), 2)
nodes, edges = generate_nodes_and_edges(
task_hash_dict=exe._task_hash_dict,
future_hash_inverse_dict={
v: k for k, v in exe._future_hash_dict.items()
},
)
# Two task nodes plus one node for each of the three non-future argument values (1, 2 and 1).
self.assertEqual(len(nodes), 5)
# One edge per argument: two calls with two arguments each.
self.assertEqual(len(edges), 4)

Comment on lines +35 to +61
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a new test method test_executor_dependency_plot that checks the functionality of the dependency graph plotting. The test is well-structured and appropriately skips execution if graphviz is not installed. However, the import of pygraphviz should be adjusted to use importlib.util.find_spec for checking availability instead of catching an ImportError.

- import pygraphviz
+ from importlib import util
+ skip_graphviz_test = util.find_spec("pygraphviz") is None
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
@unittest.skipIf(
skip_graphviz_test,
"graphviz is not installed, so the plot_dependency_graph test is skipped.",
)
def test_executor_dependency_plot(self):
with Executor(
max_cores=1,
backend="local",
hostname_localhost=True,
plot_dependency_graph=True,
) as exe:
cloudpickle_register(ind=1)
future_1 = exe.submit(add_function, 1, parameter_2=2)
future_2 = exe.submit(add_function, 1, parameter_2=future_1)
self.assertTrue(future_1.done())
self.assertTrue(future_2.done())
self.assertEqual(len(exe._future_hash_dict), 2)
self.assertEqual(len(exe._task_hash_dict), 2)
nodes, edges = generate_nodes_and_edges(
task_hash_dict=exe._task_hash_dict,
future_hash_inverse_dict={
v: k for k, v in exe._future_hash_dict.items()
},
)
self.assertEqual(len(nodes), 5)
self.assertEqual(len(edges), 4)
@unittest.skipIf(
skip_graphviz_test,
"graphviz is not installed, so the plot_dependency_graph test is skipped.",
)
def test_executor_dependency_plot(self):
with Executor(
max_cores=1,
backend="local",
hostname_localhost=True,
plot_dependency_graph=True,
) as exe:
cloudpickle_register(ind=1)
future_1 = exe.submit(add_function, 1, parameter_2=2)
future_2 = exe.submit(add_function, 1, parameter_2=future_1)
self.assertTrue(future_1.done())
self.assertTrue(future_2.done())
self.assertEqual(len(exe._future_hash_dict), 2)
self.assertEqual(len(exe._task_hash_dict), 2)
nodes, edges = generate_nodes_and_edges(
task_hash_dict=exe._task_hash_dict,
future_hash_inverse_dict={
v: k for k, v in exe._future_hash_dict.items()
},
)
self.assertEqual(len(nodes), 5)
self.assertEqual(len(edges), 4)
+ from importlib import util
+ skip_graphviz_test = util.find_spec("pygraphviz") is None

def test_dependency_steps(self):
cloudpickle_register(ind=1)
fs1 = Future()
Expand Down