Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions executorlib/interactive/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
check_nested_flux_executor,
check_oversubscribe,
check_pmi,
check_threads_per_core,
validate_number_of_cores,
)
from executorlib.standalone.interactive.spawner import (
Expand Down Expand Up @@ -258,7 +257,6 @@ def create_executor(
elif backend == "local":
check_executor(executor=flux_executor)
check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
check_threads_per_core(threads_per_core=resource_dict["threads_per_core"])
check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_core"])
check_command_line_argument_lst(
command_line_argument_lst=resource_dict["slurm_cmd_args"]
Expand Down
5 changes: 3 additions & 2 deletions executorlib/interactive/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -546,13 +546,14 @@ def _submit_function_to_separate_process(
resource_dict["cores"] == 1 and executor_kwargs["cores"] >= 1
):
resource_dict["cores"] = executor_kwargs["cores"]
slots_required = resource_dict["cores"] * resource_dict.get("threads_per_core", 1)
active_task_dict = _wait_for_free_slots(
active_task_dict=active_task_dict,
cores_requested=resource_dict["cores"],
cores_requested=slots_required,
max_cores=max_cores,
max_workers=max_workers,
)
active_task_dict[task_dict["future"]] = resource_dict["cores"]
active_task_dict[task_dict["future"]] = slots_required
task_kwargs = executor_kwargs.copy()
task_kwargs.update(resource_dict)
task_kwargs.update(
Expand Down
13 changes: 0 additions & 13 deletions executorlib/standalone/inputcheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,6 @@ def check_gpus_per_worker(gpus_per_worker: int) -> None:
)


def check_threads_per_core(threads_per_core: int) -> None:
"""
Check if threads_per_core is not 1 and raise a TypeError if it is.
"""
if threads_per_core != 1:
raise TypeError(
"Thread based parallelism is not supported for the executorlib.mpi.PyMPIExecutor backend."
"Please use threads_per_core=1 instead of threads_per_core="
+ str(threads_per_core)
+ "."
)


def check_executor(executor: Executor) -> None:
"""
Check if executor is not None and raise a ValueError if it is.
Expand Down
5 changes: 4 additions & 1 deletion executorlib/standalone/interactive/spawner.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,15 @@ def __init__(
cwd: Optional[str] = None,
cores: int = 1,
openmpi_oversubscribe: bool = False,
threads_per_core: int = 1,
):
"""
Subprocess interface implementation.

Args:
cwd (str, optional): The current working directory. Defaults to None.
cores (int, optional): The number of cores to use. Defaults to 1.
threads_per_core (int, optional): The number of threads per core. Defaults to 1.
oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
"""
super().__init__(
Expand All @@ -72,6 +74,7 @@ def __init__(
openmpi_oversubscribe=openmpi_oversubscribe,
)
self._process = None
self._threads_per_core = threads_per_core

def bootup(
self,
Expand Down Expand Up @@ -169,8 +172,8 @@ def __init__(
cwd=cwd,
cores=cores,
openmpi_oversubscribe=openmpi_oversubscribe,
threads_per_core=threads_per_core,
)
self._threads_per_core = threads_per_core
self._gpus_per_core = gpus_per_core
self._slurm_cmd_args = slurm_cmd_args

Expand Down
6 changes: 0 additions & 6 deletions tests/test_executor_backend_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,6 @@ def test_meta_executor_parallel(self):
self.assertTrue(fs_1.done())

def test_errors(self):
with self.assertRaises(TypeError):
Executor(
max_cores=1,
resource_dict={"cores": 1, "threads_per_core": 2},
backend="local",
)
with self.assertRaises(TypeError):
Executor(
max_cores=1,
Expand Down
9 changes: 0 additions & 9 deletions tests/test_executor_backend_mpi_noblock.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,6 @@ def test_meta_executor_single(self):
self.assertTrue(fs_2.done())

def test_errors(self):
with self.assertRaises(TypeError):
Executor(
max_cores=1,
resource_dict={
"cores": 1,
"threads_per_core": 2,
},
backend="local",
)
with self.assertRaises(TypeError):
Executor(
max_cores=1,
Expand Down
5 changes: 0 additions & 5 deletions tests/test_shared_input_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from executorlib.standalone.inputcheck import (
check_command_line_argument_lst,
check_gpus_per_worker,
check_threads_per_core,
check_oversubscribe,
check_executor,
check_init_function,
Expand Down Expand Up @@ -31,10 +30,6 @@ def test_check_gpus_per_worker(self):
with self.assertRaises(TypeError):
check_gpus_per_worker(gpus_per_worker=1)

def test_check_threads_per_core(self):
with self.assertRaises(TypeError):
check_threads_per_core(threads_per_core=2)

def test_check_oversubscribe(self):
with self.assertRaises(ValueError):
check_oversubscribe(oversubscribe=True)
Expand Down
Loading