
Commit 6c139bb

Implement resource_dict for file executor
1 parent ceda4c0 commit 6c139bb

File tree

7 files changed (+35 -26 lines)


executorlib/__init__.py

Lines changed: 2 additions & 2 deletions
@@ -138,9 +138,9 @@ def __new__(
         cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
         max_cores (int): defines the number cores which can be used in parallel
         resource_dict (dict): A dictionary of resources required by the task. With the following keys:
-            - cores_per_worker (int): number of MPI cores to be used for each function call
+            - cores (int): number of MPI cores to be used for each function call
             - threads_per_core (int): number of OpenMP threads to be used for each function call
-            - gpus_per_worker (int): number of GPUs per worker - defaults to 0
+            - gpus_per_core (int): number of GPUs per worker - defaults to 0
             - cwd (str/None): current working directory where the parallel python task is executed
             - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI
               and SLURM only) - default False
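For reference, a minimal usage sketch of the renamed keys, assuming the top-level executorlib.Executor is created with max_cores and resource_dict as documented above; the values shown are illustrative, not required defaults, and the context-manager/submit pattern follows the project's standard usage:

    from executorlib import Executor

    # Sketch only: key names follow the updated docstring above.
    with Executor(
        max_cores=2,
        resource_dict={
            "cores": 1,                    # was "cores_per_worker"
            "threads_per_core": 1,
            "gpus_per_core": 0,            # was "gpus_per_worker"
            "cwd": None,
            "openmpi_oversubscribe": False,
        },
    ) as exe:
        future = exe.submit(sum, [1, 2, 3])
        print(future.result())  # 6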

executorlib/cache/executor.py

Lines changed: 14 additions & 4 deletions
@@ -14,8 +14,7 @@ class FileExecutor(ExecutorBase):
     def __init__(
         self,
         cache_directory: str = "cache",
-        cores_per_worker: int = 1,
-        cwd: Optional[str] = None,
+        resource_dict: Optional[dict] = None,
         execute_function: callable = execute_in_subprocess,
         terminate_function: Optional[callable] = None,
         config_directory: Optional[str] = None,
@@ -26,6 +25,9 @@ def __init__(
 
         Args:
             cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
+            resource_dict (dict): A dictionary of resources required by the task. With the following keys:
+                - cores (int): number of MPI cores to be used for each function call
+                - cwd (str/None): current working directory where the parallel python task is executed
             execute_function (callable, optional): The function to execute tasks. Defaults to execute_in_subprocess.
             cores_per_worker (int, optional): The number of CPU cores per worker. Defaults to 1.
             terminate_function (callable, optional): The function to terminate the tasks.
@@ -34,6 +36,15 @@ def __init__(
             backend (str, optional): name of the backend used to spawn tasks.
         """
         super().__init__()
+        default_resource_dict = {
+            "cores": 1,
+            "cwd": None,
+        }
+        if resource_dict is None:
+            resource_dict = {}
+        resource_dict.update(
+            {k: v for k, v in default_resource_dict.items() if k not in resource_dict}
+        )
         if execute_function == execute_in_subprocess and terminate_function is None:
             terminate_function = terminate_subprocess
         cache_directory_path = os.path.abspath(cache_directory)
@@ -45,8 +56,7 @@ def __init__(
                 "future_queue": self._future_queue,
                 "execute_function": execute_function,
                 "cache_directory": cache_directory_path,
-                "cores_per_worker": cores_per_worker,
-                "cwd": cwd,
+                "resource_dict": resource_dict,
                 "terminate_function": terminate_function,
                 "config_directory": config_directory,
                 "backend": backend,

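The block added to FileExecutor.__init__ above can be read as a simple "fill in what the caller did not set" step. A standalone sketch with a hypothetical caller input:

    # Keys supplied by the caller are kept; only missing keys are taken from the defaults.
    default_resource_dict = {"cores": 1, "cwd": None}

    resource_dict = {"cores": 2}  # hypothetical caller input
    resource_dict.update(
        {k: v for k, v in default_resource_dict.items() if k not in resource_dict}
    )
    print(resource_dict)  # {'cores': 2, 'cwd': None}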
executorlib/cache/shared.py

Lines changed: 11 additions & 12 deletions
@@ -50,8 +50,7 @@ def execute_tasks_h5(
     future_queue: queue.Queue,
     cache_directory: str,
     execute_function: callable,
-    cores_per_worker: int = 1,
-    cwd: Optional[str] = None,
+    resource_dict: dict,
     terminate_function: Optional[callable] = None,
     config_directory: Optional[str] = None,
     backend: Optional[str] = None,
@@ -62,9 +61,10 @@ def execute_tasks_h5(
     Args:
         future_queue (queue.Queue): The queue containing the tasks.
         cache_directory (str): The directory to store the HDF5 files.
-        cores_per_worker (int): The number of cores per worker.
+        resource_dict (dict): A dictionary of resources required by the task. With the following keys:
+            - cores (int): number of MPI cores to be used for each function call
+            - cwd (str/None): current working directory where the parallel python task is executed
         execute_function (callable): The function to execute the tasks.
-        cwd (str/None): current working directory where the parallel python task is executed
         terminate_function (callable): The function to terminate the tasks.
         config_directory (str, optional): path to the config directory.
         backend (str, optional): name of the backend used to spawn tasks.
@@ -97,16 +97,15 @@ def execute_tasks_h5(
                 memory_dict=memory_dict,
                 file_name_dict=file_name_dict,
             )
-            resource_dict = task_dict["resource_dict"].copy()
-            if "cores" not in resource_dict:
-                resource_dict["cores"] = cores_per_worker
-            if "cwd" not in resource_dict:
-                resource_dict["cwd"] = cwd
+            task_resource_dict = task_dict["resource_dict"].copy()
+            task_resource_dict.update(
+                {k: v for k, v in resource_dict.items() if k not in task_resource_dict}
+            )
             task_key, data_dict = serialize_funct_h5(
                 fn=task_dict["fn"],
                 fn_args=task_args,
                 fn_kwargs=task_kwargs,
-                resource_dict=resource_dict,
+                resource_dict=task_resource_dict,
             )
             if task_key not in memory_dict.keys():
                 if task_key + ".h5out" not in os.listdir(cache_directory):
@@ -115,12 +114,12 @@
                     process_dict[task_key] = execute_function(
                         command=_get_execute_command(
                             file_name=file_name,
-                            cores=cores_per_worker,
+                            cores=task_resource_dict["cores"],
                         ),
                         task_dependent_lst=[
                             process_dict[k] for k in future_wait_key_lst
                         ],
-                        resource_dict=resource_dict,
+                        resource_dict=task_resource_dict,
                         config_directory=config_directory,
                         backend=backend,
                     )
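In execute_tasks_h5 the same update pattern is applied per task, but with the opposite precedence: entries from the task's own resource_dict win, and the executor-level resource_dict only fills in the keys the task did not set. A standalone sketch with hypothetical values:

    executor_resource_dict = {"cores": 2, "cwd": "/tmp"}  # hypothetical executor-level settings
    task_resource_dict = {"cores": 4}                     # hypothetical per-task override

    # Task-level keys take precedence; executor-level keys fill the gaps.
    merged = task_resource_dict.copy()
    merged.update(
        {k: v for k, v in executor_resource_dict.items() if k not in merged}
    )
    print(merged)  # {'cores': 4, 'cwd': '/tmp'}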

executorlib/interactive/executor.py

Lines changed: 2 additions & 2 deletions
@@ -175,9 +175,9 @@ def create_executor(
         max_cores (int): defines the number cores which can be used in parallel
         cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
         resource_dict (dict): A dictionary of resources required by the task. With the following keys:
-            - cores_per_worker (int): number of MPI cores to be used for each function call
+            - cores (int): number of MPI cores to be used for each function call
             - threads_per_core (int): number of OpenMP threads to be used for each function call
-            - gpus_per_worker (int): number of GPUs per worker - defaults to 0
+            - gpus_per_core (int): number of GPUs per worker - defaults to 0
             - cwd (str/None): current working directory where the parallel python task is executed
             - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and
               SLURM only) - default False

tests/test_cache_executor_mpi.py

Lines changed: 1 addition & 1 deletion
@@ -29,7 +29,7 @@ def mpi_funct(i):
 )
 class TestCacheExecutorMPI(unittest.TestCase):
     def test_executor(self):
-        with FileExecutor(cores_per_worker=2) as exe:
+        with FileExecutor(resource_dict={"cores": 2}) as exe:
             fs1 = exe.submit(mpi_funct, 1)
             self.assertFalse(fs1.done())
             self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)])

tests/test_cache_executor_pysqa_flux.py

Lines changed: 1 addition & 1 deletion
@@ -32,7 +32,7 @@ def mpi_funct(i):
 class TestCacheExecutorPysqa(unittest.TestCase):
     def test_executor(self):
         with FileExecutor(
-            cores_per_worker=2,
+            resource_dict={"cores": 2},
             execute_function=execute_with_pysqa,
             backend="flux",
         ) as exe:

tests/test_cache_executor_serial.py

Lines changed: 4 additions & 4 deletions
@@ -48,7 +48,7 @@ def test_executor_dependence_mixed(self):
 
     def test_executor_working_directory(self):
         cwd = os.path.join(os.path.dirname(__file__), "executables")
-        with FileExecutor(cwd=cwd) as exe:
+        with FileExecutor(resource_dict={"cwd": cwd}) as exe:
             fs1 = exe.submit(list_files_in_working_directory)
             self.assertEqual(fs1.result(), os.listdir(cwd))
 
@@ -72,7 +72,7 @@ def test_executor_function(self):
                 "future_queue": q,
                 "cache_directory": cache_dir,
                 "execute_function": execute_in_subprocess,
-                "cores_per_worker": 1,
+                "resource_dict": {"cores": 1, "cwd": None},
                 "terminate_function": terminate_subprocess,
             },
         )
@@ -113,7 +113,7 @@ def test_executor_function_dependence_kwargs(self):
                 "future_queue": q,
                 "cache_directory": cache_dir,
                 "execute_function": execute_in_subprocess,
-                "cores_per_worker": 1,
+                "resource_dict": {"cores": 1, "cwd": None},
                 "terminate_function": terminate_subprocess,
             },
         )
@@ -154,7 +154,7 @@ def test_executor_function_dependence_args(self):
                 "future_queue": q,
                 "cache_directory": cache_dir,
                 "execute_function": execute_in_subprocess,
-                "cores_per_worker": 1,
+                "resource_dict": {"cores": 1, "cwd": None},
                 "terminate_function": terminate_subprocess,
             },
         )
