diff --git a/executorlib/__init__.py b/executorlib/__init__.py
index baf4aea7..961572c9 100644
--- a/executorlib/__init__.py
+++ b/executorlib/__init__.py
@@ -41,6 +41,7 @@ class Executor:
                            cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
                            recommended, as computers have a limited number of compute cores.
         backend (str): Switch between the different backends "flux", "local" or "slurm". The default is "local".
+        cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
         max_cores (int): defines the number cores which can be used in parallel
         cores_per_worker (int): number of MPI cores to be used for each function call
         threads_per_core (int): number of OpenMP threads to be used for each function call
@@ -93,6 +94,7 @@ def __init__(
         self,
         max_workers: int = 1,
         backend: str = "local",
+        cache_directory: Optional[str] = None,
         max_cores: int = 1,
         cores_per_worker: int = 1,
         threads_per_core: int = 1,
@@ -117,6 +119,7 @@ def __new__(
         cls,
         max_workers: int = 1,
         backend: str = "local",
+        cache_directory: Optional[str] = None,
         max_cores: int = 1,
         cores_per_worker: int = 1,
         threads_per_core: int = 1,
@@ -147,6 +150,7 @@ def __new__(
                                number of cores which can be used in parallel - just like the max_cores parameter. Using
                                max_cores is recommended, as computers have a limited number of compute cores.
             backend (str): Switch between the different backends "flux", "local" or "slurm". The default is "local".
+            cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
             max_cores (int): defines the number cores which can be used in parallel
             cores_per_worker (int): number of MPI cores to be used for each function call
             threads_per_core (int): number of OpenMP threads to be used for each function call
@@ -179,6 +183,7 @@ def __new__(
             return ExecutorWithDependencies(
                 max_workers=max_workers,
                 backend=backend,
+                cache_directory=cache_directory,
                 max_cores=max_cores,
                 cores_per_worker=cores_per_worker,
                 threads_per_core=threads_per_core,
@@ -201,6 +206,7 @@ def __new__(
             return create_executor(
                 max_workers=max_workers,
                 backend=backend,
+                cache_directory=cache_directory,
                 max_cores=max_cores,
                 cores_per_worker=cores_per_worker,
                 threads_per_core=threads_per_core,
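The hunks above thread the new cache_directory keyword from Executor through to both execution paths (ExecutorWithDependencies and create_executor). A minimal usage sketch of the new argument (illustrative only, not part of this patch; the function name and cache path are made up):

from executorlib import Executor

def add_funct(a, b):
    return a + b

with Executor(max_cores=1, backend="local", cache_directory="./cache") as exe:
    fs = exe.submit(add_funct, 1, 2)
    print(fs.result())  # computed once; an identical resubmission is read back from ./cache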
diff --git a/executorlib/interactive/__init__.py b/executorlib/interactive/__init__.py
index ff70c1c1..e17d176c 100644
--- a/executorlib/interactive/__init__.py
+++ b/executorlib/interactive/__init__.py
@@ -31,6 +31,7 @@ def create_executor(
     max_workers: int = 1,
     backend: str = "local",
     max_cores: int = 1,
+    cache_directory: Optional[str] = None,
     cores_per_worker: int = 1,
     threads_per_core: int = 1,
     gpus_per_worker: int = 0,
@@ -56,8 +57,9 @@ def create_executor(
         max_workers (int): for backwards compatibility with the standard library, max_workers also defines the
                            number of cores which can be used in parallel - just like the max_cores parameter. Using
                            max_cores is recommended, as computers have a limited number of compute cores.
-        max_cores (int): defines the number cores which can be used in parallel
         backend (str): Switch between the different backends "flux", "local" or "slurm". The default is "local".
+        max_cores (int): defines the number cores which can be used in parallel
+        cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
         cores_per_worker (int): number of MPI cores to be used for each function call
         threads_per_core (int): number of OpenMP threads to be used for each function call
         gpus_per_worker (int): number of GPUs per worker - defaults to 0
@@ -88,6 +90,7 @@ def create_executor(
         "cores": cores_per_worker,
         "hostname_localhost": hostname_localhost,
         "cwd": cwd,
+        "cache_directory": cache_directory,
     }
     if backend == "flux":
         check_oversubscribe(oversubscribe=openmpi_oversubscribe)
diff --git a/executorlib/shared/executor.py b/executorlib/shared/executor.py
index 24c0c613..800e92ae 100644
--- a/executorlib/shared/executor.py
+++ b/executorlib/shared/executor.py
@@ -1,5 +1,6 @@
 import importlib.util
 import inspect
+import os
 import queue
 import sys
 from concurrent.futures import (
@@ -14,11 +15,12 @@
 import cloudpickle
 
 from executorlib.shared.command import get_command_path
-from executorlib.shared.communication import interface_bootup
+from executorlib.shared.communication import SocketInterface, interface_bootup
 from executorlib.shared.inputcheck import (
     check_resource_dict,
     check_resource_dict_is_empty,
 )
+from executorlib.shared.serialize import serialize_funct_h5
 from executorlib.shared.spawner import BaseSpawner, MpiExecSpawner
 from executorlib.shared.thread import RaisingThread
 
@@ -304,6 +306,7 @@ def execute_parallel_tasks(
     spawner: BaseSpawner = MpiExecSpawner,
     hostname_localhost: Optional[bool] = None,
     init_function: Optional[Callable] = None,
+    cache_directory: Optional[str] = None,
     **kwargs,
 ) -> None:
     """
@@ -321,6 +324,7 @@ def execute_parallel_tasks(
                                   this look up for security reasons. So on MacOS it is required to set this option to true
         init_function (callable): optional function to preset arguments for functions which are submitted later
+        cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
     """
     interface = interface_bootup(
         command_lst=_get_backend_path(
@@ -341,17 +345,17 @@
             future_queue.join()
             break
         elif "fn" in task_dict.keys() and "future" in task_dict.keys():
-            f = task_dict.pop("future")
-            if f.set_running_or_notify_cancel():
-                try:
-                    f.set_result(interface.send_and_receive_dict(input_dict=task_dict))
-                except Exception as thread_exception:
-                    interface.shutdown(wait=True)
-                    future_queue.task_done()
-                    f.set_exception(exception=thread_exception)
-                    raise thread_exception
-                else:
-                    future_queue.task_done()
+            if cache_directory is None:
+                _execute_task(
+                    interface=interface, task_dict=task_dict, future_queue=future_queue
+                )
+            else:
+                _execute_task_with_cache(
+                    interface=interface,
+                    task_dict=task_dict,
+                    future_queue=future_queue,
+                    cache_directory=cache_directory,
+                )
 
 
 def execute_separate_tasks(
@@ -644,3 +648,67 @@
         )
     process.start()
     return process, active_task_dict
+
+
+def _execute_task(
+    interface: SocketInterface, task_dict: dict, future_queue: queue.Queue
+):
+    """
+    Execute the task in the task_dict by communicating it via the interface.
+
+    Args:
+        interface (SocketInterface): socket interface for zmq communication
+        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
+                          {"fn": callable, "args": (), "kwargs": {}, "resource_dict": {}}
+        future_queue (Queue): Queue for receiving new tasks.
+    """
+    f = task_dict.pop("future")
+    if f.set_running_or_notify_cancel():
+        try:
+            f.set_result(interface.send_and_receive_dict(input_dict=task_dict))
+        except Exception as thread_exception:
+            interface.shutdown(wait=True)
+            future_queue.task_done()
+            f.set_exception(exception=thread_exception)
+            raise thread_exception
+        else:
+            future_queue.task_done()
+
+
+def _execute_task_with_cache(
+    interface: SocketInterface,
+    task_dict: dict,
+    future_queue: queue.Queue,
+    cache_directory: str,
+):
+    """
+    Execute the task in the task_dict by communicating it via the interface, using the cache in the cache directory.
+
+    Args:
+        interface (SocketInterface): socket interface for zmq communication
+        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
+                          {"fn": callable, "args": (), "kwargs": {}, "resource_dict": {}}
+        future_queue (Queue): Queue for receiving new tasks.
+        cache_directory (str): The directory to store cache files.
+    """
+    from executorlib.shared.hdf import dump, get_output
+
+    task_key, data_dict = serialize_funct_h5(
+        task_dict["fn"], *task_dict["args"], **task_dict["kwargs"]
+    )
+    file_name = os.path.join(cache_directory, task_key + ".h5out")
+    if not os.path.exists(cache_directory):
+        os.mkdir(cache_directory)
+    future = task_dict["future"]
+    if task_key + ".h5out" not in os.listdir(cache_directory):
+        _execute_task(
+            interface=interface,
+            task_dict=task_dict,
+            future_queue=future_queue,
+        )
+        data_dict["output"] = future.result()
+        dump(file_name=file_name, data_dict=data_dict)
+    else:
+        _, result = get_output(file_name=file_name)
+        future.set_result(result)
+        future_queue.task_done()
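The caching path above hinges on serialize_funct_h5 returning a key that is deterministic in the function and its arguments, so a resubmission with identical inputs maps to the same .h5out file and skips the interface round trip entirely. A condensed sketch of that control flow (run_task is a hypothetical stand-in for the _execute_task call; dump and get_output are the helpers imported from executorlib.shared.hdf in the hunk above):

task_key, data_dict = serialize_funct_h5(fn, *args, **kwargs)  # key is deterministic per (fn, args, kwargs)
file_name = os.path.join(cache_directory, task_key + ".h5out")
if task_key + ".h5out" not in os.listdir(cache_directory):
    data_dict["output"] = run_task(task_dict)       # cache miss: execute and block for the result
    dump(file_name=file_name, data_dict=data_dict)  # persist the output next to the inputs
else:
    _, result = get_output(file_name=file_name)     # cache hit: read the stored result, no execution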
+ """ + f = task_dict.pop("future") + if f.set_running_or_notify_cancel(): + try: + f.set_result(interface.send_and_receive_dict(input_dict=task_dict)) + except Exception as thread_exception: + interface.shutdown(wait=True) + future_queue.task_done() + f.set_exception(exception=thread_exception) + raise thread_exception + else: + future_queue.task_done() + + +def _execute_task_with_cache( + interface: SocketInterface, + task_dict: dict, + future_queue: queue.Queue, + cache_directory: str, +): + """ + Execute the task in the task_dict by communicating it via the interface using the cache in the cache directory. + + Args: + interface (SocketInterface): socket interface for zmq communication + task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys + {"fn": callable, "args": (), "kwargs": {}, "resource_dict": {}} + future_queue (Queue): Queue for receiving new tasks. + cache_directory (str): The directory to store cache files. + """ + from executorlib.shared.hdf import dump, get_output + + task_key, data_dict = serialize_funct_h5( + task_dict["fn"], *task_dict["args"], **task_dict["kwargs"] + ) + file_name = os.path.join(cache_directory, task_key + ".h5out") + if not os.path.exists(cache_directory): + os.mkdir(cache_directory) + future = task_dict["future"] + if task_key + ".h5out" not in os.listdir(cache_directory): + _execute_task( + interface=interface, + task_dict=task_dict, + future_queue=future_queue, + ) + data_dict["output"] = future.result() + dump(file_name=file_name, data_dict=data_dict) + else: + _, result = get_output(file_name=file_name) + future = task_dict["future"] + future.set_result(result) + future_queue.task_done() diff --git a/tests/test_executor_backend_mpi.py b/tests/test_executor_backend_mpi.py index 85731b82..94808c3e 100644 --- a/tests/test_executor_backend_mpi.py +++ b/tests/test_executor_backend_mpi.py @@ -1,4 +1,6 @@ import importlib.util +import shutil +import time import unittest from executorlib import Executor @@ -20,6 +22,15 @@ def mpi_funct(i): return i, size, rank +def mpi_funct_sleep(i): + from mpi4py import MPI + + size = MPI.COMM_WORLD.Get_size() + rank = MPI.COMM_WORLD.Get_rank() + time.sleep(i) + return i, size, rank + + class TestExecutorBackend(unittest.TestCase): def test_meta_executor_serial(self): with Executor(max_cores=2, backend="local", block_allocation=True) as exe: @@ -71,3 +82,33 @@ def test_errors(self): gpus_per_worker=1, backend="local", ) + + +class TestExecutorBackendCache(unittest.TestCase): + def tearDown(self): + shutil.rmtree("./cache") + + @unittest.skipIf( + skip_mpi4py_test, "mpi4py is not installed, so the mpi4py tests are skipped." + ) + def test_meta_executor_parallel_cache(self): + with Executor( + max_workers=2, + cores_per_worker=2, + backend="local", + block_allocation=True, + cache_directory="./cache", + ) as exe: + cloudpickle_register(ind=1) + time_1 = time.time() + fs_1 = exe.submit(mpi_funct_sleep, 1) + self.assertEqual(fs_1.result(), [(1, 2, 0), (1, 2, 1)]) + self.assertTrue(fs_1.done()) + time_2 = time.time() + self.assertTrue(time_2 - time_1 > 1) + time_3 = time.time() + fs_2 = exe.submit(mpi_funct_sleep, 1) + self.assertEqual(fs_2.result(), [(1, 2, 0), (1, 2, 1)]) + self.assertTrue(fs_2.done()) + time_4 = time.time() + self.assertTrue(time_3 - time_4 < 1)