Commit 09c15f1

Cache: Add queue test (#452)
* Cache: Add queue test
* [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci
* fix mpich bug

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent df5e582 commit 09c15f1

10 files changed, +114 -6 lines changed

.ci_support/environment-mpich.yml

Lines changed: 1 addition & 0 deletions
@@ -12,3 +12,4 @@ dependencies:
 - networkx =3.4.2
 - pygraphviz =1.14
 - ipython =8.29.0
+- pysqa =0.2.0

.ci_support/environment-openmpi.yml

Lines changed: 1 addition & 0 deletions
@@ -11,4 +11,5 @@ dependencies:
 - matplotlib =3.9.2
 - networkx =3.4.2
 - pygraphviz =1.14
+- pysqa =0.2.0
 - ipython =8.29.0

.github/workflows/unittest-flux-mpich.yml

Lines changed: 1 addition & 1 deletion
@@ -34,4 +34,4 @@ jobs:
         timeout-minutes: 5
         run: >
           flux start
-          python -m unittest tests/test_flux_executor.py tests/test_executor_backend_flux.py;
+          python -m unittest tests/test_flux_executor.py tests/test_executor_backend_flux.py tests/test_cache_executor_pysqa_flux.py;

.github/workflows/unittest-flux-openmpi.yml

Lines changed: 1 addition & 1 deletion
@@ -34,7 +34,7 @@ jobs:
         timeout-minutes: 5
         run: >
           flux start
-          coverage run -a --omit="executorlib/_version.py,tests/*" -m unittest tests/test_flux_executor.py tests/test_executor_backend_flux.py;
+          coverage run -a --omit="executorlib/_version.py,tests/*" -m unittest tests/test_flux_executor.py tests/test_executor_backend_flux.py tests/test_cache_executor_pysqa_flux.py;
           coverage xml
         env:
           PYMPIPOOL_PMIX: "pmix"

executorlib/cache/executor.py

Lines changed: 7 additions & 1 deletion
@@ -18,6 +18,8 @@ def __init__(
         cwd: Optional[str] = None,
         execute_function: callable = execute_in_subprocess,
         terminate_function: Optional[callable] = None,
+        config_directory: Optional[str] = None,
+        backend: Optional[str] = None,
     ):
         """
         Initialize the FileExecutor.
@@ -27,7 +29,9 @@ def __init__(
             execute_function (callable, optional): The function to execute tasks. Defaults to execute_in_subprocess.
             cores_per_worker (int, optional): The number of CPU cores per worker. Defaults to 1.
             terminate_function (callable, optional): The function to terminate the tasks.
-            cwd (str/None): current working directory where the parallel python task is executed
+            cwd (str, optional): current working directory where the parallel python task is executed
+            config_directory (str, optional): path to the config directory.
+            backend (str, optional): name of the backend used to spawn tasks.
         """
         super().__init__()
         if execute_function == execute_in_subprocess and terminate_function is None:
@@ -44,6 +48,8 @@ def __init__(
                     "cores_per_worker": cores_per_worker,
                     "cwd": cwd,
                     "terminate_function": terminate_function,
+                    "config_directory": config_directory,
+                    "backend": backend,
                 },
             )
         )
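
For orientation, a minimal sketch of how the new constructor arguments are meant to be used, mirroring the test added in this commit. Only the constructor arguments come from this change; the submitted function and the result handling below are illustrative placeholders.

from executorlib import FileExecutor
from executorlib.standalone.cache.queue import execute_with_pysqa

# Swap the default subprocess spawner for the pysqa-based one and pick a
# queuing-system backend; "flux" is the backend exercised by the new test.
# config_directory could additionally point at a pysqa configuration folder.
with FileExecutor(
    cores_per_worker=2,
    execute_function=execute_with_pysqa,
    backend="flux",
) as exe:
    future = exe.submit(sum, [1, 2, 3])     # placeholder task
    print(future.result())                  # blocks until the queued job finishes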

executorlib/cache/shared.py

Lines changed: 6 additions & 0 deletions
@@ -53,6 +53,8 @@ def execute_tasks_h5(
     cores_per_worker: int = 1,
     cwd: Optional[str] = None,
     terminate_function: Optional[callable] = None,
+    config_directory: Optional[str] = None,
+    backend: Optional[str] = None,
 ) -> None:
     """
     Execute tasks stored in a queue using HDF5 files.
@@ -64,6 +66,8 @@ def execute_tasks_h5(
         execute_function (callable): The function to execute the tasks.
         cwd (str/None): current working directory where the parallel python task is executed
         terminate_function (callable): The function to terminate the tasks.
+        config_directory (str, optional): path to the config directory.
+        backend (str, optional): name of the backend used to spawn tasks.

     Returns:
         None
@@ -117,6 +121,8 @@ def execute_tasks_h5(
                     process_dict[k] for k in future_wait_key_lst
                 ],
                 resource_dict=resource_dict,
+                config_directory=config_directory,
+                backend=backend,
             )
             file_name_dict[task_key] = os.path.join(
                 cache_directory, task_key + ".h5out"
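
The forwarding in execute_tasks_h5 implies a contract for any custom execute_function: besides the command, the dependency list, and resource_dict, it now also receives config_directory and backend. A hedged sketch of the expected signature; the function name and body are illustrative, only the keyword names come from this diff.

from typing import Optional

def my_spawner(  # illustrative name, not part of executorlib
    command: list,
    task_dependent_lst: list = [],
    resource_dict: Optional[dict] = None,
    config_directory: Optional[str] = None,
    backend: Optional[str] = None,
):
    # Return whatever handle the caller tracks: a subprocess.Popen for the
    # subprocess spawner, or a queuing-system ID (int) for the pysqa spawner.
    ...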
executorlib/standalone/cache/queue.py

Lines changed: 39 additions & 0 deletions

@@ -0,0 +1,39 @@
+from typing import List, Optional
+
+from pysqa import QueueAdapter
+
+
+def execute_with_pysqa(
+    command: str,
+    resource_dict: dict,
+    task_dependent_lst: List[int] = [],
+    config_directory: Optional[str] = None,
+    backend: Optional[str] = None,
+) -> int:
+    """
+    Execute a command by submitting it to the queuing system
+
+    Args:
+        command (list): The command to be executed.
+        task_dependent_lst (list): A list of subprocesses that the current subprocess depends on. Defaults to [].
+        resource_dict (dict): resource dictionary, which defines the resources used for the execution of the function.
+                              Example resource dictionary: {
+                                  cwd: None,
+                              }
+        config_directory (str, optional): path to the config directory.
+        backend (str, optional): name of the backend used to spawn tasks.
+
+    Returns:
+        int: queuing system ID
+    """
+    if resource_dict is None:
+        resource_dict = {"cwd": "."}
+    qa = QueueAdapter(directory=config_directory, queue_type=backend)
+    submit_kwargs = {
+        "command": " ".join(command),
+        "dependency_list": [str(qid) for qid in task_dependent_lst],
+        "working_directory": resource_dict["cwd"],
+    }
+    del resource_dict["cwd"]
+    submit_kwargs.update(resource_dict)
+    return qa.submit_job(**submit_kwargs)
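
Roughly, this function translates an executorlib task into a pysqa submission: the command list is joined into a string, earlier queue IDs become a dependency list, and the cwd entry of resource_dict becomes the working directory. A hedged direct-call sketch; the command and path are placeholders, the keyword names and the flux backend come from this commit.

from executorlib.standalone.cache.queue import execute_with_pysqa

# Submit a placeholder command to a flux queue; the return value is the
# queuing-system job ID, which later calls can list in task_dependent_lst.
queue_id = execute_with_pysqa(
    command=["python", "run_task.py"],    # placeholder task command
    resource_dict={"cwd": "/tmp/cache"},  # "cwd" becomes working_directory
    backend="flux",                       # any queue type pysqa supports
    config_directory=None,                # or a path to a pysqa config folder
)
print(queue_id)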

executorlib/standalone/cache/spawner.py

Lines changed: 10 additions & 0 deletions
@@ -7,6 +7,8 @@ def execute_in_subprocess(
     command: list,
     task_dependent_lst: list = [],
     resource_dict: Optional[dict] = None,
+    config_directory: Optional[str] = None,
+    backend: Optional[str] = None,
 ) -> subprocess.Popen:
     """
     Execute a command in a subprocess.
@@ -18,6 +20,8 @@ def execute_in_subprocess(
                               Example resource dictionary: {
                                   cwd: None,
                               }
+        config_directory (str, optional): path to the config directory.
+        backend (str, optional): name of the backend used to spawn tasks.

     Returns:
         subprocess.Popen: The subprocess object.
@@ -27,6 +31,12 @@ def execute_in_subprocess(
         task_dependent_lst = [
             task for task in task_dependent_lst if task.poll() is None
         ]
+    if config_directory is not None:
+        raise ValueError(
+            "config_directory parameter is not supported for subprocess spawner."
+        )
+    if backend is not None:
+        raise ValueError("backend parameter is not supported for subprocess spawner.")
     if resource_dict is None:
         resource_dict = {"cwd": None}
     elif len(resource_dict) == 0:
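
The guard means the default subprocess spawner rejects the queue-specific options instead of silently ignoring them. A quick illustration of the behavior added above; the command is a placeholder.

from executorlib.standalone.cache.spawner import execute_in_subprocess

# Passing a queue-only option to the subprocess spawner raises immediately.
try:
    execute_in_subprocess(command=["echo", "hello"], backend="flux")
except ValueError as err:
    print(err)  # "backend parameter is not supported for subprocess spawner."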

pyproject.toml

Lines changed: 2 additions & 3 deletions
@@ -36,15 +36,14 @@ Repository = "https://github.com/pyiron/executorlib"

 [project.optional-dependencies]
 mpi = ["mpi4py==4.0.1"]
-hdf = [
-    "h5py==3.12.1",
-]
+hdf = ["h5py==3.12.1"]
 graph = [
     "pygraphviz==1.14",
     "matplotlib==3.9.2",
     "networkx==3.4.2",
     "ipython==8.29.0",
 ]
+queue = ["pysqa==0.2.0"]

 [tool.setuptools.packages.find]
 include = ["executorlib*"]
tests/test_cache_executor_pysqa_flux.py

Lines changed: 46 additions & 0 deletions

@@ -0,0 +1,46 @@
+import os
+import importlib
+import unittest
+import shutil
+
+try:
+    import flux.job
+    from executorlib import FileExecutor
+    from executorlib.standalone.cache.queue import execute_with_pysqa
+
+    skip_flux_test = "FLUX_URI" not in os.environ
+    pmi = os.environ.get("PYMPIPOOL_PMIX", None)
+except ImportError:
+    skip_flux_test = True
+
+
+skip_mpi4py_test = importlib.util.find_spec("mpi4py") is None
+
+
+def mpi_funct(i):
+    from mpi4py import MPI
+
+    size = MPI.COMM_WORLD.Get_size()
+    rank = MPI.COMM_WORLD.Get_rank()
+    return i, size, rank
+
+
+@unittest.skipIf(
+    skip_flux_test or skip_mpi4py_test,
+    "h5py or mpi4py or flux are not installed, so the h5py, flux and mpi4py tests are skipped.",
+)
+class TestCacheExecutorPysqa(unittest.TestCase):
+    def test_executor(self):
+        with FileExecutor(
+            cores_per_worker=2,
+            execute_function=execute_with_pysqa,
+            backend="flux",
+        ) as exe:
+            fs1 = exe.submit(mpi_funct, 1)
+            self.assertFalse(fs1.done())
+            self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)])
+            self.assertTrue(fs1.done())
+
+    def tearDown(self):
+        if os.path.exists("cache"):
+            shutil.rmtree("cache")
