diff --git a/executorlib/base/executor.py b/executorlib/base/executor.py index 29d63c5b..ff7ed4df 100644 --- a/executorlib/base/executor.py +++ b/executorlib/base/executor.py @@ -7,10 +7,7 @@ ) from typing import Optional -from executorlib.standalone.inputcheck import ( - check_resource_dict, - check_resource_dict_is_empty, -) +from executorlib.standalone.inputcheck import check_resource_dict from executorlib.standalone.queue import cancel_items_in_queue from executorlib.standalone.serialize import cloudpickle_register from executorlib.standalone.thread import RaisingThread @@ -89,10 +86,17 @@ def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs) -> Fut Returns: Future: A Future representing the given call. """ - check_resource_dict_is_empty(resource_dict=resource_dict) check_resource_dict(function=fn) f = Future() - self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f}) + self._future_queue.put( + { + "fn": fn, + "args": args, + "kwargs": kwargs, + "future": f, + "resource_dict": resource_dict, + } + ) return f def shutdown(self, wait: bool = True, *, cancel_futures: bool = False): diff --git a/executorlib/cache/shared.py b/executorlib/cache/shared.py index ffe8a95b..5bb8764e 100644 --- a/executorlib/cache/shared.py +++ b/executorlib/cache/shared.py @@ -94,7 +94,10 @@ def execute_tasks_h5( file_name_dict=file_name_dict, ) task_key, data_dict = serialize_funct_h5( - task_dict["fn"], *task_args, **task_kwargs + fn=task_dict["fn"], + fn_args=task_args, + fn_kwargs=task_kwargs, + resource_dict=task_dict["resource_dict"], ) if task_key not in memory_dict.keys(): if task_key + ".h5out" not in os.listdir(cache_directory): diff --git a/executorlib/interactive/executor.py b/executorlib/interactive/executor.py index 1f5237c6..dd672e2a 100644 --- a/executorlib/interactive/executor.py +++ b/executorlib/interactive/executor.py @@ -1,8 +1,8 @@ from concurrent.futures import Future from typing import Any, Callable, Dict, Optional +from executorlib.base.executor import ExecutorBase from executorlib.interactive.shared import ( - ExecutorSteps, InteractiveExecutor, InteractiveStepExecutor, execute_tasks_with_dependencies, @@ -35,10 +35,10 @@ pass -class ExecutorWithDependencies(ExecutorSteps): +class ExecutorWithDependencies(ExecutorBase): """ - ExecutorWithDependencies is a class that extends ExecutorSteps and provides - functionality for executing tasks with dependencies. + ExecutorWithDependencies is a class that extends ExecutorBase and provides functionality for executing tasks with + dependencies. Args: refresh_rate (float, optional): The refresh rate for updating the executor queue. Defaults to 0.01. diff --git a/executorlib/interactive/shared.py b/executorlib/interactive/shared.py index dd2f9ff2..5136a63b 100644 --- a/executorlib/interactive/shared.py +++ b/executorlib/interactive/shared.py @@ -8,7 +8,10 @@ from executorlib.base.executor import ExecutorBase, cancel_items_in_queue from executorlib.standalone.command import get_command_path -from executorlib.standalone.inputcheck import check_resource_dict +from executorlib.standalone.inputcheck import ( + check_resource_dict, + check_resource_dict_is_empty, +) from executorlib.standalone.interactive.communication import ( SocketInterface, interface_bootup, @@ -19,6 +22,37 @@ class ExecutorBroker(ExecutorBase): + def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs) -> Future: + """ + Submits a callable to be executed with the given arguments. + + Schedules the callable to be executed as fn(*args, **kwargs) and returns + a Future instance representing the execution of the callable. + + Args: + fn (callable): function to submit for execution + args: arguments for the submitted function + kwargs: keyword arguments for the submitted function + resource_dict (dict): resource dictionary, which defines the resources used for the execution of the + function. Example resource dictionary: { + cores: 1, + threads_per_core: 1, + gpus_per_worker: 0, + oversubscribe: False, + cwd: None, + executor: None, + hostname_localhost: False, + } + + Returns: + Future: A Future representing the given call. + """ + check_resource_dict_is_empty(resource_dict=resource_dict) + check_resource_dict(function=fn) + f = Future() + self._future_queue.put({"fn": fn, "args": args, "kwargs": kwargs, "future": f}) + return f + def shutdown(self, wait: bool = True, *, cancel_futures: bool = False): """Clean-up the resources associated with the Executor. @@ -57,46 +91,6 @@ def _set_process(self, process: List[RaisingThread]): process.start() -class ExecutorSteps(ExecutorBase): - def submit(self, fn: callable, *args, resource_dict: dict = {}, **kwargs): - """ - Submits a callable to be executed with the given arguments. - - Schedules the callable to be executed as fn(*args, **kwargs) and returns - a Future instance representing the execution of the callable. - - Args: - fn (callable): function to submit for execution - args: arguments for the submitted function - kwargs: keyword arguments for the submitted function - resource_dict (dict): resource dictionary, which defines the resources used for the execution of the - function. Example resource dictionary: { - cores: 1, - threads_per_core: 1, - gpus_per_worker: 0, - oversubscribe: False, - cwd: None, - executor: None, - hostname_localhost: False, - } - - Returns: - A Future representing the given call. - """ - check_resource_dict(function=fn) - f = Future() - self._future_queue.put( - { - "fn": fn, - "args": args, - "kwargs": kwargs, - "future": f, - "resource_dict": resource_dict, - } - ) - return f - - class InteractiveExecutor(ExecutorBroker): """ The executorlib.interactive.executor.InteractiveExecutor leverages the exeutorlib interfaces to distribute python @@ -151,7 +145,7 @@ def __init__( ) -class InteractiveStepExecutor(ExecutorSteps): +class InteractiveStepExecutor(ExecutorBase): """ The executorlib.interactive.executor.InteractiveStepExecutor leverages the executorlib interfaces to distribute python tasks. In contrast to the mpi4py.futures.MPIPoolExecutor the executorlib.interactive.executor.InteractiveStepExecutor @@ -596,7 +590,10 @@ def _execute_task_with_cache( future = task_dict["future"] task_key, data_dict = serialize_funct_h5( - task_dict["fn"], *task_dict["args"], **task_dict["kwargs"] + fn=task_dict["fn"], + fn_args=task_dict["args"], + fn_kwargs=task_dict["kwargs"], + resource_dict=task_dict["resource_dict"], ) os.makedirs(cache_directory, exist_ok=True) file_name = os.path.join(cache_directory, task_key + ".h5out") diff --git a/executorlib/standalone/serialize.py b/executorlib/standalone/serialize.py index 1afc763f..74bb1729 100644 --- a/executorlib/standalone/serialize.py +++ b/executorlib/standalone/serialize.py @@ -28,22 +28,41 @@ def cloudpickle_register(ind: int = 2): pass -def serialize_funct_h5(fn: callable, *args: Any, **kwargs: Any) -> Tuple[str, dict]: +def serialize_funct_h5( + fn: callable, fn_args: list = [], fn_kwargs: dict = {}, resource_dict: dict = {} +) -> Tuple[str, dict]: """ Serialize a function and its arguments and keyword arguments into an HDF5 file. Args: fn (callable): The function to be serialized. - *args (Any): The arguments of the function. - **kwargs (Any): The keyword arguments of the function. + fn_args (list): The arguments of the function. + fn_kwargs (dict): The keyword arguments of the function. + resource_dict (dict): resource dictionary, which defines the resources used for the execution of the function. + Example resource dictionary: { + cores: 1, + threads_per_core: 1, + gpus_per_worker: 0, + oversubscribe: False, + cwd: None, + executor: None, + hostname_localhost: False, + } Returns: Tuple[str, dict]: A tuple containing the task key and the serialized data. """ - binary_all = cloudpickle.dumps({"fn": fn, "args": args, "kwargs": kwargs}) + binary_all = cloudpickle.dumps( + {"fn": fn, "args": fn_args, "kwargs": fn_kwargs, "resource_dict": resource_dict} + ) task_key = fn.__name__ + _get_hash(binary=binary_all) - data = {"fn": fn, "args": args, "kwargs": kwargs} + data = { + "fn": fn, + "args": fn_args, + "kwargs": fn_kwargs, + "resource_dict": resource_dict, + } return task_key, data diff --git a/tests/test_cache_executor_serial.py b/tests/test_cache_executor_serial.py index a4d453f7..7745f572 100644 --- a/tests/test_cache_executor_serial.py +++ b/tests/test_cache_executor_serial.py @@ -55,7 +55,15 @@ def test_executor_working_directory(self): def test_executor_function(self): fs1 = Future() q = Queue() - q.put({"fn": my_funct, "args": (), "kwargs": {"a": 1, "b": 2}, "future": fs1}) + q.put( + { + "fn": my_funct, + "args": (), + "kwargs": {"a": 1, "b": 2}, + "future": fs1, + "resource_dict": {}, + } + ) cache_dir = os.path.abspath("cache") os.makedirs(cache_dir, exist_ok=True) process = RaisingThread( @@ -80,8 +88,24 @@ def test_executor_function_dependence_kwargs(self): fs1 = Future() fs2 = Future() q = Queue() - q.put({"fn": my_funct, "args": (), "kwargs": {"a": 1, "b": 2}, "future": fs1}) - q.put({"fn": my_funct, "args": (), "kwargs": {"a": 1, "b": fs1}, "future": fs2}) + q.put( + { + "fn": my_funct, + "args": (), + "kwargs": {"a": 1, "b": 2}, + "future": fs1, + "resource_dict": {}, + } + ) + q.put( + { + "fn": my_funct, + "args": (), + "kwargs": {"a": 1, "b": fs1}, + "future": fs2, + "resource_dict": {}, + } + ) cache_dir = os.path.abspath("cache") os.makedirs(cache_dir, exist_ok=True) process = RaisingThread( @@ -106,8 +130,24 @@ def test_executor_function_dependence_args(self): fs1 = Future() fs2 = Future() q = Queue() - q.put({"fn": my_funct, "args": (), "kwargs": {"a": 1, "b": 2}, "future": fs1}) - q.put({"fn": my_funct, "args": [fs1], "kwargs": {"b": 2}, "future": fs2}) + q.put( + { + "fn": my_funct, + "args": (), + "kwargs": {"a": 1, "b": 2}, + "future": fs1, + "resource_dict": {}, + } + ) + q.put( + { + "fn": my_funct, + "args": [fs1], + "kwargs": {"b": 2}, + "future": fs2, + "resource_dict": {}, + } + ) cache_dir = os.path.abspath("cache") os.makedirs(cache_dir, exist_ok=True) process = RaisingThread( diff --git a/tests/test_cache_shared.py b/tests/test_cache_shared.py index e1c26ca9..544eb73f 100644 --- a/tests/test_cache_shared.py +++ b/tests/test_cache_shared.py @@ -27,9 +27,9 @@ def test_execute_function_mixed(self): cache_directory = os.path.abspath("cache") os.makedirs(cache_directory, exist_ok=True) task_key, data_dict = serialize_funct_h5( - my_funct, - 1, - b=2, + fn=my_funct, + fn_args=[1], + fn_kwargs={"b": 2}, ) file_name = os.path.join(cache_directory, task_key + ".h5in") dump(file_name=file_name, data_dict=data_dict) @@ -50,9 +50,9 @@ def test_execute_function_args(self): cache_directory = os.path.abspath("cache") os.makedirs(cache_directory, exist_ok=True) task_key, data_dict = serialize_funct_h5( - my_funct, - 1, - 2, + fn=my_funct, + fn_args=[1, 2], + fn_kwargs={}, ) file_name = os.path.join(cache_directory, task_key + ".h5in") dump(file_name=file_name, data_dict=data_dict) @@ -73,9 +73,9 @@ def test_execute_function_kwargs(self): cache_directory = os.path.abspath("cache") os.makedirs(cache_directory, exist_ok=True) task_key, data_dict = serialize_funct_h5( - my_funct, - a=1, - b=2, + fn=my_funct, + fn_args=[], + fn_kwargs={"a": 1, "b": 2}, ) file_name = os.path.join(cache_directory, task_key + ".h5in") dump(file_name=file_name, data_dict=data_dict)