Commit b6c2f2b

Merge latest develop into mli-feature (#640)
[ committed by @ankona ] [ approved by @AlyssaCote ]
1 parent 0030a4a commit b6c2f2b

File tree

16 files changed: +1826 -25 lines


doc/changelog.md

Lines changed: 1 addition & 0 deletions

@@ -28,6 +28,7 @@ To be released at some future point in time
 
 Description
 
+- Add hardware pinning capability when using dragon
 - Pin NumPy version to 1.x
 - New launcher support for SGE (and similar derivatives)
 - Fix test outputs being created in incorrect directory

doc/dragon.rst

Lines changed: 28 additions & 0 deletions

@@ -65,6 +65,34 @@ In the next sections, we detail how Dragon is integrated into SmartSim.
 
 For more information on HPC launchers, visit the :ref:`Run Settings<run_settings_hpc_ex>` page.
 
+Hardware Pinning
+================
+
+Dragon also enables users to specify hardware constraints using ``DragonRunSettings``. CPU
+and GPU affinity can be specified using the ``DragonRunSettings`` object. The following
+example demonstrates how to specify CPU and GPU affinity simultaneously. Note
+that affinities are passed as lists of device indices.
+
+.. code-block:: python
+
+    # Because "dragon" was specified as the launcher during Experiment initialization,
+    # create_run_settings will return a DragonRunSettings object
+    rs = exp.create_run_settings(exe="mpi_app",
+                                 exe_args=["--option", "value"],
+                                 env_vars={"MYVAR": "VALUE"})
+
+    # Request the first 8 CPUs for this job
+    rs.set_cpu_affinity(list(range(8)))
+
+    # Request the first two GPUs on the node for this job
+    rs.set_gpu_affinity([0, 1])
+
+.. note::
+
+    SmartSim launches jobs in the order they are received on the first available
+    host in a round-robin pattern. To ensure a process is launched on a node with
+    specific features, configure a hostname constraint.
+
 =================
 The Dragon Server
 =================

doc/tutorials/online_analysis/lattice/online_analysis.ipynb

Lines changed: 6 additions & 0 deletions

@@ -378,6 +378,7 @@
   },
   {
    "cell_type": "code",
+   "id": "6f3ed63d-e324-443d-9b68-b2cf618d31c7",
    "execution_count": 7,
    "metadata": {},
    "outputs": [
@@ -399,13 +400,15 @@
   },
   {
    "cell_type": "markdown",
+   "id": "96c154fe-5ca8-4d89-91f8-8fd4e75cb80e",
    "metadata": {},
    "source": [
    "We then apply the function `probe_points` to the `ux` and `uy` tensors computed in the last time step of the previous simulation. Note that all tensors are already on the DB, thus we can reference them by name. Finally, we download and plot the output (a 2D velocity field), which is stored as `probe_u` on the DB."
   ]
   },
   {
    "cell_type": "code",
+   "id": "36e3b415-dcc1-4d25-9cce-52388146a4bb",
    "execution_count": 8,
    "metadata": {},
    "outputs": [
@@ -432,6 +435,7 @@
   },
   {
    "cell_type": "markdown",
+   "id": "9d7e4966-a0de-480c-9556-936197a5a5d2",
    "metadata": {},
    "source": [
    "### Uploading a function inline\n",
@@ -453,6 +457,7 @@
   },
   {
    "cell_type": "markdown",
+   "id": "1c4daf43-34d0-482a-b9b5-b3b6f1e173c4",
    "metadata": {},
    "source": [
    "We then store the function on the DB under the key `norm_function`."
@@ -470,6 +475,7 @@
   },
   {
    "cell_type": "markdown",
+   "id": "19409ac6-e118-44db-a847-2d905fdf0331",
    "metadata": {},
    "source": [
    "Note that the key we used identifies a functional unit containing the function itself: this is similar to the key used to store the `probe` script above. When we want to run the function, we just call it with `run_script`, by indicating the `script` key as `\"norm_function\"` and the name of the function itself as `\"compute_norm\"`."

smartsim/_core/launcher/dragon/dragonBackend.py

Lines changed: 79 additions & 6 deletions

@@ -214,9 +214,12 @@ def group_infos(self) -> dict[str, ProcessGroupInfo]:
     def _initialize_hosts(self) -> None:
         with self._queue_lock:
             self._hosts: t.List[str] = sorted(
-                dragon_machine.Node(node).hostname
-                for node in dragon_machine.System().nodes
+                node for node in dragon_machine.System().nodes
             )
+            self._nodes = [dragon_machine.Node(node) for node in self._hosts]
+            self._cpus = [node.num_cpus for node in self._nodes]
+            self._gpus = [node.num_gpus for node in self._nodes]
+
             """List of hosts available in allocation"""
             self._free_hosts: t.Deque[str] = collections.deque(self._hosts)
             """List of hosts on which steps can be launched"""
@@ -288,6 +291,34 @@ def current_time(self) -> float:
         """Current time for DragonBackend object, in seconds since the Epoch"""
         return time.time()
 
+    def _can_honor_policy(
+        self, request: DragonRunRequest
+    ) -> t.Tuple[bool, t.Optional[str]]:
+        """Check if the policy can be honored with resources available
+        in the allocation.
+
+        :param request: DragonRunRequest containing policy information
+        :returns: Tuple indicating if the policy can be honored and
+        an optional error message"""
+        # ensure the policy can be honored
+        if request.policy:
+            if request.policy.cpu_affinity:
+                # make sure some node has enough CPUs
+                available = max(self._cpus)
+                requested = max(request.policy.cpu_affinity)
+
+                if requested >= available:
+                    return False, "Cannot satisfy request, not enough CPUs available"
+
+            if request.policy.gpu_affinity:
+                # make sure some node has enough GPUs
+                available = max(self._gpus)
+                requested = max(request.policy.gpu_affinity)
+
+                if requested >= available:
+                    return False, "Cannot satisfy request, not enough GPUs available"
+
+        return True, None
+
     def _can_honor(self, request: DragonRunRequest) -> t.Tuple[bool, t.Optional[str]]:
         """Check if request can be honored with resources available in the allocation.
@@ -302,6 +333,11 @@ def _can_honor(self, request: DragonRunRequest) -> t.Tuple[bool, t.Optional[str]
         if self._shutdown_requested:
             message = "Cannot satisfy request, server is shutting down."
             return False, message
+
+        honorable, err = self._can_honor_policy(request)
+        if not honorable:
+            return False, err
+
         return True, None
 
     def _allocate_step(
@@ -410,6 +446,46 @@ def infra_ddict(self) -> str:
 
         return str(self._infra_ddict.serialize())
 
+    @staticmethod
+    def create_run_policy(
+        request: DragonRequest, node_name: str
+    ) -> "dragon_policy.Policy":
+        """Create a dragon Policy from the request and node name
+
+        :param request: DragonRunRequest containing policy information
+        :param node_name: Name of the node on which the process will run
+        :returns: dragon_policy.Policy object mapped from request properties"""
+        if isinstance(request, DragonRunRequest):
+            run_request: DragonRunRequest = request
+
+            affinity = dragon_policy.Policy.Affinity.DEFAULT
+            cpu_affinity: t.List[int] = []
+            gpu_affinity: t.List[int] = []
+
+            # Customize policy only if the client requested it, otherwise use default
+            if run_request.policy is not None:
+                # Affinities are not mutually exclusive. If specified, both are used
+                if run_request.policy.cpu_affinity:
+                    affinity = dragon_policy.Policy.Affinity.SPECIFIC
+                    cpu_affinity = run_request.policy.cpu_affinity
+
+                if run_request.policy.gpu_affinity:
+                    affinity = dragon_policy.Policy.Affinity.SPECIFIC
+                    gpu_affinity = run_request.policy.gpu_affinity
+
+            if affinity != dragon_policy.Policy.Affinity.DEFAULT:
+                return dragon_policy.Policy(
+                    placement=dragon_policy.Policy.Placement.HOST_NAME,
+                    host_name=node_name,
+                    affinity=affinity,
+                    cpu_affinity=cpu_affinity,
+                    gpu_affinity=gpu_affinity,
+                )
+
+        return dragon_policy.Policy(
+            placement=dragon_policy.Policy.Placement.HOST_NAME,
+            host_name=node_name,
+        )
+
     def _start_steps(self) -> None:
         self._heartbeat()
         with self._queue_lock:
@@ -432,10 +508,7 @@ def _start_steps(self) -> None:
 
             policies = []
             for node_name in hosts:
-                local_policy = dragon_policy.Policy(
-                    placement=dragon_policy.Policy.Placement.HOST_NAME,
-                    host_name=node_name,
-                )
+                local_policy = self.create_run_policy(request, node_name)
                 policies.extend([local_policy] * request.tasks_per_node)
                 tmp_proc = dragon_process.ProcessTemplate(
                     target=request.exe,
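The feasibility test in `_can_honor_policy` above reduces to a simple rule: an affinity list of device indices is satisfiable only if the highest requested index is smaller than the largest per-node device count in the allocation. The standalone sketch below (hypothetical function name, not part of SmartSim's API) illustrates that check in isolation:

```python
# Sketch of the affinity feasibility rule used by _can_honor_policy:
# max(requested index) must be < max(devices on any node), since device
# indices are zero-based.
from typing import List, Optional, Tuple


def can_honor_affinity(
    requested_indices: List[int], devices_per_node: List[int], kind: str
) -> Tuple[bool, Optional[str]]:
    """Return (ok, error message) for a requested device-affinity list."""
    if not requested_indices:
        # no constraint requested, nothing to check
        return True, None
    if max(requested_indices) >= max(devices_per_node):
        return False, f"Cannot satisfy request, not enough {kind}s available"
    return True, None


# A node with 8 GPUs can honor affinity [0, 1] but not [0, 8],
# because index 8 would require a ninth device.
print(can_honor_affinity([0, 1], [8, 4], "GPU"))  # (True, None)
print(can_honor_affinity([0, 8], [8, 4], "GPU"))
```

Note that the check only proves some node is large enough; actual placement on that node is handled separately by the round-robin scheduler.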

smartsim/_core/launcher/dragon/dragonLauncher.py

Lines changed: 6 additions & 0 deletions

@@ -29,6 +29,8 @@
 import os
 import typing as t
 
+from smartsim._core.schemas.dragonRequests import DragonRunPolicy
+
 from ...._core.launcher.stepMapping import StepMap
 from ....error import LauncherError, SmartSimError
 from ....log import get_logger
@@ -168,6 +170,9 @@ def run(self, step: Step) -> t.Optional[str]:
         merged_env = self._connector.merge_persisted_env(os.environ.copy())
         nodes = int(run_args.get("nodes", None) or 1)
         tasks_per_node = int(run_args.get("tasks-per-node", None) or 1)
+
+        policy = DragonRunPolicy.from_run_args(run_args)
+
         response = _assert_schema_type(
             self._connector.send_request(
                 DragonRunRequest(
@@ -181,6 +186,7 @@ def run(self, step: Step) -> t.Optional[str]:
                     current_env=merged_env,
                     output_file=out,
                     error_file=err,
+                    policy=policy,
                 )
             ),
             DragonRunResponse,

smartsim/_core/launcher/step/dragonStep.py

Lines changed: 9 additions & 1 deletion

@@ -30,7 +30,11 @@
 import sys
 import typing as t
 
-from ...._core.schemas.dragonRequests import DragonRunRequest, request_registry
+from ...._core.schemas.dragonRequests import (
+    DragonRunPolicy,
+    DragonRunRequest,
+    request_registry,
+)
 from ....error.errors import SSUnsupportedError
 from ....log import get_logger
 from ....settings import (
@@ -166,8 +170,11 @@ def _write_request_file(self) -> str:
             nodes = int(run_args.get("nodes", None) or 1)
             tasks_per_node = int(run_args.get("tasks-per-node", None) or 1)
 
+            policy = DragonRunPolicy.from_run_args(run_args)
+
             cmd = step.get_launch_cmd()
             out, err = step.get_output_files()
+
             request = DragonRunRequest(
                 exe=cmd[0],
                 exe_args=cmd[1:],
@@ -179,6 +186,7 @@ def _write_request_file(self) -> str:
                 current_env=os.environ,
                 output_file=out,
                 error_file=err,
+                policy=policy,
             )
             requests.append(request_registry.to_string(request))
         with open(request_file, "w", encoding="utf-8") as script_file:

smartsim/_core/launcher/step/step.py

Lines changed: 2 additions & 1 deletion

@@ -26,6 +26,7 @@
 
 from __future__ import annotations
 
+import copy
 import functools
 import os.path as osp
 import pathlib
@@ -51,7 +52,7 @@ def __init__(self, name: str, cwd: str, step_settings: SettingsBase) -> None:
         self.entity_name = name
         self.cwd = cwd
         self.managed = False
-        self.step_settings = step_settings
+        self.step_settings = copy.deepcopy(step_settings)
         self.meta: t.Dict[str, str] = {}
 
     @property
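The `step.py` change above swaps a plain reference for `copy.deepcopy`. The reason is aliasing: without the copy, a caller who reuses and mutates one settings object after creating a step would silently mutate the step as well. A minimal illustration, using hypothetical `Settings`/`Step` stand-ins rather than SmartSim's real classes:

```python
# Why Step deep-copies its settings: deepcopy snapshots the settings at
# step-creation time, so later mutation by the caller cannot leak in.
import copy


class Settings:
    def __init__(self) -> None:
        self.run_args: dict = {}


class Step:
    def __init__(self, settings: Settings) -> None:
        # without deepcopy, self.step_settings would alias the caller's object
        self.step_settings = copy.deepcopy(settings)


settings = Settings()
step = Step(settings)

# the caller reuses the same settings object for a second step
settings.run_args["cpu-affinity"] = "0,1"

print(step.step_settings.run_args)  # {} -- the first step kept its snapshot
```

This matters for the pinning feature specifically: affinity lists now live inside run settings, so two steps built from one reused settings object must not share a single mutable policy.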

smartsim/_core/schemas/dragonRequests.py

Lines changed: 40 additions & 1 deletion

@@ -26,9 +26,10 @@
 
 import typing as t
 
-from pydantic import BaseModel, Field, PositiveInt
+from pydantic import BaseModel, Field, NonNegativeInt, PositiveInt, ValidationError
 
 import smartsim._core.schemas.utils as _utils
+from smartsim.error.errors import SmartSimError
 
 # Black and Pylint disagree about where to put the `...`
 # pylint: disable=multiple-statements
@@ -39,6 +40,43 @@
 class DragonRequest(BaseModel): ...
 
 
+class DragonRunPolicy(BaseModel):
+    """Policy specifying hardware constraints when running a Dragon job"""
+
+    cpu_affinity: t.List[NonNegativeInt] = Field(default_factory=list)
+    """List of CPU indices to which the job should be pinned"""
+    gpu_affinity: t.List[NonNegativeInt] = Field(default_factory=list)
+    """List of GPU indices to which the job should be pinned"""
+
+    @staticmethod
+    def from_run_args(
+        run_args: t.Dict[str, t.Union[int, str, float, None]]
+    ) -> "DragonRunPolicy":
+        """Create a DragonRunPolicy with hardware constraints passed from
+        a dictionary of run arguments
+
+        :param run_args: Dictionary of run arguments
+        :returns: DragonRunPolicy instance created from the run arguments"""
+        gpu_args = ""
+        if gpu_arg_value := run_args.get("gpu-affinity", None):
+            gpu_args = str(gpu_arg_value)
+
+        cpu_args = ""
+        if cpu_arg_value := run_args.get("cpu-affinity", None):
+            cpu_args = str(cpu_arg_value)
+
+        # run args converted to a string must be split back into a list[int]
+        gpu_affinity = [int(x.strip()) for x in gpu_args.split(",") if x]
+        cpu_affinity = [int(x.strip()) for x in cpu_args.split(",") if x]
+
+        try:
+            return DragonRunPolicy(
+                cpu_affinity=cpu_affinity,
+                gpu_affinity=gpu_affinity,
+            )
+        except ValidationError as ex:
+            raise SmartSimError("Unable to build DragonRunPolicy") from ex
+
+
 class DragonRunRequestView(DragonRequest):
     exe: t.Annotated[str, Field(min_length=1)]
     exe_args: t.List[t.Annotated[str, Field(min_length=1)]] = []
@@ -57,6 +95,7 @@ class DragonRunRequestView(DragonRequest):
 @request_registry.register("run")
 class DragonRunRequest(DragonRunRequestView):
     current_env: t.Dict[str, t.Optional[str]] = {}
+    policy: t.Optional[DragonRunPolicy] = None
 
     def __str__(self) -> str:
         return str(DragonRunRequestView.parse_obj(self.dict(exclude={"current_env"})))
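`DragonRunPolicy.from_run_args` above has to undo a serialization detail: affinity lists travel through the run-args dictionary as comma-separated strings (e.g. `"0,1,2,3"`) and must be split back into lists of ints. The sketch below isolates just that parsing step with a hypothetical helper name, without the pydantic validation layer:

```python
# Sketch of the string-to-list parsing inside DragonRunPolicy.from_run_args:
# a missing/empty run arg yields [], otherwise each comma-separated segment
# is stripped and converted to int.
from typing import Dict, List, Union


def parse_affinity(
    run_args: Dict[str, Union[int, str, float, None]], key: str
) -> List[int]:
    raw = run_args.get(key, None)
    if not raw:
        return []
    # "0, 1, 2" -> [0, 1, 2]; empty segments produced by split are skipped
    return [int(x.strip()) for x in str(raw).split(",") if x]


args = {"cpu-affinity": "0,1,2,3", "gpu-affinity": "0, 1"}
print(parse_affinity(args, "cpu-affinity"))  # [0, 1, 2, 3]
print(parse_affinity(args, "gpu-affinity"))  # [0, 1]
print(parse_affinity(args, "nodes"))         # []
```

In the real class, the parsed lists are then passed through pydantic's `NonNegativeInt` validation, so a negative index raises a `ValidationError` that is re-raised as a `SmartSimError`.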
