diff --git a/README.md b/README.md
index caa4b65b..f432ccfa 100644
--- a/README.md
+++ b/README.md
@@ -43,7 +43,7 @@ def calc(i):
     return i, size, rank

 with flux.job.FluxExecutor() as flux_exe:
-    with Executor(max_cores=2, cores_per_worker=2, executor=flux_exe) as exe:
+    with Executor(max_cores=2, flux_executor=flux_exe, resource_dict={"cores": 2}) as exe:
         fs = exe.submit(calc, 3)
         print(fs.result())
 ```
diff --git a/executorlib/__init__.py b/executorlib/__init__.py
index a61d2ce3..329be6d0 100644
--- a/executorlib/__init__.py
+++ b/executorlib/__init__.py
@@ -36,12 +36,14 @@ class Executor:
         backend (str): Switch between the different backends "flux", "local" or "slurm". The default is "local".
         cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
         max_cores (int): defines the number of cores which can be used in parallel
-        cores_per_worker (int): number of MPI cores to be used for each function call
-        threads_per_core (int): number of OpenMP threads to be used for each function call
-        gpus_per_worker (int): number of GPUs per worker - defaults to 0
-        cwd (str/None): current working directory where the parallel python task is executed
-        openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and SLURM only) - default False
-        slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
+        resource_dict (dict): A dictionary of resources required by the task, with the following keys:
+            - cores (int): number of MPI cores to be used for each function call
+            - threads_per_core (int): number of OpenMP threads to be used for each function call
+            - gpus_per_core (int): number of GPUs per core - defaults to 0
+            - cwd (str/None): current working directory where the parallel python task is executed
+            - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and
+              SLURM only) - default False
+            - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
         flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
         flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
         flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
@@ -89,12 +91,7 @@ def __init__(
         backend: str = "local",
         cache_directory: Optional[str] = None,
         max_cores: int = 1,
-        cores_per_worker: int = 1,
-        threads_per_core: int = 1,
-        gpus_per_worker: int = 0,
-        cwd: Optional[str] = None,
-        openmpi_oversubscribe: bool = False,
-        slurm_cmd_args: list[str] = [],
+        resource_dict: Optional[dict] = None,
         flux_executor=None,
         flux_executor_pmi_mode: Optional[str] = None,
         flux_executor_nesting: bool = False,
@@ -114,12 +111,7 @@ def __new__(
         backend: str = "local",
         cache_directory: Optional[str] = None,
         max_cores: int = 1,
-        cores_per_worker: int = 1,
-        threads_per_core: int = 1,
-        gpus_per_worker: int = 0,
-        cwd: Optional[str] = None,
-        openmpi_oversubscribe: bool = False,
-        slurm_cmd_args: list[str] = [],
+        resource_dict: Optional[dict] = None,
         flux_executor=None,
         flux_executor_pmi_mode: Optional[str] = None,
         flux_executor_nesting: bool = False,
@@ -145,12 +137,15 @@ def __new__(
         backend (str): Switch between the different backends "flux", "local" or "slurm". The default is "local".
         cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
         max_cores (int): defines the number of cores which can be used in parallel
-        cores_per_worker (int): number of MPI cores to be used for each function call
-        threads_per_core (int): number of OpenMP threads to be used for each function call
-        gpus_per_worker (int): number of GPUs per worker - defaults to 0
-        openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and SLURM only) - default False
-        slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
-        cwd (str/None): current working directory where the parallel python task is executed
+        resource_dict (dict): A dictionary of resources required by the task, with the following keys:
+            - cores (int): number of MPI cores to be used for each function call
+            - threads_per_core (int): number of OpenMP threads to be used for each function call
+            - gpus_per_core (int): number of GPUs per core - defaults to 0
+            - cwd (str/None): current working directory where the parallel python task is executed
+            - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI
+              and SLURM only) - default False
+            - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM
+              only)
         flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
         flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
         flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
@@ -172,18 +167,26 @@ def __new__(
             debugging purposes and to get an overview of the specified dependencies.
         """
+        default_resource_dict = {
+            "cores": 1,
+            "threads_per_core": 1,
+            "gpus_per_core": 0,
+            "cwd": None,
+            "openmpi_oversubscribe": False,
+            "slurm_cmd_args": [],
+        }
+        if resource_dict is None:
+            resource_dict = {}
+        resource_dict.update(
+            {k: v for k, v in default_resource_dict.items() if k not in resource_dict}
+        )
         if not disable_dependencies:
             return ExecutorWithDependencies(
                 max_workers=max_workers,
                 backend=backend,
                 cache_directory=cache_directory,
                 max_cores=max_cores,
-                cores_per_worker=cores_per_worker,
-                threads_per_core=threads_per_core,
-                gpus_per_worker=gpus_per_worker,
-                cwd=cwd,
-                openmpi_oversubscribe=openmpi_oversubscribe,
-                slurm_cmd_args=slurm_cmd_args,
+                resource_dict=resource_dict,
                 flux_executor=flux_executor,
                 flux_executor_pmi_mode=flux_executor_pmi_mode,
                 flux_executor_nesting=flux_executor_nesting,
@@ -201,12 +204,7 @@ def __new__(
                 backend=backend,
                 cache_directory=cache_directory,
                 max_cores=max_cores,
-                cores_per_worker=cores_per_worker,
-                threads_per_core=threads_per_core,
-                gpus_per_worker=gpus_per_worker,
-                cwd=cwd,
-                openmpi_oversubscribe=openmpi_oversubscribe,
-                slurm_cmd_args=slurm_cmd_args,
+                resource_dict=resource_dict,
                 flux_executor=flux_executor,
                 flux_executor_pmi_mode=flux_executor_pmi_mode,
                 flux_executor_nesting=flux_executor_nesting,
diff --git a/executorlib/interactive/executor.py b/executorlib/interactive/executor.py
index dd672e2a..8228a234 100644
--- a/executorlib/interactive/executor.py
+++ b/executorlib/interactive/executor.py
@@ -151,12 +151,7 @@ def create_executor(
     backend: str = "local",
     max_cores: int = 1,
     cache_directory: Optional[str] = None,
-    cores_per_worker: int = 1,
-    threads_per_core: int = 1,
-    gpus_per_worker: int = 0,
-    cwd: Optional[str] = None,
-    openmpi_oversubscribe: bool = False,
-    slurm_cmd_args: list[str] = [],
+    resource_dict: Optional[dict] = None,
     flux_executor=None,
     flux_executor_pmi_mode: Optional[str] = None,
     flux_executor_nesting: bool = False,
@@ -179,12 +174,14 @@ def create_executor(
         backend (str): Switch between the different backends "flux", "local" or "slurm". The default is "local".
         max_cores (int): defines the number of cores which can be used in parallel
         cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
-        cores_per_worker (int): number of MPI cores to be used for each function call
-        threads_per_core (int): number of OpenMP threads to be used for each function call
-        gpus_per_worker (int): number of GPUs per worker - defaults to 0
-        cwd (str/None): current working directory where the parallel python task is executed
-        openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and SLURM only) - default False
-        slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
+        resource_dict (dict): A dictionary of resources required by the task, with the following keys:
+            - cores (int): number of MPI cores to be used for each function call
+            - threads_per_core (int): number of OpenMP threads to be used for each function call
+            - gpus_per_core (int): number of GPUs per core - defaults to 0
+            - cwd (str/None): current working directory where the parallel python task is executed
+            - openmpi_oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and
+              SLURM only) - default False
+            - slurm_cmd_args (list): Additional command line arguments for the srun call (SLURM only)
         flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
         flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
         flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
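The default-filling step that `Executor.__new__` now performs on `resource_dict` can be exercised in isolation. A minimal standalone sketch of that logic — the `normalize_resource_dict` name is hypothetical, the library inlines this rather than exposing a helper:

```python
# Sketch of the resource_dict normalization performed in Executor.__new__:
# caller-supplied keys always win, missing keys fall back to the defaults.
default_resource_dict = {
    "cores": 1,
    "threads_per_core": 1,
    "gpus_per_core": 0,
    "cwd": None,
    "openmpi_oversubscribe": False,
    "slurm_cmd_args": [],
}


def normalize_resource_dict(resource_dict=None):
    # Hypothetical helper name; executorlib inlines this in Executor.__new__.
    if resource_dict is None:
        resource_dict = {}
    resource_dict.update(
        {k: v for k, v in default_resource_dict.items() if k not in resource_dict}
    )
    return resource_dict


print(normalize_resource_dict({"cores": 2}))
# {'cores': 2, 'threads_per_core': 1, 'gpus_per_core': 0, 'cwd': None,
#  'openmpi_oversubscribe': False, 'slurm_cmd_args': []}
```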
@@ -205,70 +202,69 @@ def create_executor(
     if flux_executor is not None and backend != "flux":
         backend = "flux"
     check_pmi(backend=backend, pmi=flux_executor_pmi_mode)
-    executor_kwargs = {
-        "cores": cores_per_worker,
-        "hostname_localhost": hostname_localhost,
-        "cwd": cwd,
-        "cache_directory": cache_directory,
-    }
+    cores_per_worker = resource_dict["cores"]
+    resource_dict["cache_directory"] = cache_directory
+    resource_dict["hostname_localhost"] = hostname_localhost
     if backend == "flux":
-        check_oversubscribe(oversubscribe=openmpi_oversubscribe)
-        check_command_line_argument_lst(command_line_argument_lst=slurm_cmd_args)
-        executor_kwargs["threads_per_core"] = threads_per_core
-        executor_kwargs["gpus_per_core"] = int(gpus_per_worker / cores_per_worker)
-        executor_kwargs["flux_executor"] = flux_executor
-        executor_kwargs["flux_executor_pmi_mode"] = flux_executor_pmi_mode
-        executor_kwargs["flux_executor_nesting"] = flux_executor_nesting
+        check_oversubscribe(oversubscribe=resource_dict["openmpi_oversubscribe"])
+        check_command_line_argument_lst(
+            command_line_argument_lst=resource_dict["slurm_cmd_args"]
+        )
+        del resource_dict["openmpi_oversubscribe"]
+        del resource_dict["slurm_cmd_args"]
+        resource_dict["flux_executor"] = flux_executor
+        resource_dict["flux_executor_pmi_mode"] = flux_executor_pmi_mode
+        resource_dict["flux_executor_nesting"] = flux_executor_nesting
         if block_allocation:
-            executor_kwargs["init_function"] = init_function
+            resource_dict["init_function"] = init_function
             return InteractiveExecutor(
                 max_workers=int(max_cores / cores_per_worker),
-                executor_kwargs=executor_kwargs,
+                executor_kwargs=resource_dict,
                 spawner=FluxPythonSpawner,
             )
         else:
             return InteractiveStepExecutor(
                 max_cores=max_cores,
-                executor_kwargs=executor_kwargs,
+                executor_kwargs=resource_dict,
                 spawner=FluxPythonSpawner,
             )
     elif backend == "slurm":
         check_executor(executor=flux_executor)
         check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
-        executor_kwargs["threads_per_core"] = threads_per_core
-        executor_kwargs["gpus_per_core"] = int(gpus_per_worker / cores_per_worker)
-        executor_kwargs["slurm_cmd_args"] = slurm_cmd_args
-        executor_kwargs["openmpi_oversubscribe"] = openmpi_oversubscribe
         if block_allocation:
-            executor_kwargs["init_function"] = init_function
+            resource_dict["init_function"] = init_function
             return InteractiveExecutor(
                 max_workers=int(max_cores / cores_per_worker),
-                executor_kwargs=executor_kwargs,
+                executor_kwargs=resource_dict,
                 spawner=SrunSpawner,
             )
         else:
             return InteractiveStepExecutor(
                 max_cores=max_cores,
-                executor_kwargs=executor_kwargs,
+                executor_kwargs=resource_dict,
                 spawner=SrunSpawner,
             )
     else:  # backend="local"
-        check_threads_per_core(threads_per_core=threads_per_core)
-        check_gpus_per_worker(gpus_per_worker=gpus_per_worker)
-        check_command_line_argument_lst(command_line_argument_lst=slurm_cmd_args)
         check_executor(executor=flux_executor)
         check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
-        executor_kwargs["openmpi_oversubscribe"] = openmpi_oversubscribe
+        check_threads_per_core(threads_per_core=resource_dict["threads_per_core"])
+        check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_core"])
+        check_command_line_argument_lst(
+            command_line_argument_lst=resource_dict["slurm_cmd_args"]
+        )
+        del resource_dict["threads_per_core"]
+        del resource_dict["gpus_per_core"]
+        del resource_dict["slurm_cmd_args"]
         if block_allocation:
-            executor_kwargs["init_function"] = init_function
+            resource_dict["init_function"] = init_function
            return InteractiveExecutor(
                max_workers=int(max_cores / cores_per_worker),
-               executor_kwargs=executor_kwargs,
+               executor_kwargs=resource_dict,
                spawner=MpiExecSpawner,
            )
        else:
            return InteractiveStepExecutor(
                max_cores=max_cores,
-               executor_kwargs=executor_kwargs,
+               executor_kwargs=resource_dict,
                spawner=MpiExecSpawner,
            )
diff --git a/notebooks/examples.ipynb b/notebooks/examples.ipynb
index c1649fe7..183b2658 100644
--- a/notebooks/examples.ipynb
+++ b/notebooks/examples.ipynb
@@ -61,10 +61,11 @@
    ]
   },
   {
-   "metadata": {},
    "cell_type": "code",
-   "outputs": [],
    "execution_count": null,
+   "id": "2ed59582cab0eb29",
+   "metadata": {},
+   "outputs": [],
    "source": [
     "import flux.job\n",
     "from executorlib import Executor\n",
@@ -73,29 +74,29 @@
     "    with Executor(max_cores=1, flux_executor=flux_exe) as exe:\n",
     "        future = exe.submit(sum, [1, 1])\n",
     "        print(future.result())"
-   ],
-   "id": "2ed59582cab0eb29"
+   ]
   },
   {
-   "metadata": {},
    "cell_type": "markdown",
-   "source": "The result of the calculation is again `1+1=2`.",
-   "id": "e1ae417273ebf0f5"
+   "id": "e1ae417273ebf0f5",
+   "metadata": {},
+   "source": "The result of the calculation is again `1+1=2`."
   },
   {
-   "metadata": {},
    "cell_type": "markdown",
+   "id": "bcf8a85c015d55da",
+   "metadata": {},
    "source": [
     "Beyond pre-defined functions like the `sum()` function, the same functionality can be used to submit user-defined \n",
     "functions. In the next example a custom summation function is defined:"
-   ],
-   "id": "bcf8a85c015d55da"
+   ]
  },
   {
-   "metadata": {},
    "cell_type": "code",
-   "outputs": [],
    "execution_count": null,
+   "id": "70ff8c30cc13bfd5",
+   "metadata": {},
+   "outputs": [],
    "source": [
     "import flux.job\n",
     "from executorlib import Executor\n",
@@ -119,12 +120,12 @@
     "            fs_4.result(),\n",
     "        ]\n",
     "    )"
-   ],
-   "id": "70ff8c30cc13bfd5"
+   ]
   },
   {
-   "metadata": {},
    "cell_type": "markdown",
+   "id": "495e6e17964fe936",
+   "metadata": {},
    "source": [
     "In contrast to the previous example where just a single function was submitted to a single worker, in this case a total\n",
     "of four functions are submitted to a group of two workers `max_cores=2`. Consequently, the functions are executed as a\n",
@@ -135,25 +136,25 @@
     "the same commands in interactive environments like [Jupyter notebooks](https://jupyter.org). This is achieved by using \n",
     "[cloudpickle](https://github.com/cloudpipe/cloudpickle) to serialize the python function and its parameters rather than\n",
     "the regular pickle package."
-   ],
-   "id": "495e6e17964fe936"
+   ]
   },
   {
-   "metadata": {},
    "cell_type": "markdown",
+   "id": "7f13ea3733327ff8",
+   "metadata": {},
    "source": [
     "For backwards compatibility with the [`multiprocessing.Pool`](https://docs.python.org/3/library/multiprocessing.html) \n",
     "class the [`concurrent.futures.Executor`](https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures)\n",
     "also implements the `map()` function to map a series of inputs to a function. The same `map()` function is also \n",
     "available in the `executorlib.Executor`:"
-   ],
-   "id": "7f13ea3733327ff8"
+   ]
   },
   {
-   "metadata": {},
    "cell_type": "code",
-   "outputs": [],
    "execution_count": null,
+   "id": "c320897f8c44f364",
+   "metadata": {},
+   "outputs": [],
    "source": [
     "import flux.job\n",
     "from executorlib import Executor\n",
@@ -166,18 +167,18 @@
     "with flux.job.FluxExecutor() as flux_exe:\n",
     "    with Executor(max_cores=2, flux_executor=flux_exe) as exe:\n",
     "        print(list(exe.map(calc, [[2, 1], [2, 2], [2, 3], [2, 4]])))"
-   ],
-   "id": "c320897f8c44f364"
+   ]
   },
   {
-   "metadata": {},
    "cell_type": "markdown",
-   "source": "The results remain the same. ",
-   "id": "6a22677b67784c97"
+   "id": "6a22677b67784c97",
+   "metadata": {},
+   "source": "The results remain the same. "
   },
   {
-   "metadata": {},
    "cell_type": "markdown",
+   "id": "240ad1f5dc0c43c2",
+   "metadata": {},
    "source": [
     "## Resource Assignment\n",
     "By default, every submission of a python function results in a flux job (or SLURM job step) depending on the backend. \n",
@@ -189,14 +190,14 @@
     "resources have to be configured on the **Executor level**, otherwise they can either be defined on the **Executor level**\n",
     "or on the **Submission level**. The resources defined on the **Submission level** overwrite the resources defined on the \n",
     "**Executor level**."
-   ],
-   "id": "240ad1f5dc0c43c2"
+   ]
   },
   {
-   "metadata": {},
    "cell_type": "code",
-   "outputs": [],
    "execution_count": null,
+   "id": "631422e52b7f8b1d",
+   "metadata": {},
+   "outputs": [],
    "source": [
     "import flux.job\n",
     "from executorlib import Executor\n",
@@ -212,12 +213,14 @@
     "        max_workers=2, # total number of cores available to the Executor\n",
     "        backend=\"flux\", # optional in case the backend is not recognized\n",
     "        # Optional resource definition\n",
-    "        cores_per_worker=1,\n",
-    "        threads_per_core=1,\n",
-    "        gpus_per_worker=0,\n",
-    "        cwd=\"/home/jovyan/notebooks\",\n",
-    "        openmpi_oversubscribe=False, # not available with flux\n",
-    "        slurm_cmd_args=[], # additional command line arguments for SLURM\n",
+    "        resource_dict={\n",
+    "            \"cores\": 1,\n",
+    "            \"threads_per_core\": 1,\n",
+    "            \"gpus_per_core\": 0,\n",
+    "            \"cwd\": \"/home/jovyan/notebooks\",\n",
+    "            \"openmpi_oversubscribe\": False,\n",
+    "            \"slurm_cmd_args\": [],\n",
+    "        },\n",
     "        flux_executor=flux_exe,\n",
     "        flux_executor_pmi_mode=None,\n",
     "        flux_executor_nesting=False,\n",
@@ -247,12 +250,12 @@
     "        },\n",
     "    )\n",
     "    print(future_obj.result())"
-   ],
-   "id": "631422e52b7f8b1d"
+   ]
   },
   {
-   "metadata": {},
    "cell_type": "markdown",
+   "id": "ab12ff4ebd5efb98",
+   "metadata": {},
    "source": [
     "The `max_cores`, which defines the total number of cores of the allocation, is the only mandatory parameter. All other\n",
     "resource parameters are optional. If none of the submitted Python functions uses [mpi4py](https://mpi4py.readthedocs.io)\n",
@@ -260,14 +263,14 @@
     "and `gpus_per_worker=0`. These are defaults, so they do not even have to be specified. In this case it also makes sense to \n",
     "enable `block_allocation=True` to continuously use a fixed number of python processes rather than creating a new python\n",
     "process for each submission. In this case the above example can be reduced to: "
-   ],
-   "id": "ab12ff4ebd5efb98"
+   ]
   },
   {
-   "metadata": {},
    "cell_type": "code",
-   "outputs": [],
    "execution_count": null,
+   "id": "efe054c93d835e4a",
+   "metadata": {},
+   "outputs": [],
    "source": [
     "import flux.job\n",
     "from executorlib import Executor\n",
@@ -290,21 +293,21 @@
     "        parameter_b=2,\n",
     "    )\n",
     "    print(future_obj.result())"
-   ],
-   "id": "efe054c93d835e4a"
+   ]
   },
   {
-   "metadata": {},
    "cell_type": "markdown",
+   "id": "c6983f28b18f831b",
+   "metadata": {},
    "source": [
     "The working directory parameter `cwd` can be helpful for tasks which interact with the file system to define which task\n",
     "is executed in which folder, but for most python functions it is not required."
-   ],
-   "id": "c6983f28b18f831b"
+   ]
   },
   {
-   "metadata": {},
    "cell_type": "markdown",
+   "id": "3bf7af3ce2388f75",
+   "metadata": {},
    "source": [
     "## Data Handling\n",
     "A limitation of many parallel approaches is the overhead in communication when working with large datasets. Instead of\n",
@@ -314,14 +317,14 @@
     "keys of the dictionary can then be used as additional input parameters in each function submitted to the `executorlib.Executor`. When block allocation is disabled this functionality is not available, as each function is executed in a separate process, so no data can be preloaded.\n",
     "\n",
     "This functionality is illustrated below: "
-   ],
-   "id": "3bf7af3ce2388f75"
+   ]
   },
   {
-   "metadata": {},
    "cell_type": "code",
-   "outputs": [],
    "execution_count": null,
+   "id": "74552573e3e3d3d9",
+   "metadata": {},
+   "outputs": [],
    "source": [
     "import flux.job\n",
     "from executorlib import Executor\n",
@@ -344,12 +347,12 @@
     "    ) as exe:\n",
     "        fs = exe.submit(calc, 2, j=5)\n",
     "        print(fs.result())"
-   ],
-   "id": "74552573e3e3d3d9"
+   ]
   },
   {
-   "metadata": {},
    "cell_type": "markdown",
+   "id": "c71bc876a65349cf",
+   "metadata": {},
    "source": [
     "The function `calc()` requires three inputs `i`, `j` and `k`. But when the function is submitted to the executor only \n",
     "two inputs are provided: `fs = exe.submit(calc, 2, j=5)`. In this case the first input parameter is mapped to `i=2`, the\n",
@@ -361,21 +364,21 @@
     "\n",
     "The result is `2+5+3=10` as `i=2` and `j=5` are provided during the submission and `k=3` is defined in the `init_function()`\n",
     "function."
-   ],
-   "id": "c71bc876a65349cf"
+   ]
   },
   {
-   "metadata": {},
    "cell_type": "markdown",
+   "id": "a4d4d5447e68a834",
+   "metadata": {},
    "source": [
     "## Up-Scaling \n",
     "[flux](https://flux-framework.org) provides fine-grained resource assignment via `libhwloc` and `pmi`."
- ], - "id": "a4d4d5447e68a834" + ] }, { - "metadata": {}, "cell_type": "markdown", + "id": "ad6fec651dfbc263", + "metadata": {}, "source": [ "### Thread-based Parallelism\n", "The number of threads per core can be controlled with the `threads_per_core` parameter during the initialization of the \n", @@ -390,14 +393,14 @@ "\n", "At the current stage `executorlib.Executor` does not set these parameters itself, so you have to add them in the function\n", "you submit before importing the corresponding library: \n" - ], - "id": "ad6fec651dfbc263" + ] }, { - "metadata": {}, "cell_type": "code", - "outputs": [], "execution_count": null, + "id": "1fbcc6242f13973b", + "metadata": {}, + "outputs": [], "source": [ "def calc(i):\n", " import os\n", @@ -410,12 +413,12 @@ " import numpy as np\n", "\n", " return i" - ], - "id": "1fbcc6242f13973b" + ] }, { - "metadata": {}, "cell_type": "markdown", + "id": "aadd8aa9902d854e", + "metadata": {}, "source": [ "Most modern CPUs use hyper-threading to present the operating system with double the number of virtual cores compared to\n", "the number of physical cores available. So unless this functionality is disabled `threads_per_core=2` is a reasonable \n", @@ -425,12 +428,12 @@ "\n", "Specific manycore CPU models like the Intel Xeon Phi processors provide a much higher hyper-threading ration and require\n", "a higher number of threads per core for optimal performance. \n" - ], - "id": "aadd8aa9902d854e" + ] }, { - "metadata": {}, "cell_type": "markdown", + "id": "d19861a257e40fc3", + "metadata": {}, "source": [ "### MPI Parallel Python Functions\n", "Beyond thread based parallelism, the message passing interface (MPI) is the de facto standard parallel execution in \n", @@ -446,14 +449,14 @@ "advantage of this approach is that the users can parallelize their workflows one function at the time. \n", "\n", "The example in `test_mpi.py` illustrates the submission of a simple MPI parallel python function: " - ], - "id": "d19861a257e40fc3" + ] }, { - "metadata": {}, "cell_type": "code", - "outputs": [], "execution_count": null, + "id": "e00d8448d882dfd5", + "metadata": {}, + "outputs": [], "source": [ "import flux.job\n", "from executorlib import Executor\n", @@ -470,18 +473,18 @@ "with flux.job.FluxExecutor() as flux_exe:\n", " with Executor(\n", " max_cores=2,\n", - " cores_per_worker=2,\n", + " resource_dict={\"cores\": 2},\n", " flux_executor=flux_exe,\n", " flux_executor_pmi_mode=\"pmix\",\n", " ) as exe:\n", " fs = exe.submit(calc, 3)\n", " print(fs.result())" - ], - "id": "e00d8448d882dfd5" + ] }, { - "metadata": {}, "cell_type": "markdown", + "id": "35c49013c2de3907", + "metadata": {}, "source": [ "In the example environment OpenMPI version 5 is used, so the `pmi` parameter has to be set to `pmix` rather than `pmi1` or `pmi2` which is the default. For `mpich` it is not necessary to specify the `pmi` interface manually.\n", "The `calc()` function initializes the [`mpi4py`](https://mpi4py.readthedocs.io) library and gathers the size of the \n", @@ -495,23 +498,23 @@ "The response consists of a list of two tuples, one for each MPI parallel process, with the first entry of the tuple \n", "being the parameter `i=3`, followed by the number of MPI parallel processes assigned to the function call `cores_per_worker=2`\n", "and finally the index of the specific process `0` or `1`. 
" - ], - "id": "35c49013c2de3907" + ] }, { - "metadata": {}, "cell_type": "markdown", + "id": "6960ccc01268e1f7", + "metadata": {}, "source": [ "### GPU Assignment\n", "With the rise of machine learning applications, the use of GPUs for scientific application becomes more and more popular.\n", "Consequently, it is essential to have full control over the assignment of GPUs to specific python functions. In the \n", "`test_gpu.py` example the `tensorflow` library is used to identify the GPUs and return their configuration: " - ], - "id": "6960ccc01268e1f7" + ] }, { - "metadata": {}, "cell_type": "markdown", + "id": "db3727c5da7072cd", + "metadata": {}, "source": [ "```\n", "import socket\n", @@ -536,12 +539,12 @@ " fs_2 = exe.submit(get_available_gpus)\n", " print(fs_1.result(), fs_2.result())\n", "```" - ], - "id": "db3727c5da7072cd" + ] }, { - "metadata": {}, "cell_type": "markdown", + "id": "e7ccb6c390b33c73", + "metadata": {}, "source": [ "The additional parameter `gpus_per_worker=1` specifies that one GPU is assigned to each worker. This functionality \n", "requires `executorlib` to be connected to a resource manager like the [SLURM workload manager](https://www.schedmd.com)\n", @@ -550,12 +553,12 @@ "\n", "To clarify the execution of such an example on a high performance computing (HPC) cluster using the [SLURM workload manager](https://www.schedmd.com)\n", "the submission script is given below: " - ], - "id": "e7ccb6c390b33c73" + ] }, { - "metadata": {}, "cell_type": "markdown", + "id": "8aa7df69d42b5b74", + "metadata": {}, "source": [ "```\n", "#!/bin/bash\n", @@ -565,12 +568,12 @@ "\n", "python test_gpu.py\n", "```" - ], - "id": "8aa7df69d42b5b74" + ] }, { - "metadata": {}, "cell_type": "markdown", + "id": "8a6636284ba16750", + "metadata": {}, "source": [ "The important part is that for using the `executorlib.slurm.PySlurmExecutor` backend the script `test_gpu.py` does not\n", "need to be executed with `srun` but rather it is sufficient to just execute it with the python interpreter. `executorlib`\n", @@ -580,12 +583,12 @@ "within the [SLURM workload manager](https://www.schedmd.com) it is essential that the resources are passed from the \n", "[SLURM workload manager](https://www.schedmd.com) to the [flux framework](https://flux-framework.org). This is achieved\n", "by calling `srun flux start` in the submission script: " - ], - "id": "8a6636284ba16750" + ] }, { - "metadata": {}, "cell_type": "markdown", + "id": "888454c1532ad432", + "metadata": {}, "source": [ "```\n", "#!/bin/bash\n", @@ -595,12 +598,12 @@ "\n", "srun flux start python test_gpu.py\n", "````" - ], - "id": "888454c1532ad432" + ] }, { - "metadata": {}, "cell_type": "markdown", + "id": "d1285038563eee32", + "metadata": {}, "source": [ "As a result the GPUs available on the two compute nodes are reported: \n", "```\n", @@ -608,12 +611,12 @@ ">>> ('/device:GPU:0', 'device: 0, name: Tesla V100S-PCIE-32GB, pci bus id: 0000:84:00.0, compute capability: 7.0', 'cn139')]\n", "```\n", "In this case each compute node `cn138` and `cn139` is equipped with one `Tesla V100S-PCIE-32GB`.\n" - ], - "id": "d1285038563eee32" + ] }, { - "metadata": {}, "cell_type": "markdown", + "id": "df3ff4f3c9ee10b8", + "metadata": {}, "source": [ "## Coupled Functions \n", "For submitting two functions with rather different computing resource requirements it is essential to represent this \n", @@ -622,14 +625,14 @@ "input for the second function during the submission. 
     "graphs, but it does not enable cyclic graphs. As a simple example we can add one to the result of the addition of one\n",
     "and two:"
-   ],
-   "id": "df3ff4f3c9ee10b8"
+   ]
   },
   {
-   "metadata": {},
    "cell_type": "code",
-   "outputs": [],
    "execution_count": null,
+   "id": "1dbc77aadc5b6ed0",
+   "metadata": {},
+   "outputs": [],
    "source": [
     "import flux.job\n",
     "from executorlib import Executor\n",
@@ -654,8 +657,7 @@
     "        resource_dict={\"cores\": 1},\n",
     "    )\n",
     "    print(future_2.result())"
-   ],
-   "id": "1dbc77aadc5b6ed0"
+   ]
   },
   {
    "cell_type": "markdown",
@@ -753,12 +755,12 @@
    ]
   },
   {
-   "metadata": {},
    "cell_type": "code",
-   "outputs": [],
    "execution_count": null,
-   "source": "",
-   "id": "601852447d3839c4"
+   "id": "601852447d3839c4",
+   "metadata": {},
+   "outputs": [],
+   "source": []
   }
  ],
 "metadata": {
diff --git a/tests/test_dependencies_executor.py b/tests/test_dependencies_executor.py
index 44a47d8e..cfcf0733 100644
--- a/tests/test_dependencies_executor.py
+++ b/tests/test_dependencies_executor.py
@@ -99,10 +99,14 @@ def test_dependency_steps(self):
         executor = create_executor(
             max_workers=1,
             max_cores=2,
-            cores_per_worker=1,
-            threads_per_core=1,
-            gpus_per_worker=0,
-            openmpi_oversubscribe=False,
+            resource_dict={
+                "cores": 1,
+                "threads_per_core": 1,
+                "gpus_per_core": 0,
+                "cwd": None,
+                "openmpi_oversubscribe": False,
+                "slurm_cmd_args": [],
+            },
             backend="local",
         )
         process = RaisingThread(
diff --git a/tests/test_executor_backend_flux.py b/tests/test_executor_backend_flux.py
index d0f8d41f..6fcf3575 100644
--- a/tests/test_executor_backend_flux.py
+++ b/tests/test_executor_backend_flux.py
@@ -60,7 +60,7 @@ def test_flux_executor_serial(self):
     def test_flux_executor_threads(self):
         with Executor(
             max_cores=1,
-            threads_per_core=2,
+            resource_dict={"threads_per_core": 2},
             flux_executor=self.executor,
             backend="flux",
             block_allocation=True,
@@ -75,7 +75,7 @@ def test_flux_executor_parallel(self):
         with Executor(
             max_cores=2,
-            cores_per_worker=2,
+            resource_dict={"cores": 2},
             flux_executor=self.executor,
             backend="flux",
             block_allocation=True,
@@ -88,7 +88,7 @@ def test_single_task(self):
         with Executor(
             max_cores=2,
-            cores_per_worker=2,
+            resource_dict={"cores": 2},
             flux_executor=self.executor,
             backend="flux",
             block_allocation=True,
@@ -103,7 +103,7 @@ def test_internal_memory(self):
         with Executor(
             max_cores=1,
-            cores_per_worker=1,
+            resource_dict={"cores": 1},
             init_function=set_global,
             flux_executor=self.executor,
             backend="flux",
diff --git a/tests/test_executor_backend_mpi.py b/tests/test_executor_backend_mpi.py
index a19918f9..9a4309b2 100644
--- a/tests/test_executor_backend_mpi.py
+++ b/tests/test_executor_backend_mpi.py
@@ -58,7 +58,7 @@ def test_meta_executor_single(self):
     def test_meta_executor_parallel(self):
         with Executor(
             max_workers=2,
-            cores_per_worker=2,
+            resource_dict={"cores": 2},
             backend="local",
             block_allocation=True,
         ) as exe:
@@ -71,15 +71,13 @@ def test_errors(self):
         with self.assertRaises(TypeError):
             Executor(
                 max_cores=1,
-                cores_per_worker=1,
-                threads_per_core=2,
+                resource_dict={"cores": 1, "threads_per_core": 2},
                 backend="local",
             )
         with self.assertRaises(TypeError):
             Executor(
                 max_cores=1,
-                cores_per_worker=1,
-                gpus_per_worker=1,
+                resource_dict={"cores": 1, "gpus_per_core": 1},
                 backend="local",
             )
@@ -94,7 +92,7 @@ def tearDown(self):
     def test_meta_executor_parallel_cache(self):
         with Executor(
             max_workers=2,
-            cores_per_worker=2,
+            resource_dict={"cores": 2},
             backend="local",
             block_allocation=True,
             cache_directory="./cache",
diff --git a/tests/test_executor_backend_mpi_noblock.py b/tests/test_executor_backend_mpi_noblock.py
index b7f8a19a..06be8a4d 100644
--- a/tests/test_executor_backend_mpi_noblock.py
+++ b/tests/test_executor_backend_mpi_noblock.py
@@ -45,15 +45,19 @@ def test_errors(self):
         with self.assertRaises(TypeError):
             Executor(
                 max_cores=1,
-                cores_per_worker=1,
-                threads_per_core=2,
+                resource_dict={
+                    "cores": 1,
+                    "threads_per_core": 2,
+                },
                 backend="local",
             )
         with self.assertRaises(TypeError):
             Executor(
                 max_cores=1,
-                cores_per_worker=1,
-                gpus_per_worker=1,
+                resource_dict={
+                    "cores": 1,
+                    "gpus_per_core": 1,
+                },
                 backend="local",
             )
         with self.assertRaises(ValueError):
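Taken together, the caller-side migration is mechanical: every per-worker resource keyword moves into a single `resource_dict`. A minimal sketch of the new call signature, mirroring the README example above (assumes `mpi4py` and a working local MPI installation; with `"cores": 2` the function runs on two MPI ranks and the future returns one result per rank):

```python
from executorlib import Executor


def calc(i):
    # MPI-parallel stand-in function, as in the README example.
    from mpi4py import MPI

    size = MPI.COMM_WORLD.Get_size()
    rank = MPI.COMM_WORLD.Get_rank()
    return i, size, rank


# Old interface: Executor(max_cores=2, cores_per_worker=2, ...)
# New interface: per-worker resources are grouped into resource_dict.
with Executor(
    max_cores=2,
    backend="local",
    resource_dict={"cores": 2},
) as exe:
    fs = exe.submit(calc, 3)
    print(fs.result())  # [(3, 2, 0), (3, 2, 1)]
```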