From cd3929b6d8ca963a91fd3aa9c491db48fa2ffd26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 12 Jul 2025 09:36:03 +0200 Subject: [PATCH 1/9] create_file_executor() - rename backend variable --- executorlib/executor/flux.py | 2 +- executorlib/executor/slurm.py | 2 +- executorlib/task_scheduler/file/task_scheduler.py | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/executorlib/executor/flux.py b/executorlib/executor/flux.py index b3aecf78..a6066c11 100644 --- a/executorlib/executor/flux.py +++ b/executorlib/executor/flux.py @@ -348,7 +348,7 @@ def __init__( super().__init__( executor=create_file_executor( max_workers=max_workers, - backend="flux_submission", + backend="flux", max_cores=max_cores, cache_directory=cache_directory, resource_dict=resource_dict, diff --git a/executorlib/executor/slurm.py b/executorlib/executor/slurm.py index 505fe915..3f429e3b 100644 --- a/executorlib/executor/slurm.py +++ b/executorlib/executor/slurm.py @@ -160,7 +160,7 @@ def __init__( super().__init__( executor=create_file_executor( max_workers=max_workers, - backend="slurm_submission", + backend="slurm", max_cores=max_cores, cache_directory=cache_directory, resource_dict=resource_dict, diff --git a/executorlib/task_scheduler/file/task_scheduler.py b/executorlib/task_scheduler/file/task_scheduler.py index 65daffab..f0bbb7e3 100644 --- a/executorlib/task_scheduler/file/task_scheduler.py +++ b/executorlib/task_scheduler/file/task_scheduler.py @@ -80,7 +80,7 @@ def __init__( def create_file_executor( resource_dict: dict, max_workers: Optional[int] = None, - backend: str = "flux_submission", + backend: str = "flux", max_cores: Optional[int] = None, cache_directory: Optional[str] = None, flux_executor=None, @@ -112,6 +112,6 @@ def create_file_executor( return FileTaskScheduler( resource_dict=resource_dict, pysqa_config_directory=pysqa_config_directory, - backend=backend.split("_submission")[0], + backend=backend, 
disable_dependencies=disable_dependencies, ) From 76f1c0927fb9aa215eeba709899d8bfcdc833bab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 12 Jul 2025 09:47:03 +0200 Subject: [PATCH 2/9] change default --- executorlib/task_scheduler/file/task_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executorlib/task_scheduler/file/task_scheduler.py b/executorlib/task_scheduler/file/task_scheduler.py index f0bbb7e3..340ddd12 100644 --- a/executorlib/task_scheduler/file/task_scheduler.py +++ b/executorlib/task_scheduler/file/task_scheduler.py @@ -80,7 +80,7 @@ def __init__( def create_file_executor( resource_dict: dict, max_workers: Optional[int] = None, - backend: str = "flux", + backend: Optional[str] = None, max_cores: Optional[int] = None, cache_directory: Optional[str] = None, flux_executor=None, From 6660a0628030e0879e1b62cb09522f1da3e76868 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 12 Jul 2025 10:01:56 +0200 Subject: [PATCH 3/9] Add option to set execute_function --- executorlib/task_scheduler/file/task_scheduler.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/executorlib/task_scheduler/file/task_scheduler.py b/executorlib/task_scheduler/file/task_scheduler.py index 340ddd12..fa9d7c8b 100644 --- a/executorlib/task_scheduler/file/task_scheduler.py +++ b/executorlib/task_scheduler/file/task_scheduler.py @@ -92,6 +92,7 @@ def create_file_executor( block_allocation: bool = False, init_function: Optional[Callable] = None, disable_dependencies: bool = False, + execute_function: Callable = execute_with_pysqa, ): if block_allocation: raise ValueError( @@ -114,4 +115,5 @@ def create_file_executor( pysqa_config_directory=pysqa_config_directory, backend=backend, disable_dependencies=disable_dependencies, + execute_function=execute_function, ) From 30ca9c803f5fd15aa2d862317b5c0cd7ada48d0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 12 Jul 2025 10:03:11 +0200 
Subject: [PATCH 4/9] Add TestClusterExecutor to simplify debugging of SlurmClusterExecutor and FluxClusterExecutor --- executorlib/api.py | 2 + executorlib/executor/single.py | 169 +++++++++++++++++++++++++++++++++ 2 files changed, 171 insertions(+) diff --git a/executorlib/api.py b/executorlib/api.py index 3b236140..9e9b0941 100644 --- a/executorlib/api.py +++ b/executorlib/api.py @@ -5,6 +5,7 @@ functionality is considered internal and might change during minor releases. """ +from executorlib.executor.single import TestClusterExecutor from executorlib.standalone.command import get_command_path from executorlib.standalone.interactive.communication import ( SocketInterface, @@ -19,6 +20,7 @@ from executorlib.standalone.serialize import cloudpickle_register __all__: list[str] = [ + "TestClusterExecutor", "cancel_items_in_queue", "cloudpickle_register", "get_command_path", diff --git a/executorlib/executor/single.py b/executorlib/executor/single.py index 5293cad2..614401dc 100644 --- a/executorlib/executor/single.py +++ b/executorlib/executor/single.py @@ -184,6 +184,175 @@ def __init__( ) +class TestClusterExecutor(BaseExecutor): + """ + The executorlib.TestClusterExecutor is designed to test the file based communication used in the SlurmClusterExecutor + and the FluxClusterExecutor locally. It is not recommended for production use, rather use the SingleNodeExecutor. + + Args: + max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of + cores which can be used in parallel - just like the max_cores parameter. Using max_cores is + recommended, as computers have a limited number of compute cores. + cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache". + max_cores (int): defines the number cores which can be used in parallel + resource_dict (dict): A dictionary of resources required by the task. 
With the following keys: + - cores (int): number of MPI cores to be used for each function call + - threads_per_core (int): number of OpenMP threads to be used for each function call + - gpus_per_core (int): number of GPUs per worker - defaults to 0 + - cwd (str/None): current working directory where the parallel python task is executed + hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the + context of an HPC cluster this is essential to communicate with an + Executor running on a different compute node within the same allocation. And + in principle any computer should be able to resolve that their own hostname + points to the same address as localhost. Still MacOS >= 12 seems to disable + this look up for security reasons. So on MacOS it is required to set this + option to true + block_allocation (boolean): To accelerate the submission of a series of python functions with the same resource + requirements, executorlib supports block allocation. In this case all resources have + to be defined on the executor, rather than during the submission of the individual + function. + init_function (None): optional function to preset arguments for functions which are submitted later + disable_dependencies (boolean): Disable resolving future objects during the submission. + refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked. + plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For + debugging purposes and to get an overview of the specified dependencies. + plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. + log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
+ + Examples: + ``` + >>> import numpy as np + >>> from executorlib.api import TestClusterExecutor + >>> + >>> def calc(i, j, k): + >>> from mpi4py import MPI + >>> size = MPI.COMM_WORLD.Get_size() + >>> rank = MPI.COMM_WORLD.Get_rank() + >>> return np.array([i, j, k]), size, rank + >>> + >>> def init_k(): + >>> return {"k": 3} + >>> + >>> with TestClusterExecutor(max_workers=2, init_function=init_k) as p: + >>> fs = p.submit(calc, 2, j=4) + >>> print(fs.result()) + [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)] + ``` + """ + + def __init__( + self, + max_workers: Optional[int] = None, + cache_directory: Optional[str] = None, + max_cores: Optional[int] = None, + resource_dict: Optional[dict] = None, + hostname_localhost: Optional[bool] = None, + block_allocation: bool = False, + init_function: Optional[Callable] = None, + disable_dependencies: bool = False, + refresh_rate: float = 0.01, + plot_dependency_graph: bool = False, + plot_dependency_graph_filename: Optional[str] = None, + log_obj_size: bool = False, + ): + """ + The executorlib.SlurmClusterExecutor leverages either the message passing interface (MPI), the SLURM workload + manager or preferable the flux framework for distributing python functions within a given resource allocation. + In contrast to the mpi4py.futures.MPIPoolExecutor the executorlib.SlurmClusterExecutor can be executed in a + serial python process and does not require the python script to be executed with MPI. It is even possible to + execute the executorlib.SlurmClusterExecutor directly in an interactive Jupyter notebook. + + Args: + max_workers (int): for backwards compatibility with the standard library, max_workers also defines the + number of cores which can be used in parallel - just like the max_cores parameter. Using + max_cores is recommended, as computers have a limited number of compute cores. + cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache". 
+ max_cores (int): defines the number of cores which can be used in parallel + resource_dict (dict): A dictionary of resources required by the task. With the following keys: + - cores (int): number of MPI cores to be used for each function call + - threads_per_core (int): number of OpenMP threads to be used for each function call + - gpus_per_core (int): number of GPUs per worker - defaults to 0 + - cwd (str/None): current working directory where the parallel python task is executed + hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the + context of an HPC cluster this is essential to communicate with an + Executor running on a different compute node within the same allocation. And + in principle any computer should be able to resolve that their own hostname + points to the same address as localhost. Still MacOS >= 12 seems to disable + this look up for security reasons. So on MacOS it is required to set this + option to true + block_allocation (boolean): To accelerate the submission of a series of python functions with the same + resource requirements, executorlib supports block allocation. In this case all + resources have to be defined on the executor, rather than during the submission + of the individual function. + init_function (None): optional function to preset arguments for functions which are submitted later + disable_dependencies (boolean): Disable resolving future objects during the submission. + refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked. + plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For + debugging purposes and to get an overview of the specified dependencies. + plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. + log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
+ + """ + default_resource_dict: dict = { + "cores": 1, + "threads_per_core": 1, + "gpus_per_core": 0, + "cwd": None, + "openmpi_oversubscribe": False, + } + if resource_dict is None: + resource_dict = {} + resource_dict.update( + {k: v for k, v in default_resource_dict.items() if k not in resource_dict} + ) + if not plot_dependency_graph: + from executorlib.task_scheduler.file.task_scheduler import ( + create_file_executor, + ) + from executorlib.task_scheduler.file.subprocess_spawner import ( + execute_in_subprocess, + ) + + super().__init__( + executor=create_file_executor( + max_workers=max_workers, + backend=None, + max_cores=max_cores, + cache_directory=cache_directory, + resource_dict=resource_dict, + flux_executor=None, + flux_executor_pmi_mode=None, + flux_executor_nesting=False, + flux_log_files=False, + pysqa_config_directory=None, + hostname_localhost=hostname_localhost, + block_allocation=block_allocation, + init_function=init_function, + disable_dependencies=disable_dependencies, + execute_function=execute_in_subprocess, + ) + ) + else: + super().__init__( + executor=DependencyTaskScheduler( + executor=create_single_node_executor( + max_workers=max_workers, + cache_directory=cache_directory, + max_cores=max_cores, + resource_dict=resource_dict, + hostname_localhost=hostname_localhost, + block_allocation=block_allocation, + init_function=init_function, + log_obj_size=log_obj_size, + ), + max_cores=max_cores, + refresh_rate=refresh_rate, + plot_dependency_graph=plot_dependency_graph, + plot_dependency_graph_filename=plot_dependency_graph_filename, + ) + ) + + def create_single_node_executor( max_workers: Optional[int] = None, max_cores: Optional[int] = None, From d7dd121c650ddde829fd268b295b1afc5a7769b3 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 12 Jul 2025 08:03:47 +0000 Subject: [PATCH 5/9] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see 
https://pre-commit.ci --- executorlib/executor/single.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/executorlib/executor/single.py b/executorlib/executor/single.py index 614401dc..97f5edd8 100644 --- a/executorlib/executor/single.py +++ b/executorlib/executor/single.py @@ -306,12 +306,12 @@ def __init__( {k: v for k, v in default_resource_dict.items() if k not in resource_dict} ) if not plot_dependency_graph: - from executorlib.task_scheduler.file.task_scheduler import ( - create_file_executor, - ) from executorlib.task_scheduler.file.subprocess_spawner import ( execute_in_subprocess, ) + from executorlib.task_scheduler.file.task_scheduler import ( + create_file_executor, + ) super().__init__( executor=create_file_executor( From 6f0a6c8e1103d5438b8a5d1fdfb9cb61ebc686a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 12 Jul 2025 11:01:40 +0200 Subject: [PATCH 6/9] Add tests --- tests/test_testclusterexecutor.py | 53 +++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 tests/test_testclusterexecutor.py diff --git a/tests/test_testclusterexecutor.py b/tests/test_testclusterexecutor.py new file mode 100644 index 00000000..12b38f5f --- /dev/null +++ b/tests/test_testclusterexecutor.py @@ -0,0 +1,53 @@ +import unittest + +from executorlib import get_cache_data +from executorlib.api import TestClusterExecutor +from executorlib.standalone.serialize import cloudpickle_register + +try: + import h5py + + skip_h5py_test = False +except ImportError: + skip_h5py_test = True + + +def foo(x): + return x + 1 + + +@unittest.skipIf( + skip_h5py_test, "h5py is not installed, so the h5io tests are skipped." 
+) +class TestTestClusterExecutor(unittest.TestCase): + def test_cache_dir(self): + with TestClusterExecutor(cache_directory="not_this_dir", resource_dict={}) as exe: + cloudpickle_register(ind=1) + future = exe.submit( + foo, + 1, + resource_dict={ + "cache_directory": "rather_this_dir", + "cache_key": "foo", + }, + ) + self.assertEqual(future.result(), 2) + cache_lst = get_cache_data(cache_directory="not_this_dir") + self.assertEqual(len(cache_lst), 0) + cache_lst = get_cache_data(cache_directory="rather_this_dir") + self.assertEqual(len(cache_lst), 1) + with TestClusterExecutor(cache_directory="not_this_dir", resource_dict={}) as exe: + cloudpickle_register(ind=1) + future = exe.submit( + foo, + 1, + resource_dict={ + "cache_directory": "rather_this_dir", + "cache_key": "foo", + }, + ) + self.assertEqual(future.result(), 2) + cache_lst = get_cache_data(cache_directory="not_this_dir") + self.assertEqual(len(cache_lst), 0) + cache_lst = get_cache_data(cache_directory="rather_this_dir") + self.assertEqual(len(cache_lst), 1) From b15de234b3d056412e55b76a8400a8ae7b2360ec Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Sat, 12 Jul 2025 11:07:24 +0200 Subject: [PATCH 7/9] Update executorlib/executor/single.py Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- executorlib/executor/single.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/executorlib/executor/single.py b/executorlib/executor/single.py index 97f5edd8..af16d45c 100644 --- a/executorlib/executor/single.py +++ b/executorlib/executor/single.py @@ -256,11 +256,8 @@ def __init__( log_obj_size: bool = False, ): """ - The executorlib.SlurmClusterExecutor leverages either the message passing interface (MPI), the SLURM workload - manager or preferable the flux framework for distributing python functions within a given resource allocation. 
- In contrast to the mpi4py.futures.MPIPoolExecutor the executorlib.SlurmClusterExecutor can be executed in a - serial python process and does not require the python script to be executed with MPI. It is even possible to - execute the executorlib.SlurmClusterExecutor directly in an interactive Jupyter notebook. + The executorlib.TestClusterExecutor is designed to test the file based communication used in the SlurmClusterExecutor + and the FluxClusterExecutor locally. It is not recommended for production use, rather use the SingleNodeExecutor. Args: max_workers (int): for backwards compatibility with the standard library, max_workers also defines the From 545f738d96042fd629a0f0a195bb1b9c2e857984 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 12 Jul 2025 11:08:14 +0200 Subject: [PATCH 8/9] update docstring --- executorlib/executor/single.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/executorlib/executor/single.py b/executorlib/executor/single.py index af16d45c..4e84b52b 100644 --- a/executorlib/executor/single.py +++ b/executorlib/executor/single.py @@ -186,8 +186,9 @@ def __init__( class TestClusterExecutor(BaseExecutor): """ - The executorlib.TestClusterExecutor is designed to test the file based communication used in the SlurmClusterExecutor - and the FluxClusterExecutor locally. It is not recommended for production use, rather use the SingleNodeExecutor. + The executorlib.api.TestClusterExecutor is designed to test the file based communication used in the + SlurmClusterExecutor and the FluxClusterExecutor locally. It is not recommended for production use, rather use the + SingleNodeExecutor. 
Args: max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of @@ -256,8 +257,9 @@ def __init__( log_obj_size: bool = False, ): """ - The executorlib.TestClusterExecutor is designed to test the file based communication used in the SlurmClusterExecutor - and the FluxClusterExecutor locally. It is not recommended for production use, rather use the SingleNodeExecutor. + The executorlib.api.TestClusterExecutor is designed to test the file based communication used in the + SlurmClusterExecutor and the FluxClusterExecutor locally. It is not recommended for production use, rather use + the SingleNodeExecutor. Args: max_workers (int): for backwards compatibility with the standard library, max_workers also defines the From 38e96e8fd66d7d355b9493339e69c0d43bf9f246 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Sat, 12 Jul 2025 11:19:14 +0200 Subject: [PATCH 9/9] more tests --- tests/test_testclusterexecutor.py | 43 +++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/tests/test_testclusterexecutor.py b/tests/test_testclusterexecutor.py index 12b38f5f..0a47aa5f 100644 --- a/tests/test_testclusterexecutor.py +++ b/tests/test_testclusterexecutor.py @@ -1,7 +1,10 @@ +import os +import shutil import unittest from executorlib import get_cache_data from executorlib.api import TestClusterExecutor +from executorlib.standalone.plot import generate_nodes_and_edges from executorlib.standalone.serialize import cloudpickle_register try: @@ -12,6 +15,10 @@ skip_h5py_test = True +def add_function(parameter_1, parameter_2): + return parameter_1 + parameter_2 + + def foo(x): return x + 1 @@ -32,8 +39,10 @@ def test_cache_dir(self): }, ) self.assertEqual(future.result(), 2) + self.assertFalse(os.path.exists("not_this_dir")) cache_lst = get_cache_data(cache_directory="not_this_dir") self.assertEqual(len(cache_lst), 0) + self.assertTrue(os.path.exists("rather_this_dir")) cache_lst = 
get_cache_data(cache_directory="rather_this_dir") self.assertEqual(len(cache_lst), 1) with TestClusterExecutor(cache_directory="not_this_dir", resource_dict={}) as exe: @@ -47,7 +56,41 @@ def test_cache_dir(self): }, ) self.assertEqual(future.result(), 2) + self.assertFalse(os.path.exists("not_this_dir")) cache_lst = get_cache_data(cache_directory="not_this_dir") self.assertEqual(len(cache_lst), 0) + self.assertTrue(os.path.exists("rather_this_dir")) + cache_lst = get_cache_data(cache_directory="rather_this_dir") + self.assertEqual(len(cache_lst), 1) + + def test_empty(self): + with TestClusterExecutor(cache_directory="rather_this_dir") as exe: + cloudpickle_register(ind=1) + future = exe.submit(foo,1) + self.assertEqual(future.result(), 2) + self.assertTrue(os.path.exists("rather_this_dir")) cache_lst = get_cache_data(cache_directory="rather_this_dir") self.assertEqual(len(cache_lst), 1) + + def test_executor_dependency_plot(self): + with TestClusterExecutor( + plot_dependency_graph=True, + ) as exe: + cloudpickle_register(ind=1) + future_1 = exe.submit(add_function, 1, parameter_2=2) + future_2 = exe.submit(add_function, 1, parameter_2=future_1) + self.assertTrue(future_1.done()) + self.assertTrue(future_2.done()) + self.assertEqual(len(exe._task_scheduler._future_hash_dict), 2) + self.assertEqual(len(exe._task_scheduler._task_hash_dict), 2) + nodes, edges = generate_nodes_and_edges( + task_hash_dict=exe._task_scheduler._task_hash_dict, + future_hash_inverse_dict={ + v: k for k, v in exe._task_scheduler._future_hash_dict.items() + }, + ) + self.assertEqual(len(nodes), 5) + self.assertEqual(len(edges), 4) + + def tearDown(self): + shutil.rmtree("rather_this_dir", ignore_errors=True) \ No newline at end of file