-
Notifications
You must be signed in to change notification settings - Fork 3
Closed
Description
The SlurmClusterExecutor and FluxClusterExecutor behave fundamentally differently from the SingleNodeExecutor, as explained in the documentation: https://executorlib.readthedocs.io/en/latest/4-developer.html#interface-class-hierarchy
This makes it hard to debug their functionality locally. Still, executorlib internally contains the functionality to build a SingleNodeExecutor which behaves similarly to the SlurmClusterExecutor and FluxClusterExecutor. This should be explained in the documentation, for example with the following code:
import os
import time
from typing import Optional, Callable
from executorlib.task_scheduler.file.subprocess_spawner import execute_in_subprocess
from executorlib.task_scheduler.file.task_scheduler import FileTaskScheduler
from executorlib.executor.base import BaseExecutor
class LocalFileExecutor(BaseExecutor):
    """Local executor built on top of the file-based task scheduler.

    The executor wraps a ``FileTaskScheduler`` that launches tasks with
    ``execute_in_subprocess`` and without a pysqa configuration directory or
    backend, so that the file-based execution path can be debugged on a
    local machine.
    """

    def __init__(
        self,
        max_workers: Optional[int] = None,
        cache_directory: Optional[str] = None,
        max_cores: Optional[int] = None,
        resource_dict: Optional[dict] = None,
        hostname_localhost: Optional[bool] = None,
        block_allocation: bool = False,
        init_function: Optional[Callable] = None,
        disable_dependencies: bool = False,
        refresh_rate: float = 0.01,
    ):
        """Create the executor and its underlying ``FileTaskScheduler``.

        Args:
            max_workers: Accepted but not used by this constructor.
            cache_directory: Default value for the ``"cache_directory"``
                resource entry; ``"executorlib_cache"`` is used when None.
                An explicit ``"cache_directory"`` key in ``resource_dict``
                takes precedence over this argument.
            max_cores: Accepted but not used by this constructor.
            resource_dict: Resource settings; keys that are missing are
                filled in from the defaults defined below. The dictionary
                passed by the caller is not modified.
            hostname_localhost: Accepted but not used by this constructor.
            block_allocation: Accepted but not used by this constructor.
            init_function: Accepted but not used by this constructor.
            disable_dependencies: Forwarded to ``FileTaskScheduler``.
            refresh_rate: Accepted but not used by this constructor.
        """
        default_resource_dict: dict = {
            "cores": 1,
            "threads_per_core": 1,
            "gpus_per_core": 0,
            "cwd": None,
            "openmpi_oversubscribe": False,
            "slurm_cmd_args": [],
            "cache_directory": (
                "executorlib_cache" if cache_directory is None else cache_directory
            ),
        }
        # Merge into a copy: updating ``resource_dict`` in place would leak
        # the defaults into a dictionary the caller may reuse elsewhere.
        merged_resource_dict = {} if resource_dict is None else dict(resource_dict)
        for key, value in default_resource_dict.items():
            # User-supplied entries win; defaults only fill the gaps.
            merged_resource_dict.setdefault(key, value)
        super().__init__(
            executor=FileTaskScheduler(
                resource_dict=merged_resource_dict,
                pysqa_config_directory=None,
                backend=None,
                disable_dependencies=disable_dependencies,
                execute_function=execute_in_subprocess,
            )
        )
def foo(x, delay=3):
    """Return ``x + 1`` after sleeping for ``delay`` seconds.

    The sleep stands in for a long-running task in the example.

    Args:
        x: Value to increment.
        delay: Seconds to sleep before returning; defaults to 3.

    Returns:
        ``x + 1``.
    """
    time.sleep(delay)
    return x + 1
# Example: submit ``foo`` through the local file-based executor.
# The executor is created with one cache directory while the submitted task
# passes a different one (plus an explicit cache key) in its own
# ``resource_dict``.
# NOTE(review): the directory names suggest the per-task value is expected to
# take precedence over the executor-level one - confirm against executorlib.
with LocalFileExecutor(
    cache_directory="not_this_dir",
    resource_dict={},
) as exe:
    future = exe.submit(
        foo,
        1,
        resource_dict={
            "cache_directory": "rather_this_dir",
            "cache_key": "foo",
        },
    )
    # Blocks until the task has finished, then prints its return value.
    print(future.result())
Metadata
Assignees
Labels
No labels