diff --git a/executorlib/__init__.py b/executorlib/__init__.py
index 3e95579f..159031a3 100644
--- a/executorlib/__init__.py
+++ b/executorlib/__init__.py
@@ -13,14 +13,6 @@
 
 __all__ = []
 
-try:
-    from executorlib.cache.executor import FileExecutor
-
-    __all__ += [FileExecutor]
-except ImportError:
-    pass
-
-
 class Executor:
     """
     The executorlib.Executor leverages either the message passing interface (MPI), the SLURM workload manager or
@@ -47,6 +39,7 @@ class Executor:
         flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
         flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
         flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
+        pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
         hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                       context of an HPC cluster this essential to be able to communicate to an
                                       Executor running on a different compute node within the same allocation. And
@@ -95,6 +88,7 @@ def __init__(
         flux_executor=None,
         flux_executor_pmi_mode: Optional[str] = None,
         flux_executor_nesting: bool = False,
+        pysqa_config_directory: Optional[str] = None,
         hostname_localhost: Optional[bool] = None,
         block_allocation: bool = True,
         init_function: Optional[callable] = None,
@@ -115,6 +109,7 @@ def __new__(
         flux_executor=None,
         flux_executor_pmi_mode: Optional[str] = None,
         flux_executor_nesting: bool = False,
+        pysqa_config_directory: Optional[str] = None,
         hostname_localhost: Optional[bool] = None,
         block_allocation: bool = True,
         init_function: Optional[callable] = None,
@@ -149,6 +144,7 @@ def __new__(
             flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
             flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
             flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
+            pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
             hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                           context of an HPC cluster this essential to be able to communicate to an
                                           Executor running on a different compute node within the same allocation. And
@@ -180,7 +176,24 @@ def __new__(
         resource_dict.update(
             {k: v for k, v in default_resource_dict.items() if k not in resource_dict}
         )
-        if not disable_dependencies:
+        if "pysqa_" in backend and not plot_dependency_graph:
+            from executorlib.cache.executor import create_file_executor
+
+            return create_file_executor(
+                max_workers=max_workers,
+                backend=backend,
+                max_cores=max_cores,
+                cache_directory=cache_directory,
+                resource_dict=resource_dict,
+                flux_executor=flux_executor,
+                flux_executor_pmi_mode=flux_executor_pmi_mode,
+                flux_executor_nesting=flux_executor_nesting,
+                pysqa_config_directory=pysqa_config_directory,
+                hostname_localhost=hostname_localhost,
+                block_allocation=block_allocation,
+                init_function=init_function,
+            )
+        elif not disable_dependencies:
             return ExecutorWithDependencies(
                 max_workers=max_workers,
                 backend=backend,
diff --git a/executorlib/cache/executor.py b/executorlib/cache/executor.py
index 0195a14e..271a1f2e 100644
--- a/executorlib/cache/executor.py
+++ b/executorlib/cache/executor.py
@@ -7,6 +7,10 @@
     execute_in_subprocess,
     terminate_subprocess,
 )
+from executorlib.standalone.inputcheck import (
+    check_executor,
+    check_nested_flux_executor,
+)
 from executorlib.standalone.thread import RaisingThread
 
 try:
@@ -23,7 +27,7 @@ def __init__(
         resource_dict: Optional[dict] = None,
         execute_function: callable = execute_with_pysqa,
         terminate_function: Optional[callable] = None,
-        config_directory: Optional[str] = None,
+        pysqa_config_directory: Optional[str] = None,
         backend: Optional[str] = None,
     ):
         """
