6 changes: 6 additions & 0 deletions executorlib/__init__.py
@@ -41,6 +41,7 @@ class Executor:
             cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
             recommended, as computers have a limited number of compute cores.
         backend (str): Switch between the different backends "flux", "local" or "slurm". The default is "local".
+        cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
         max_cores (int): defines the number of cores which can be used in parallel
         cores_per_worker (int): number of MPI cores to be used for each function call
         threads_per_core (int): number of OpenMP threads to be used for each function call
@@ -93,6 +94,7 @@ def __init__(
         self,
         max_workers: int = 1,
         backend: str = "local",
+        cache_directory: Optional[str] = None,
         max_cores: int = 1,
         cores_per_worker: int = 1,
         threads_per_core: int = 1,
@@ -117,6 +119,7 @@ def __new__(
         cls,
         max_workers: int = 1,
         backend: str = "local",
+        cache_directory: Optional[str] = None,
         max_cores: int = 1,
         cores_per_worker: int = 1,
         threads_per_core: int = 1,
@@ -147,6 +150,7 @@ def __new__(
                 number of cores which can be used in parallel - just like the max_cores parameter. Using
                 max_cores is recommended, as computers have a limited number of compute cores.
             backend (str): Switch between the different backends "flux", "local" or "slurm". The default is "local".
+            cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
             max_cores (int): defines the number of cores which can be used in parallel
             cores_per_worker (int): number of MPI cores to be used for each function call
             threads_per_core (int): number of OpenMP threads to be used for each function call
@@ -179,6 +183,7 @@ def __new__(
             return ExecutorWithDependencies(
                 max_workers=max_workers,
                 backend=backend,
+                cache_directory=cache_directory,
                 max_cores=max_cores,
                 cores_per_worker=cores_per_worker,
                 threads_per_core=threads_per_core,
@@ -201,6 +206,7 @@ def __new__(
             return create_executor(
                 max_workers=max_workers,
                 backend=backend,
+                cache_directory=cache_directory,
                 max_cores=max_cores,
                 cores_per_worker=cores_per_worker,
                 threads_per_core=threads_per_core,
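
For anyone trying the change out, a minimal usage sketch of the new argument (the add function and the ./cache path are illustrative, not part of the PR; block_allocation=True mirrors the test added at the bottom of this pull request):

    from executorlib import Executor


    def add(i, j):
        return i + j


    # The first submission executes the task and writes the result to
    # ./cache as <task_key>.h5out; resubmitting the identical call can
    # then be answered from that file instead of being re-executed.
    with Executor(
        max_cores=1,
        backend="local",
        block_allocation=True,
        cache_directory="./cache",
    ) as exe:
        print(exe.submit(add, 1, 2).result())  # prints 3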
5 changes: 4 additions & 1 deletion executorlib/interactive/__init__.py
@@ -31,6 +31,7 @@ def create_executor(
     max_workers: int = 1,
     backend: str = "local",
     max_cores: int = 1,
+    cache_directory: Optional[str] = None,
     cores_per_worker: int = 1,
     threads_per_core: int = 1,
     gpus_per_worker: int = 0,
@@ -56,8 +57,9 @@ def create_executor(
         max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
                            cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
                            recommended, as computers have a limited number of compute cores.
-        max_cores (int): defines the number of cores which can be used in parallel
         backend (str): Switch between the different backends "flux", "local" or "slurm". The default is "local".
+        max_cores (int): defines the number of cores which can be used in parallel
+        cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
         cores_per_worker (int): number of MPI cores to be used for each function call
         threads_per_core (int): number of OpenMP threads to be used for each function call
         gpus_per_worker (int): number of GPUs per worker - defaults to 0
@@ -88,6 +90,7 @@ def create_executor(
         "cores": cores_per_worker,
         "hostname_localhost": hostname_localhost,
         "cwd": cwd,
+        "cache_directory": cache_directory,
     }
     if backend == "flux":
         check_oversubscribe(oversubscribe=openmpi_oversubscribe)
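
Since Executor.__new__ forwards the argument straight to this factory (see the first file), create_executor can also be called directly. A sketch, assuming the returned executor follows the standard concurrent.futures API used elsewhere in the package; the cache path is illustrative:

    from executorlib.interactive import create_executor

    exe = create_executor(
        max_workers=1,
        backend="local",
        cache_directory="/tmp/executorlib_cache",  # lands in the executor_kwargs dict above
    )
    print(exe.submit(sum, [1, 2, 3]).result())  # prints 6
    exe.shutdown(wait=True)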
93 changes: 81 additions & 12 deletions executorlib/shared/executor.py
@@ -1,5 +1,6 @@
 import importlib.util
 import inspect
+import os
 import queue
 import sys
 from concurrent.futures import (
@@ -14,11 +15,12 @@
 import cloudpickle

 from executorlib.shared.command import get_command_path
-from executorlib.shared.communication import interface_bootup
+from executorlib.shared.communication import SocketInterface, interface_bootup
 from executorlib.shared.inputcheck import (
     check_resource_dict,
     check_resource_dict_is_empty,
 )
+from executorlib.shared.serialize import serialize_funct_h5
 from executorlib.shared.spawner import BaseSpawner, MpiExecSpawner
 from executorlib.shared.thread import RaisingThread

@@ -304,6 +306,7 @@ def execute_parallel_tasks(
     spawner: BaseSpawner = MpiExecSpawner,
     hostname_localhost: Optional[bool] = None,
     init_function: Optional[Callable] = None,
+    cache_directory: Optional[str] = None,
     **kwargs,
 ) -> None:
     """
@@ -321,6 +324,7 @@ def execute_parallel_tasks(
                               this look up for security reasons. So on MacOS it is required to set this
                               option to true
         init_function (callable): optional function to preset arguments for functions which are submitted later
+        cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
     """
     interface = interface_bootup(
         command_lst=_get_backend_path(
@@ -341,17 +345,17 @@
             future_queue.join()
             break
         elif "fn" in task_dict.keys() and "future" in task_dict.keys():
-            f = task_dict.pop("future")
-            if f.set_running_or_notify_cancel():
-                try:
-                    f.set_result(interface.send_and_receive_dict(input_dict=task_dict))
-                except Exception as thread_exception:
-                    interface.shutdown(wait=True)
-                    future_queue.task_done()
-                    f.set_exception(exception=thread_exception)
-                    raise thread_exception
-                else:
-                    future_queue.task_done()
+            if cache_directory is None:
+                _execute_task(
+                    interface=interface, task_dict=task_dict, future_queue=future_queue
+                )
+            else:
+                _execute_task_with_cache(
+                    interface=interface,
+                    task_dict=task_dict,
+                    future_queue=future_queue,
+                    cache_directory=cache_directory,
+                )


 def execute_separate_tasks(
@@ -644,3 +648,68 @@ def _submit_function_to_separate_process(
     )
     process.start()
     return process, active_task_dict
+
+
+def _execute_task(
+    interface: SocketInterface, task_dict: dict, future_queue: queue.Queue
+):
+    """
+    Execute the task in the task_dict by communicating it via the interface.
+
+    Args:
+        interface (SocketInterface): socket interface for zmq communication
+        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
+                          {"fn": callable, "args": (), "kwargs": {}, "resource_dict": {}}
+        future_queue (Queue): Queue for receiving new tasks.
+    """
+    f = task_dict.pop("future")
+    if f.set_running_or_notify_cancel():
+        try:
+            f.set_result(interface.send_and_receive_dict(input_dict=task_dict))
+        except Exception as thread_exception:
+            interface.shutdown(wait=True)
+            future_queue.task_done()
+            f.set_exception(exception=thread_exception)
+            raise thread_exception
+        else:
+            future_queue.task_done()
+
+
+def _execute_task_with_cache(
+    interface: SocketInterface,
+    task_dict: dict,
+    future_queue: queue.Queue,
+    cache_directory: str,
+):
+    """
+    Execute the task in the task_dict by communicating it via the interface using the cache in the cache directory.
+
+    Args:
+        interface (SocketInterface): socket interface for zmq communication
+        task_dict (dict): task submitted to the executor as dictionary. This dictionary has the following keys
+                          {"fn": callable, "args": (), "kwargs": {}, "resource_dict": {}}
+        future_queue (Queue): Queue for receiving new tasks.
+        cache_directory (str): The directory to store cache files.
+    """
+    from executorlib.shared.hdf import dump, get_output
+
+    task_key, data_dict = serialize_funct_h5(
+        task_dict["fn"], *task_dict["args"], **task_dict["kwargs"]
+    )
+    file_name = os.path.join(cache_directory, task_key + ".h5out")
+    if not os.path.exists(cache_directory):
+        os.mkdir(cache_directory)
+    future = task_dict["future"]
+    if task_key + ".h5out" not in os.listdir(cache_directory):
+        _execute_task(
+            interface=interface,
+            task_dict=task_dict,
+            future_queue=future_queue,
+        )
+        data_dict["output"] = future.result()
+        dump(file_name=file_name, data_dict=data_dict)
+    else:
+        _, result = get_output(file_name=file_name)
+        future = task_dict["future"]
+        future.set_result(result)
+        future_queue.task_done()
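
The lookup above is content-addressed: serialize_funct_h5 derives task_key from the serialized function together with its arguments, and the result is stored next to it as <task_key>.h5out via executorlib.shared.hdf. A stand-in sketch of the same pattern using hashlib, cloudpickle and plain pickle files (illustrative only; the PR itself uses the HDF5 helpers, and cached_call is a hypothetical name):

    import hashlib
    import os

    import cloudpickle


    def cached_call(fn, *args, cache_directory="./cache", **kwargs):
        # Derive a stable key from the serialized (fn, args, kwargs) triple,
        # analogous to the task_key returned by serialize_funct_h5.
        task_key = hashlib.sha256(
            cloudpickle.dumps((fn.__name__, args, kwargs))
        ).hexdigest()
        os.makedirs(cache_directory, exist_ok=True)
        file_name = os.path.join(cache_directory, task_key + ".pkl")
        if os.path.exists(file_name):  # cache hit: skip execution entirely
            with open(file_name, "rb") as f:
                return cloudpickle.load(f)
        result = fn(*args, **kwargs)  # cache miss: execute, then store
        with open(file_name, "wb") as f:
            cloudpickle.dump(result, f)
        return result

One design consequence visible in the diff: on a cache hit the Future is resolved from the file alone, so a repeated submission of an identical task costs one file read instead of a full round trip to the worker.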
41 changes: 41 additions & 0 deletions tests/test_executor_backend_mpi.py
@@ -1,4 +1,6 @@
 import importlib.util
+import shutil
+import time
 import unittest

 from executorlib import Executor
@@ -20,6 +22,15 @@ def mpi_funct(i):
     return i, size, rank


+def mpi_funct_sleep(i):
+    from mpi4py import MPI
+
+    size = MPI.COMM_WORLD.Get_size()
+    rank = MPI.COMM_WORLD.Get_rank()
+    time.sleep(i)
+    return i, size, rank
+
+
 class TestExecutorBackend(unittest.TestCase):
     def test_meta_executor_serial(self):
         with Executor(max_cores=2, backend="local", block_allocation=True) as exe:
@@ -71,3 +82,33 @@ def test_errors(self):
                 gpus_per_worker=1,
                 backend="local",
             )
+
+
+class TestExecutorBackendCache(unittest.TestCase):
+    def tearDown(self):
+        shutil.rmtree("./cache")
+
+    @unittest.skipIf(
+        skip_mpi4py_test, "mpi4py is not installed, so the mpi4py tests are skipped."
+    )
+    def test_meta_executor_parallel_cache(self):
+        with Executor(
+            max_workers=2,
+            cores_per_worker=2,
+            backend="local",
+            block_allocation=True,
+            cache_directory="./cache",
+        ) as exe:
+            cloudpickle_register(ind=1)
+            time_1 = time.time()
+            fs_1 = exe.submit(mpi_funct_sleep, 1)
+            self.assertEqual(fs_1.result(), [(1, 2, 0), (1, 2, 1)])
+            self.assertTrue(fs_1.done())
+            time_2 = time.time()
+            self.assertTrue(time_2 - time_1 > 1)
+            time_3 = time.time()
+            fs_2 = exe.submit(mpi_funct_sleep, 1)
+            self.assertEqual(fs_2.result(), [(1, 2, 0), (1, 2, 1)])
+            self.assertTrue(fs_2.done())
+            time_4 = time.time()
+            self.assertTrue(time_4 - time_3 < 1)
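
The two timing assertions encode the expected behaviour: the first submission actually runs mpi_funct_sleep, so more than the one-second sleep elapses between time_1 and time_2, while the identical resubmission is served from ./cache and completes in well under a second.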