
Commit d99245e

Cache: create method (#459)
* Cache: create method
* [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci
* update init
* Import executor from full path
* Remove import
* update queue script
* [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci
* Update only test
* add working directory
* tests working
* submission working directory
* reduce example

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent de646f9 commit d99245e

7 files changed: +101 -21 lines changed

executorlib/__init__.py

Lines changed: 22 additions & 9 deletions
@@ -13,14 +13,6 @@
 __all__ = []
 
 
-try:
-    from executorlib.cache.executor import FileExecutor
-
-    __all__ += [FileExecutor]
-except ImportError:
-    pass
-
-
 class Executor:
     """
     The executorlib.Executor leverages either the message passing interface (MPI), the SLURM workload manager or
@@ -47,6 +39,7 @@ class Executor:
        flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
        flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
        flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
+       pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                      context of an HPC cluster this essential to be able to communicate to an
                                      Executor running on a different compute node within the same allocation. And
@@ -95,6 +88,7 @@ def __init__(
        flux_executor=None,
        flux_executor_pmi_mode: Optional[str] = None,
        flux_executor_nesting: bool = False,
+       pysqa_config_directory: Optional[str] = None,
        hostname_localhost: Optional[bool] = None,
        block_allocation: bool = True,
        init_function: Optional[callable] = None,
@@ -115,6 +109,7 @@ def __new__(
        flux_executor=None,
        flux_executor_pmi_mode: Optional[str] = None,
        flux_executor_nesting: bool = False,
+       pysqa_config_directory: Optional[str] = None,
        hostname_localhost: Optional[bool] = None,
        block_allocation: bool = True,
        init_function: Optional[callable] = None,
@@ -149,6 +144,7 @@ def __new__(
            flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
            flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
            flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
+           pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
            hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                          context of an HPC cluster this essential to be able to communicate to an
                                          Executor running on a different compute node within the same allocation. And
@@ -180,7 +176,24 @@ def __new__(
        resource_dict.update(
            {k: v for k, v in default_resource_dict.items() if k not in resource_dict}
        )
-       if not disable_dependencies:
+       if "pysqa_" in backend and not plot_dependency_graph:
+           from executorlib.cache.executor import create_file_executor
+
+           return create_file_executor(
+               max_workers=max_workers,
+               backend=backend,
+               max_cores=max_cores,
+               cache_directory=cache_directory,
+               resource_dict=resource_dict,
+               flux_executor=flux_executor,
+               flux_executor_pmi_mode=flux_executor_pmi_mode,
+               flux_executor_nesting=flux_executor_nesting,
+               pysqa_config_directory=pysqa_config_directory,
+               hostname_localhost=hostname_localhost,
+               block_allocation=block_allocation,
+               init_function=init_function,
+           )
+       elif not disable_dependencies:
            return ExecutorWithDependencies(
                max_workers=max_workers,
                backend=backend,
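
For orientation, a hedged sketch of how this new branch is reached from user code, modeled on the updated tests/test_cache_executor_pysqa_flux.py further down; a running flux scheduler plus the optional pysqa and h5py dependencies are assumed, and the working directory is illustrative only:

import os

from executorlib import Executor
from executorlib.standalone.serialize import cloudpickle_register


def add(a, b):
    return a + b


# Backends prefixed with "pysqa_" are now routed to create_file_executor()
# instead of ExecutorWithDependencies by the branch added above.
with Executor(
    backend="pysqa_flux",
    resource_dict={"cores": 1, "cwd": os.path.abspath("cwd")},
    block_allocation=False,  # block allocation is rejected by the pysqa backend
) as exe:
    cloudpickle_register(ind=1)
    future = exe.submit(add, 1, 2)
    print(future.result())  # 3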

executorlib/cache/executor.py

Lines changed: 57 additions & 3 deletions
@@ -7,6 +7,10 @@
     execute_in_subprocess,
     terminate_subprocess,
 )
+from executorlib.standalone.inputcheck import (
+    check_executor,
+    check_nested_flux_executor,
+)
 from executorlib.standalone.thread import RaisingThread
 
 try:
@@ -23,7 +27,7 @@ def __init__(
        resource_dict: Optional[dict] = None,
        execute_function: callable = execute_with_pysqa,
        terminate_function: Optional[callable] = None,
-       config_directory: Optional[str] = None,
+       pysqa_config_directory: Optional[str] = None,
        backend: Optional[str] = None,
    ):
        """
@@ -36,7 +40,7 @@ def __init__(
              - cwd (str/None): current working directory where the parallel python task is executed
            execute_function (callable, optional): The function to execute tasks. Defaults to execute_in_subprocess.
            terminate_function (callable, optional): The function to terminate the tasks.
-           config_directory (str, optional): path to the config directory.
+           pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
            backend (str, optional): name of the backend used to spawn tasks.
        """
        super().__init__()
@@ -62,8 +66,58 @@ def __init__(
                    "cache_directory": cache_directory_path,
                    "resource_dict": resource_dict,
                    "terminate_function": terminate_function,
-                   "config_directory": config_directory,
+                   "pysqa_config_directory": pysqa_config_directory,
                    "backend": backend,
                },
            )
        )
+
+
+def create_file_executor(
+    max_workers: int = 1,
+    backend: str = "pysqa_flux",
+    max_cores: int = 1,
+    cache_directory: Optional[str] = None,
+    resource_dict: Optional[dict] = None,
+    flux_executor=None,
+    flux_executor_pmi_mode: Optional[str] = None,
+    flux_executor_nesting: bool = False,
+    pysqa_config_directory: Optional[str] = None,
+    hostname_localhost: Optional[bool] = None,
+    block_allocation: bool = False,
+    init_function: Optional[callable] = None,
+):
+    if cache_directory is None:
+        cache_directory = "executorlib_cache"
+    if max_workers != 1:
+        raise ValueError(
+            "The number of workers cannot be controlled with the pysqa based backend."
+        )
+    if max_cores != 1:
+        raise ValueError(
+            "The number of cores cannot be controlled with the pysqa based backend."
+        )
+    if hostname_localhost is not None:
+        raise ValueError(
+            "The option to connect to hosts based on their hostname is not available with the pysqa based backend."
+        )
+    if block_allocation:
+        raise ValueError(
+            "The option block_allocation is not available with the pysqa based backend."
+        )
+    if init_function is not None:
+        raise ValueError(
+            "The option to specify an init_function is not available with the pysqa based backend."
+        )
+    if flux_executor_pmi_mode is not None:
+        raise ValueError(
+            "The option to specify the flux pmi mode is not available with the pysqa based backend."
+        )
+    check_executor(executor=flux_executor)
+    check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
+    return FileExecutor(
+        cache_directory=cache_directory,
+        resource_dict=resource_dict,
+        pysqa_config_directory=pysqa_config_directory,
+        backend=backend.split("pysqa_")[-1],
+    )
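
A hedged usage note on the new factory, assuming the optional pysqa and h5py dependencies are importable: unsupported Executor options fail fast with a ValueError, and the Executor-level "pysqa_" prefix is stripped before the backend name reaches FileExecutor.

from executorlib.cache.executor import create_file_executor

# "pysqa_flux" is only an Executor-level name; FileExecutor receives the
# bare pysqa backend name, i.e. "pysqa_flux".split("pysqa_")[-1] == "flux".
print("pysqa_flux".split("pysqa_")[-1])  # flux

# Options the file-based backend cannot honor are rejected before any
# FileExecutor is constructed:
try:
    create_file_executor(backend="pysqa_flux", block_allocation=True)
except ValueError as err:
    print(err)  # The option block_allocation is not available with the pysqa based backend.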

executorlib/cache/shared.py

Lines changed: 3 additions & 3 deletions
@@ -52,7 +52,7 @@ def execute_tasks_h5(
    execute_function: callable,
    resource_dict: dict,
    terminate_function: Optional[callable] = None,
-   config_directory: Optional[str] = None,
+   pysqa_config_directory: Optional[str] = None,
    backend: Optional[str] = None,
 ) -> None:
    """
@@ -66,7 +66,7 @@ def execute_tasks_h5(
          - cwd (str/None): current working directory where the parallel python task is executed
        execute_function (callable): The function to execute the tasks.
        terminate_function (callable): The function to terminate the tasks.
-       config_directory (str, optional): path to the config directory.
+       pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
        backend (str, optional): name of the backend used to spawn tasks.
 
    Returns:
@@ -120,7 +120,7 @@ def execute_tasks_h5(
                        process_dict[k] for k in future_wait_key_lst
                    ],
                    resource_dict=task_resource_dict,
-                   config_directory=config_directory,
+                   config_directory=pysqa_config_directory,
                    backend=backend,
                )
                file_name_dict[task_key] = os.path.join(
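
One detail worth noting: the rename stops at the execute_tasks_h5 signature. As the last hunk shows, the value is still forwarded to the pluggable execute_function under the generic config_directory keyword, so at that level the rename is purely cosmetic.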

executorlib/standalone/cache/queue.py

Lines changed: 9 additions & 0 deletions
@@ -35,5 +35,14 @@ def execute_with_pysqa(
        "working_directory": resource_dict["cwd"],
    }
    del resource_dict["cwd"]
+   unsupported_keys = [
+       "threads_per_core",
+       "gpus_per_core",
+       "openmpi_oversubscribe",
+       "slurm_cmd_args",
+   ]
+   for k in unsupported_keys:
+       if k in resource_dict:
+           del resource_dict[k]
    submit_kwargs.update(resource_dict)
    return qa.submit_job(**submit_kwargs)
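
A standalone sketch of the filtering added above, with illustrative values: resource keys that pysqa's submit_job() does not accept are dropped before the remaining entries are forwarded as keyword arguments.

# Illustrative input; the unsupported keys mirror the list in the diff above.
resource_dict = {
    "cores": 2,
    "threads_per_core": 1,
    "gpus_per_core": 0,
    "openmpi_oversubscribe": False,
    "slurm_cmd_args": [],
}
for k in ["threads_per_core", "gpus_per_core", "openmpi_oversubscribe", "slurm_cmd_args"]:
    if k in resource_dict:
        del resource_dict[k]
print(resource_dict)  # {'cores': 2}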

tests/test_cache_executor_mpi.py

Lines changed: 1 addition & 1 deletion
@@ -7,7 +7,7 @@
 
 
 try:
-    from executorlib import FileExecutor
+    from executorlib.cache.executor import FileExecutor
 
     skip_h5py_test = False
 except ImportError:

tests/test_cache_executor_pysqa_flux.py

Lines changed: 8 additions & 4 deletions
@@ -3,9 +3,11 @@
 import unittest
 import shutil
 
+from executorlib import Executor
+from executorlib.standalone.serialize import cloudpickle_register
+
 try:
     import flux.job
-    from executorlib import FileExecutor
 
     skip_flux_test = "FLUX_URI" not in os.environ
     pmi = os.environ.get("PYMPIPOOL_PMIX", None)
@@ -30,10 +32,12 @@ def mpi_funct(i):
 )
 class TestCacheExecutorPysqa(unittest.TestCase):
    def test_executor(self):
-       with FileExecutor(
-           resource_dict={"cores": 2},
-           backend="flux",
+       with Executor(
+           backend="pysqa_flux",
+           resource_dict={"cores": 2, "cwd": os.path.abspath("cwd")},
+           block_allocation=False,
        ) as exe:
+           cloudpickle_register(ind=1)
            fs1 = exe.submit(mpi_funct, 1)
            self.assertFalse(fs1.done())
            self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)])

tests/test_cache_executor_serial.py

Lines changed: 1 addition & 1 deletion
@@ -11,7 +11,7 @@
 from executorlib.standalone.thread import RaisingThread
 
 try:
-    from executorlib import FileExecutor
+    from executorlib.cache.executor import FileExecutor
     from executorlib.cache.shared import execute_tasks_h5
 
     skip_h5py_test = False
