diff --git a/executorlib/interactive/executor.py b/executorlib/interactive/executor.py index ed32d31f..503ab9fd 100644 --- a/executorlib/interactive/executor.py +++ b/executorlib/interactive/executor.py @@ -29,7 +29,7 @@ from executorlib.standalone.thread import RaisingThread try: # The PyFluxExecutor requires flux-base to be installed. - from executorlib.interactive.flux import FluxPythonSpawner + from executorlib.interactive.flux import FluxPythonSpawner, validate_max_workers except ImportError: pass @@ -226,13 +226,19 @@ def create_executor( resource_dict["flux_executor_nesting"] = flux_executor_nesting if block_allocation: resource_dict["init_function"] = init_function + max_workers = validate_number_of_cores( + max_cores=max_cores, + max_workers=max_workers, + cores_per_worker=cores_per_worker, + set_local_cores=False, + ) + validate_max_workers( + max_workers=max_workers, + cores=cores_per_worker, + threads_per_core=resource_dict["threads_per_core"], + ) return InteractiveExecutor( - max_workers=validate_number_of_cores( - max_cores=max_cores, - max_workers=max_workers, - cores_per_worker=cores_per_worker, - set_local_cores=False, - ), + max_workers=max_workers, executor_kwargs=resource_dict, spawner=FluxPythonSpawner, ) diff --git a/executorlib/interactive/flux.py b/executorlib/interactive/flux.py index 472a4792..fd674087 100644 --- a/executorlib/interactive/flux.py +++ b/executorlib/interactive/flux.py @@ -1,11 +1,25 @@ import os from typing import Optional +import flux import flux.job from executorlib.standalone.interactive.spawner import BaseSpawner +def validate_max_workers(max_workers, cores, threads_per_core): + handle = flux.Flux() + cores_total = flux.resource.list.resource_list(handle).get().up.ncores + cores_requested = max_workers * cores * threads_per_core + if cores_total < cores_requested: + raise ValueError( + "The number of requested cores is larger than the available cores " + + str(cores_total) + + " < " + + str(cores_requested) + ) + + class FluxPythonSpawner(BaseSpawner): """ A class representing the FluxPythonInterface. diff --git a/tests/test_executor_backend_flux.py b/tests/test_executor_backend_flux.py index 8ab11569..b9d45ee7 100644 --- a/tests/test_executor_backend_flux.py +++ b/tests/test_executor_backend_flux.py @@ -113,3 +113,13 @@ def test_internal_memory(self): self.assertFalse(f.done()) self.assertEqual(f.result(), np.array([5])) self.assertTrue(f.done()) + + def test_validate_max_workers(self): + with self.assertRaises(ValueError): + Executor( + max_workers=10, + resource_dict={"cores": 10, "threads_per_core": 10}, + flux_executor=self.executor, + backend="flux_allocation", + block_allocation=True, + )