Skip to content

Commit e4aab01

Browse files
committed
Add hardware pinning to dragon run requests
1 parent 52abd32 commit e4aab01

15 files changed

+1825
-25
lines changed

doc/changelog.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ Jump to:
1313

1414
Description
1515

16+
- Add hardware pinning capability when using dragon
1617
- Add EnvironmentConfigLoader for ML Worker Manager
1718
- Add Model schema with model metadata included
1819
- Removed device from schemas, MessageHandler and tests

doc/dragon.rst

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,51 @@ In the next sections, we detail how Dragon is integrated into SmartSim.
6565

6666
For more information on HPC launchers, visit the :ref:`Run Settings<run_settings_hpc_ex>` page.
6767

68+
Hardware Pinning
69+
================
70+
71+
Dragon also enables users to specify hardware constraints using ``DragonRunSettings``. For
72+
example, you may configure the run settings to require that nodes executing the
73+
``Model`` support the `"gpu"` feature.
74+
75+
.. code-block:: python
76+
77+
# Because "dragon" was specified as the launcher during Experiment initialization,
78+
# create_run_settings will return a DragonRunSettings object
79+
rs = exp.create_run_settings(exe="mpi_app",
80+
exe_args=["--option", "value"],
81+
env_vars={"MYVAR": "VALUE"})
82+
83+
# Specify that the nodes features must include a GPU
84+
rs.set_node_feature("gpu")
85+
86+
For more fine-grained control, CPU and GPU affinity can be specified using the
87+
``DragonRunSettings`` object. The following example demonstrates how to specify
88+
CPU affinity and GPU affinities simultaneously. Note that affinities are passed
89+
as a list of device indices.
90+
91+
.. code-block:: python
92+
93+
# Because "dragon" was specified as the launcher during Experiment initialization,
94+
# create_run_settings will return a DragonRunSettings object
95+
rs = exp.create_run_settings(exe="mpi_app",
96+
exe_args=["--option", "value"],
97+
env_vars={"MYVAR": "VALUE"})
98+
99+
# Request the first 8 CPUs for this job
100+
rs.set_cpu_affinity(list(range(8)))
101+
102+
# Request the first two GPUs on the node for this job
103+
rs.set_gpu_affinity([0, 1])
104+
105+
.. note::
106+
107+
SmartSim submits jobs in the order they are received. On a heterogeneous system, SmartSim
108+
will attempt to allocate non-GPU nodes first. However, a process may be allocated to a GPU
109+
node if only GPU nodes are available, regardless of the requested features.
110+
111+
To ensure a process is allocated to a specific node, configure a hostname constraint.
112+
68113
=================
69114
The Dragon Server
70115
=================

smartsim/_core/launcher/dragon/dragonBackend.py

Lines changed: 77 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -211,9 +211,12 @@ def group_infos(self) -> dict[str, ProcessGroupInfo]:
211211
def _initialize_hosts(self) -> None:
212212
with self._queue_lock:
213213
self._hosts: t.List[str] = sorted(
214-
dragon_machine.Node(node).hostname
215-
for node in dragon_machine.System().nodes
214+
node for node in dragon_machine.System().nodes
216215
)
216+
self._nodes = [dragon_machine.Node(node) for node in self._hosts]
217+
self._cpus = [node.num_cpus for node in self._nodes]
218+
self._gpus = [node.num_gpus for node in self._nodes]
219+
217220
"""List of hosts available in allocation"""
218221
self._free_hosts: t.Deque[str] = collections.deque(self._hosts)
219222
"""List of hosts on which steps can be launched"""
@@ -285,6 +288,34 @@ def current_time(self) -> float:
285288
"""Current time for DragonBackend object, in seconds since the Epoch"""
286289
return time.time()
287290