@@ -36,7 +40,7 @@ def __init__(
                              - cwd (str/None): current working directory where the parallel python task is executed
             execute_function (callable, optional): The function to execute tasks. Defaults to execute_in_subprocess.
             terminate_function (callable, optional): The function to terminate the tasks.
-            config_directory (str, optional): path to the config directory.
+            pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
             backend (str, optional): name of the backend used to spawn tasks.
         """
         super().__init__()
@@ -62,8 +66,58 @@ def __init__(
                     "cache_directory": cache_directory_path,
                     "resource_dict": resource_dict,
                     "terminate_function": terminate_function,
-                    "config_directory": config_directory,
+                    "pysqa_config_directory": pysqa_config_directory,
                     "backend": backend,
                 },
             )
         )
+
+
+def create_file_executor(
+    max_workers: int = 1,
+    backend: str = "pysqa_flux",
+    max_cores: int = 1,
+    cache_directory: Optional[str] = None,
+    resource_dict: Optional[dict] = None,
+    flux_executor=None,
+    flux_executor_pmi_mode: Optional[str] = None,
+    flux_executor_nesting: bool = False,
+    pysqa_config_directory: Optional[str] = None,
+    hostname_localhost: Optional[bool] = None,
+    block_allocation: bool = False,
+    init_function: Optional[callable] = None,
+):
+    if cache_directory is None:
+        cache_directory = "executorlib_cache"
+    if max_workers != 1:
+        raise ValueError(
+            "The number of workers cannot be controlled with the pysqa based backend."
+        )
+    if max_cores != 1:
+        raise ValueError(
+            "The number of cores cannot be controlled with the pysqa based backend."
+        )
+    if hostname_localhost is not None:
+        raise ValueError(
+            "The option to connect to hosts based on their hostname is not available with the pysqa based backend."
+        )
+    if block_allocation:
+        raise ValueError(
+            "The option block_allocation is not available with the pysqa based backend."
+        )
+    if init_function is not None:
+        raise ValueError(
+            "The option to specify an init_function is not available with the pysqa based backend."
+        )
+    if flux_executor_pmi_mode is not None:
+        raise ValueError(
+            "The option to specify the flux pmi mode is not available with the pysqa based backend."
+        )
+    check_executor(executor=flux_executor)
+    check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
+    return FileExecutor(
+        cache_directory=cache_directory,
+        resource_dict=resource_dict,
+        pysqa_config_directory=pysqa_config_directory,
+        backend=backend.split("pysqa_")[-1],
+    )
diff --git a/executorlib/cache/shared.py b/executorlib/cache/shared.py
index dd09542b..22177b32 100644
--- a/executorlib/cache/shared.py
+++ b/executorlib/cache/shared.py
@@ -52,7 +52,7 @@ def execute_tasks_h5(
     execute_function: callable,
     resource_dict: dict,
     terminate_function: Optional[callable] = None,
-    config_directory: Optional[str] = None,
+    pysqa_config_directory: Optional[str] = None,
     backend: Optional[str] = None,
 ) -> None:
     """
@@ -66,7 +66,7 @@
                          - cwd (str/None): current working directory where the parallel python task is executed
         execute_function (callable): The function to execute the tasks.
         terminate_function (callable): The function to terminate the tasks.
-        config_directory (str, optional): path to the config directory.
+        pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
         backend (str, optional): name of the backend used to spawn tasks.
 
     Returns:
@@ -120,7 +120,7 @@ def execute_tasks_h5(
                         process_dict[k] for k in future_wait_key_lst
                     ],
                     resource_dict=task_resource_dict,
-                    config_directory=config_directory,
+                    config_directory=pysqa_config_directory,
                     backend=backend,
                 )
                 file_name_dict[task_key] = os.path.join(
diff --git a/executorlib/standalone/cache/queue.py b/executorlib/standalone/cache/queue.py
index c1b6b176..40570eea 100644
--- a/executorlib/standalone/cache/queue.py
+++ b/executorlib/standalone/cache/queue.py
@@ -35,5 +35,14 @@ def execute_with_pysqa(
         "working_directory": resource_dict["cwd"],
     }
     del resource_dict["cwd"]
+    unsupported_keys = [
+        "threads_per_core",
+        "gpus_per_core",
+        "openmpi_oversubscribe",
+        "slurm_cmd_args",
+    ]
+    for k in unsupported_keys:
+        if k in resource_dict:
+            del resource_dict[k]
     submit_kwargs.update(resource_dict)
     return qa.submit_job(**submit_kwargs)
diff --git a/tests/test_cache_executor_mpi.py b/tests/test_cache_executor_mpi.py
index bc5e1226..0b8a657b 100644
--- a/tests/test_cache_executor_mpi.py
+++ b/tests/test_cache_executor_mpi.py
@@ -7,7 +7,7 @@
 
 
 try:
-    from executorlib import FileExecutor
+    from executorlib.cache.executor import FileExecutor
 
     skip_h5py_test = False
 except ImportError:
diff --git a/tests/test_cache_executor_pysqa_flux.py b/tests/test_cache_executor_pysqa_flux.py
index 545f9f25..bceaafe6 100644
--- a/tests/test_cache_executor_pysqa_flux.py
+++ b/tests/test_cache_executor_pysqa_flux.py
@@ -3,9 +3,11 @@
 import unittest
 import shutil
 
+from executorlib import Executor
+from executorlib.standalone.serialize import cloudpickle_register
+
 try:
     import flux.job
-    from executorlib import FileExecutor
 
     skip_flux_test = "FLUX_URI" not in os.environ
     pmi = os.environ.get("PYMPIPOOL_PMIX", None)
@@ -30,10 +32,12 @@ def mpi_funct(i):
 )
 class TestCacheExecutorPysqa(unittest.TestCase):
     def test_executor(self):
-        with FileExecutor(
-            resource_dict={"cores": 2},
-            backend="flux",
+        with Executor(
+            backend="pysqa_flux",
+            resource_dict={"cores": 2, "cwd": os.path.abspath("cwd")},
+            block_allocation=False,
         ) as exe:
+            cloudpickle_register(ind=1)
             fs1 = exe.submit(mpi_funct, 1)
             self.assertFalse(fs1.done())
             self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)])
diff --git a/tests/test_cache_executor_serial.py b/tests/test_cache_executor_serial.py
index 96aa2df0..f0cffe9d 100644
--- a/tests/test_cache_executor_serial.py
+++ b/tests/test_cache_executor_serial.py
@@ -11,7 +11,7 @@
 from executorlib.standalone.thread import RaisingThread
 
 try:
-    from executorlib import FileExecutor
+    from executorlib.cache.executor import FileExecutor
     from executorlib.cache.shared import execute_tasks_h5
 
     skip_h5py_test = False