From b38ddb702c924ad27fd5b3c9afba6e6781d6c8cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Mon, 17 Jun 2024 08:38:40 +0200 Subject: [PATCH 1/5] Implement many-to-one dependencies Example: ``` from pympipool import Executor def generate_tasks(length): return range(length) def calc_function(lst, ind, parameter): return lst[ind] + parameter def merge(lst): return sum(lst) length = 5 parameter = 1 with Executor(max_cores=2, backend="local", plot_dependency_graph=False) as exe: future_lst = exe.submit( generate_tasks, length=length, resource_dict={"cores": 1}, ) lst = [] for i in range(length): lst.append(exe.submit( calc_function, lst=future_lst, ind=i, parameter=parameter, resource_dict={"cores": 1}, )) future_sum = exe.submit( merge, lst=lst, resource_dict={"cores": 1}, ) print(future_sum.result()) ``` --- pympipool/shared/executor.py | 26 +++++++-- pympipool/shared/plot.py | 65 ++++++++++++---------- tests/test_dependencies_executor.py | 85 +++++++++++++++++++++++++++++ 3 files changed, 142 insertions(+), 34 deletions(-) diff --git a/pympipool/shared/executor.py b/pympipool/shared/executor.py index 4d1865f1..b3dd52e4 100644 --- a/pympipool/shared/executor.py +++ b/pympipool/shared/executor.py @@ -520,9 +520,17 @@ def _update_futures_in_input(args: tuple, kwargs: dict): Returns: tuple, dict: arguments and keyword arguments with each future object in them being evaluated """ - args = [arg if not isinstance(arg, Future) else arg.result() for arg in args] + def get_result(arg): + if isinstance(arg, Future): + return arg.result() + elif isinstance(arg, list): + return [get_result(arg=l) for l in arg] + else: + return arg + + args = [get_result(arg=arg) for arg in args] kwargs = { - key: value if not isinstance(value, Future) else value.result() + key: get_result(arg=value) for key, value in kwargs.items() } return args, kwargs @@ -539,9 +547,17 @@ def _get_future_objects_from_input(task_dict: dict): Returns: list, boolean: list of future objects and boolean flag if all future objects are already done """ - future_lst = [arg for arg in task_dict["args"] if isinstance(arg, Future)] + [ - value for value in task_dict["kwargs"].values() if isinstance(value, Future) - ] + future_lst = [] + + def find_future_in_list(lst): + for l in lst: + if isinstance(l, Future): + future_lst.append(l) + elif isinstance(l, list): + find_future_in_list(lst=l) + + find_future_in_list(lst=task_dict["args"]) + find_future_in_list(lst=task_dict["kwargs"].values()) boolean_flag = len([future for future in future_lst if future.done()]) == len( future_lst ) diff --git a/pympipool/shared/plot.py b/pympipool/shared/plot.py index 8c9ccaa6..a0f9db60 100644 --- a/pympipool/shared/plot.py +++ b/pympipool/shared/plot.py @@ -6,51 +6,58 @@ def generate_nodes_and_edges( task_hash_dict: dict, future_hash_inverse_dict: dict -) -> Tuple[list]: +) -> Tuple[list, list]: node_lst, edge_lst = [], [] hash_id_dict = {} + + def add_element(arg, link_to, label=""): + if isinstance(arg, Future): + edge_lst.append( + { + "start": hash_id_dict[future_hash_inverse_dict[arg]], + "end": link_to, + "label": label, + } + ) + elif isinstance(arg, list) and all([isinstance(a, Future) for a in arg]): + for a in arg: + add_element(arg=a, link_to=link_to, label=label) + else: + node_id = len(node_lst) + node_lst.append({"name": str(arg), "id": node_id}) + edge_lst.append({"start": node_id, "end": link_to, "label": label}) + for k, v in task_hash_dict.items(): hash_id_dict[k] = len(node_lst) node_lst.append({"name": v["fn"].__name__, "id": hash_id_dict[k]}) for k, task_dict in task_hash_dict.items(): for arg in task_dict["args"]: - if not isinstance(arg, Future): - node_id = len(node_lst) - node_lst.append({"name": str(arg), "id": node_id}) - edge_lst.append({"start": node_id, "end": hash_id_dict[k], "label": ""}) - else: - edge_lst.append( - { - "start": hash_id_dict[future_hash_inverse_dict[arg]], - "end": hash_id_dict[k], - "label": "", - } - ) + add_element(arg=arg, link_to=hash_id_dict[k], label="") + for kw, v in task_dict["kwargs"].items(): - if not isinstance(v, Future): - node_id = len(node_lst) - node_lst.append({"name": str(v), "id": node_id}) - edge_lst.append( - {"start": node_id, "end": hash_id_dict[k], "label": str(kw)} - ) - else: - edge_lst.append( - { - "start": hash_id_dict[future_hash_inverse_dict[v]], - "end": hash_id_dict[k], - "label": str(kw), - } - ) + add_element(arg=v, link_to=hash_id_dict[k], label=str(kw)) + return node_lst, edge_lst def generate_task_hash(task_dict: dict, future_hash_inverse_dict: dict) -> bytes: + def convert_arg(arg, future_hash_inverse_dict): + if isinstance(arg, Future): + return future_hash_inverse_dict[arg] + elif isinstance(arg, list): + return [ + convert_arg(arg=a, future_hash_inverse_dict=future_hash_inverse_dict) + for a in arg + ] + else: + return arg + args_for_hash = [ - arg if not isinstance(arg, Future) else future_hash_inverse_dict[arg] + convert_arg(arg=arg, future_hash_inverse_dict=future_hash_inverse_dict) for arg in task_dict["args"] ] kwargs_for_hash = { - k: v if not isinstance(v, Future) else future_hash_inverse_dict[v] + k: convert_arg(arg=v, future_hash_inverse_dict=future_hash_inverse_dict) for k, v in task_dict["kwargs"].items() } return cloudpickle.dumps( diff --git a/tests/test_dependencies_executor.py b/tests/test_dependencies_executor.py index 833a1425..f1f7a6a0 100644 --- a/tests/test_dependencies_executor.py +++ b/tests/test_dependencies_executor.py @@ -24,6 +24,21 @@ def add_function(parameter_1, parameter_2): return parameter_1 + parameter_2 +def generate_tasks(length): + sleep(0.2) + return range(length) + + +def calc_from_lst(lst, ind, parameter): + sleep(0.2) + return lst[ind] + parameter + + +def merge(lst): + sleep(0.2) + return sum(lst) + + class TestExecutorWithDependencies(unittest.TestCase): def test_executor(self): with Executor(max_cores=1, backend="local", hostname_localhost=True) as exe: @@ -108,3 +123,73 @@ def test_dependency_steps(self): self.assertTrue(fs1.done()) self.assertTrue(fs2.done()) q.put({"shutdown": True, "wait": True}) + + def test_many_to_one(self): + length = 5 + parameter = 1 + with Executor(max_cores=2, backend="local", hostname_localhost=True) as exe: + cloudpickle_register(ind=1) + future_lst = exe.submit( + generate_tasks, + length=length, + resource_dict={"cores": 1}, + ) + lst = [] + for i in range(length): + lst.append(exe.submit( + calc_from_lst, + lst=future_lst, + ind=i, + parameter=parameter, + resource_dict={"cores": 1}, + )) + future_sum = exe.submit( + merge, + lst=lst, + resource_dict={"cores": 1}, + ) + self.assertEqual(future_sum.result(), 15) + + def test_many_to_one_plot(self): + length = 5 + parameter = 1 + with Executor( + max_cores=2, + backend="local", + hostname_localhost=True, + plot_dependency_graph=True, + ) as exe: + cloudpickle_register(ind=1) + future_lst = exe.submit( + generate_tasks, + length=length, + resource_dict={"cores": 1}, + ) + lst = [] + for i in range(length): + lst.append(exe.submit( + calc_from_lst, + lst=future_lst, + ind=i, + parameter=parameter, + resource_dict={"cores": 1}, + )) + future_sum = exe.submit( + merge, + lst=lst, + resource_dict={"cores": 1}, + ) + self.assertTrue(future_lst.done()) + for l in lst: + self.assertTrue(l.done()) + self.assertTrue(future_sum.done()) + self.assertEqual(len(exe._future_hash_dict), 7) + self.assertEqual(len(exe._task_hash_dict), 7) + nodes, edges = generate_nodes_and_edges( + task_hash_dict=exe._task_hash_dict, + future_hash_inverse_dict={ + v: k for k, v in exe._future_hash_dict.items() + }, + ) + self.assertEqual(len(nodes), 18) + self.assertEqual(len(edges), 21) From 6e49c9a7139bac1218265dcef52dbccf93839774 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 17 Jun 2024 06:40:11 +0000 Subject: [PATCH 2/5] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- pympipool/shared/executor.py | 6 ++---- tests/test_dependencies_executor.py | 32 ++++++++++++++++------------- 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/pympipool/shared/executor.py b/pympipool/shared/executor.py index b3dd52e4..113ecfea 100644 --- a/pympipool/shared/executor.py +++ b/pympipool/shared/executor.py @@ -520,6 +520,7 @@ def _update_futures_in_input(args: tuple, kwargs: dict): Returns: tuple, dict: arguments and keyword arguments with each future object in them being evaluated """ + def get_result(arg): if isinstance(arg, Future): return arg.result() @@ -529,10 +530,7 @@ def get_result(arg): return arg args = [get_result(arg=arg) for arg in args] - kwargs = { - key: get_result(arg=value) - for key, value in kwargs.items() - } + kwargs = {key: get_result(arg=value) for key, value in kwargs.items()} return args, kwargs diff --git a/tests/test_dependencies_executor.py b/tests/test_dependencies_executor.py index f1f7a6a0..6816eb21 100644 --- a/tests/test_dependencies_executor.py +++ b/tests/test_dependencies_executor.py @@ -136,13 +136,15 @@ def test_many_to_one(self): ) lst = [] for i in range(length): - lst.append(exe.submit( - calc_from_lst, - lst=future_lst, - ind=i, - parameter=parameter, - resource_dict={"cores": 1}, - )) + lst.append( + exe.submit( + calc_from_lst, + lst=future_lst, + ind=i, + parameter=parameter, + resource_dict={"cores": 1}, + ) + ) future_sum = exe.submit( merge, lst=lst, @@ -167,13 +169,15 @@ def test_many_to_one_plot(self): ) lst = [] for i in range(length): - lst.append(exe.submit( - calc_from_lst, - lst=future_lst, - ind=i, - parameter=parameter, - resource_dict={"cores": 1}, - )) + lst.append( + exe.submit( + calc_from_lst, + lst=future_lst, + ind=i, + parameter=parameter, + resource_dict={"cores": 1}, + ) + ) future_sum = exe.submit( merge, lst=lst, From 84d7fd653139d982eca2c1e3f4915acaf5e4eba2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Mon, 17 Jun 2024 08:41:33 +0200 Subject: [PATCH 3/5] ruff fixes --- pympipool/shared/executor.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pympipool/shared/executor.py b/pympipool/shared/executor.py index b3dd52e4..dbac8350 100644 --- a/pympipool/shared/executor.py +++ b/pympipool/shared/executor.py @@ -524,7 +524,7 @@ def get_result(arg): if isinstance(arg, Future): return arg.result() elif isinstance(arg, list): - return [get_result(arg=l) for l in arg] + return [get_result(arg=el) for el in arg] else: return arg @@ -550,11 +550,11 @@ def _get_future_objects_from_input(task_dict: dict): future_lst = [] def find_future_in_list(lst): - for l in lst: - if isinstance(l, Future): - future_lst.append(l) - elif isinstance(l, list): - find_future_in_list(lst=l) + for el in lst: + if isinstance(el, Future): + future_lst.append(el) + elif isinstance(el, list): + find_future_in_list(lst=el) find_future_in_list(lst=task_dict["args"]) find_future_in_list(lst=task_dict["kwargs"].values()) From 12ec7ca8475ed8a3a7131ab281f750bb849c52c1 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Mon, 17 Jun 2024 08:56:33 +0200 Subject: [PATCH 4/5] Update test_dependencies_executor.py --- tests/test_dependencies_executor.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/test_dependencies_executor.py b/tests/test_dependencies_executor.py index 6816eb21..9ec50088 100644 --- a/tests/test_dependencies_executor.py +++ b/tests/test_dependencies_executor.py @@ -49,7 +49,7 @@ def test_executor(self): @unittest.skipIf( skip_graphviz_test, - "graphviz is not installed, so the plot_dependency_graph test is skipped.", + "graphviz is not installed, so the plot_dependency_graph tests are skipped.", ) def test_executor_dependency_plot(self): with Executor( @@ -152,6 +152,10 @@ def test_many_to_one(self): ) self.assertEqual(future_sum.result(), 15) + @unittest.skipIf( + skip_graphviz_test, + "graphviz is not installed, so the plot_dependency_graph tests are skipped.", + ) def test_many_to_one_plot(self): length = 5 parameter = 1 From 9f039481757a707626516cad1f91134f5585bfcf Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Mon, 17 Jun 2024 13:20:33 +0200 Subject: [PATCH 5/5] update shapes --- pympipool/shared/plot.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pympipool/shared/plot.py b/pympipool/shared/plot.py index a0f9db60..3ed3fe6a 100644 --- a/pympipool/shared/plot.py +++ b/pympipool/shared/plot.py @@ -24,12 +24,14 @@ def add_element(arg, link_to, label=""): add_element(arg=a, link_to=link_to, label=label) else: node_id = len(node_lst) - node_lst.append({"name": str(arg), "id": node_id}) + node_lst.append({"name": str(arg), "id": node_id, "shape": "circle"}) edge_lst.append({"start": node_id, "end": link_to, "label": label}) for k, v in task_hash_dict.items(): hash_id_dict[k] = len(node_lst) - node_lst.append({"name": v["fn"].__name__, "id": hash_id_dict[k]}) + node_lst.append( + {"name": v["fn"].__name__, "id": hash_id_dict[k], "shape": "box"} + ) for k, task_dict in task_hash_dict.items(): for arg in task_dict["args"]: add_element(arg=arg, link_to=hash_id_dict[k], label="") @@ -72,7 +74,7 @@ def draw(node_lst: list, edge_lst: list): graph = nx.DiGraph() for node in node_lst: - graph.add_node(node["id"], label=node["name"]) + graph.add_node(node["id"], label=node["name"], shape=node["shape"]) for edge in edge_lst: graph.add_edge(edge["start"], edge["end"], label=edge["label"]) svg = nx.nx_agraph.to_agraph(graph).draw(prog="dot", format="svg")