291+
def _can_honor_policy(
292+
self, request: DragonRunRequest
293+
) -> t.Tuple[bool, t.Optional[str]]:
294+
# ensure the policy can be honored
295+
if request.policy:
296+
if request.policy.device == "gpu":
297+
# make sure nodes w/GPUs exist
298+
if not any(self._gpus):
299+
return False, "Cannot satisfy request, no GPUs available"
300+
301+
if request.policy.cpu_affinity:
302+
# make sure some node has enough CPUs
303+
available = max(self._cpus)
304+
requested = max(request.policy.cpu_affinity)
305+
306+
if requested >= available:
307+
return False, "Cannot satisfy request, not enough CPUs available"
308+
309+
if request.policy.gpu_affinity:
310+
# make sure some node has enough GPUs
311+
available = max(self._gpus)
312+
requested = max(request.policy.gpu_affinity)
313+
314+
if requested >= available:
315+
return False, "Cannot satisfy request, not enough GPUs available"
316+
317+
return True, None
318+
288319
def _can_honor(self, request: DragonRunRequest) -> t.Tuple[bool, t.Optional[str]]:
289320
"""Check if request can be honored with resources available in the allocation.
290321
@@ -299,6 +330,11 @@ def _can_honor(self, request: DragonRunRequest) -> t.Tuple[bool, t.Optional[str]
299330
if self._shutdown_requested:
300331
message = "Cannot satisfy request, server is shutting down."
301332
return False, message
333+
334+
honorable, err = self._can_honor_policy(request)
335+
if not honorable:
336+
return False, err
337+
302338
return True, None
303339

304340
def _allocate_step(
@@ -391,6 +427,44 @@ def _stop_steps(self) -> None:
391427
self._group_infos[step_id].status = SmartSimStatus.STATUS_CANCELLED
392428
self._group_infos[step_id].return_codes = [-9]
393429

430+
@staticmethod
431+
def create_run_policy(
432+
request: DragonRunRequest, node_name: str
433+
) -> "dragon_policy.Policy":
434+
if isinstance(request, DragonRunRequest):
435+
run_request: DragonRunRequest = request
436+
437+
device = dragon_policy.Policy.Device.DEFAULT
438+
affinity = dragon_policy.Policy.Affinity.DEFAULT
439+
cpu_affinity: t.List[int] = []
440+
gpu_affinity: t.List[int] = []
441+
442+
if run_request.policy is not None:
443+
if run_request.policy.cpu_affinity:
444+
affinity = dragon_policy.Policy.Affinity.SPECIFIC
445+
cpu_affinity = run_request.policy.cpu_affinity
446+
device = dragon_policy.Policy.Device.CPU
447+
448+
if run_request.policy.gpu_affinity:
449+
affinity = dragon_policy.Policy.Affinity.SPECIFIC
450+
gpu_affinity = run_request.policy.gpu_affinity
451+
device = dragon_policy.Policy.Device.GPU
452+
453+
if affinity != dragon_policy.Policy.Affinity.DEFAULT:
454+
return dragon_policy.Policy(
455+
placement=dragon_policy.Policy.Placement.HOST_NAME,
456+
host_name=node_name,
457+
affinity=affinity,
458+
device=device,
459+
cpu_affinity=cpu_affinity,
460+
gpu_affinity=gpu_affinity,
461+
)
462+
463+
return dragon_policy.Policy(
464+
placement=dragon_policy.Policy.Placement.HOST_NAME,
465+
host_name=node_name,
466+
)
467+
394468
def _start_steps(self) -> None:
395469
self._heartbeat()
396470
with self._queue_lock:
@@ -412,10 +486,7 @@ def _start_steps(self) -> None:
412486

413487
policies = []
414488
for node_name in hosts:
415-
local_policy = dragon_policy.Policy(
416-
placement=dragon_policy.Policy.Placement.HOST_NAME,
417-
host_name=node_name,
418-
)
489+
local_policy = self.create_run_policy(request, node_name)
419490
policies.extend([local_policy] * request.tasks_per_node)
420491
tmp_proc = dragon_process.ProcessTemplate(
421492
target=request.exe,

smartsim/_core/launcher/dragon/dragonLauncher.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import os
3030
import typing as t
3131

32+
from smartsim._core.schemas.dragonRequests import DragonRunPolicy
33+
3234
from ...._core.launcher.stepMapping import StepMap
3335
from ....error import LauncherError, SmartSimError
3436
from ....log import get_logger
@@ -168,6 +170,9 @@ def run(self, step: Step) -> t.Optional[str]:
168170
merged_env = self._connector.merge_persisted_env(os.environ.copy())
169171
nodes = int(run_args.get("nodes", None) or 1)
170172
tasks_per_node = int(run_args.get("tasks-per-node", None) or 1)
173+
174+
policy = DragonRunPolicy.from_run_args(run_args)
175+
171176
response = _assert_schema_type(
172177
self._connector.send_request(
173178
DragonRunRequest(
@@ -181,6 +186,7 @@ def run(self, step: Step) -> t.Optional[str]:
181186
current_env=merged_env,
182187
output_file=out,
183188
error_file=err,
189+
policy=policy,
184190
)
185191
),
186192
DragonRunResponse,

smartsim/_core/launcher/step/dragonStep.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,11 @@
3030
import sys
3131
import typing as t
3232

33-
from ...._core.schemas.dragonRequests import DragonRunRequest, request_registry
33+
from ...._core.schemas.dragonRequests import (
34+
DragonRunPolicy,
35+
DragonRunRequest,
36+
request_registry,
37+
)
3438
from ....error.errors import SSUnsupportedError
3539
from ....log import get_logger
3640
from ....settings import (
@@ -166,8 +170,11 @@ def _write_request_file(self) -> str:
166170
nodes = int(run_args.get("nodes", None) or 1)
167171
tasks_per_node = int(run_args.get("tasks-per-node", None) or 1)
168172

173+
policy = DragonRunPolicy.from_run_args(run_args)
174+
169175
cmd = step.get_launch_cmd()
170176
out, err = step.get_output_files()
177+
171178
request = DragonRunRequest(
172179
exe=cmd[0],
173180
exe_args=cmd[1:],
@@ -179,6 +186,7 @@ def _write_request_file(self) -> str:
179186
current_env=os.environ,
180187
output_file=out,
181188
error_file=err,
189+
policy=policy,
182190
)
183191
requests.append(request_registry.to_string(request))
184192
with open(request_file, "w", encoding="utf-8") as script_file:

smartsim/_core/launcher/step/step.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
from __future__ import annotations
2828

29+
import copy
2930
import functools
3031
import os.path as osp
3132
import pathlib
@@ -51,7 +52,7 @@ def __init__(self, name: str, cwd: str, step_settings: SettingsBase) -> None:
5152
self.entity_name = name
5253
self.cwd = cwd
5354
self.managed = False
54-
self.step_settings = step_settings
55+
self.step_settings = copy.deepcopy(step_settings)
5556
self.meta: t.Dict[str, str] = {}
5657

5758
@property

smartsim/_core/schemas/dragonRequests.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626

2727
import typing as t
2828

29-
from pydantic import BaseModel, Field, PositiveInt
29+
from pydantic import BaseModel, Field, NonNegativeInt, PositiveInt
3030

3131
import smartsim._core.schemas.utils as _utils
3232

@@ -39,6 +39,36 @@
3939
class DragonRequest(BaseModel): ...
4040

4141

42+
class DragonRunPolicy(BaseModel):
43+
"""Policy specifying hardware constraints when running a Dragon job"""
44+
45+
device: t.Literal["cpu", "gpu"] = Field(default="cpu")
46+
cpu_affinity: t.List[NonNegativeInt] = Field(default_factory=list)
47+
gpu_affinity: t.List[NonNegativeInt] = Field(default_factory=list)
48+
49+
@staticmethod
50+
def from_run_args(
51+
run_args: t.Dict[str, t.Union[int, str, float, None]]
52+
) -> "DragonRunPolicy":
53+
features: str = str(run_args.get("node-feature", ""))
54+
55+
device = "gpu" if "gpu" in features else "cpu"
56+
57+
gpu_args = str(run_args.get("gpu-affinity", ""))
58+
cpu_args = str(run_args.get("cpu-affinity", ""))
59+
gpu_affinity = [x for x in gpu_args.split(",") if x]
60+
cpu_affinity = [x for x in cpu_args.split(",") if x]
61+
62+
if device == "cpu" and not (cpu_affinity or gpu_affinity):
63+
return DragonRunPolicy()
64+
65+
return DragonRunPolicy(
66+
device=device,
67+
cpu_affinity=cpu_affinity,
68+
gpu_affinity=gpu_affinity,
69+
)
70+
71+
4272
class DragonRunRequestView(DragonRequest):
4373
exe: t.Annotated[str, Field(min_length=1)]
4474
exe_args: t.List[t.Annotated[str, Field(min_length=1)]] = []
@@ -57,6 +87,7 @@ class DragonRunRequestView(DragonRequest):
5787
@request_registry.register("run")
5888
class DragonRunRequest(DragonRunRequestView):
5989
current_env: t.Dict[str, t.Optional[str]] = {}
90+
policy: t.Optional[DragonRunPolicy] = None
6091

6192
def __str__(self) -> str:
6293
return str(DragonRunRequestView.parse_obj(self.dict(exclude={"current_env"})))

smartsim/settings/dragonRunSettings.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828

2929
import typing as t
3030

31+
from typing_extensions import override
32+
3133
from ..log import get_logger
3234
from .base import RunSettings
3335

@@ -63,16 +65,45 @@ def __init__(
6365
**kwargs,
6466
)
6567

68+
@override
6669
def set_nodes(self, nodes: int) -> None:
6770
"""Set the number of nodes
6871
6972
:param nodes: number of nodes to run with
7073
"""
7174
self.run_args["nodes"] = nodes
7275

76+
@override
7377
def set_tasks_per_node(self, tasks_per_node: int) -> None:
7478
"""Set the number of tasks for this job
7579
7680
:param tasks_per_node: number of tasks per node
7781
"""
7882
self.run_args["tasks-per-node"] = tasks_per_node
83+
84+
@override
85+
def set_node_feature(self, feature_list: t.Union[str, t.List[str]]) -> None:
86+
"""Add a node feature requirement
87+
88+
:param feature_list: a feature (or list of features) that nodes running this job must support
89+
"""
90+
if isinstance(feature_list, str):
91+
feature_list = feature_list.strip().split()
92+
elif not all(isinstance(feature, str) for feature in feature_list):
93+
raise TypeError("feature_list must be string or list of strings")
94+
95+
self.run_args["node-feature"] = ",".join(feature_list)
96+
97+
def set_cpu_affinity(self, devices: t.List[int]) -> None:
98+
"""Set the CPU affinity for this job
99+
100+
:param devices: list of CPU indices to execute on
101+
"""
102+
self.run_args["cpu-affinity"] = ",".join(str(device) for device in devices)
103+
104+
def set_gpu_affinity(self, devices: t.List[int]) -> None:
105+
"""Set the GPU affinity for this job
106+
107+
:param devices: list of GPU indices to execute on.
108+
"""
109+
self.run_args["gpu-affinity"] = ",".join(str(device) for device in devices)

0 commit comments

Comments
 (0)