
Commit 0030a4a

Revert "Add ability to specify hardware policies on dragon run requests" (#637)
Reverts #631
1 parent 5fac3e2 commit 0030a4a

File tree

16 files changed: +25 −1826 lines


doc/changelog.md

Lines changed: 0 additions & 1 deletion
@@ -13,7 +13,6 @@ Jump to:
 
 Description
 
-- Add hardware pinning capability when using dragon
 - Add TorchWorker first implementation and mock inference app example
 - Add EnvironmentConfigLoader for ML Worker Manager
 - Add Model schema with model metadata included

doc/dragon.rst

Lines changed: 0 additions & 28 deletions
@@ -65,34 +65,6 @@ In the next sections, we detail how Dragon is integrated into SmartSim.
 
 For more information on HPC launchers, visit the :ref:`Run Settings<run_settings_hpc_ex>` page.
 
-Hardware Pinning
-================
-
-Dragon also enables users to specify hardware constraints using ``DragonRunSettings``. CPU
-and GPU affinity can be specified using the ``DragonRunSettings`` object. The following
-example demonstrates how to specify CPU affinity and GPU affinities simultaneously. Note
-that affinities are passed as a list of device indices.
-
-.. code-block:: python
-
-    # Because "dragon" was specified as the launcher during Experiment initialization,
-    # create_run_settings will return a DragonRunSettings object
-    rs = exp.create_run_settings(exe="mpi_app",
-                                 exe_args=["--option", "value"],
-                                 env_vars={"MYVAR": "VALUE"})
-
-    # Request the first 8 CPUs for this job
-    rs.set_cpu_affinity(list(range(9)))
-
-    # Request the first two GPUs on the node for this job
-    rs.set_gpu_affinity([0, 1])
-
-.. note::
-
-    SmartSim launches jobs in the order they are received on the first available
-    host in a round-robin pattern. To ensure a process is launched on a node with
-    specific features, configure a hostname constraint.
-
 =================
 The Dragon Server
 =================
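The removed note describes first-available, round-robin job placement. As a rough illustration of that scheduling idea (a toy sketch with made-up host names, not SmartSim code; the real backend tracks free hosts in a `collections.deque`, as seen in `dragonBackend.py` in this commit):

```python
from collections import deque

# Hypothetical host pool; the real backend builds this from the Dragon allocation.
free_hosts = deque(["node001", "node002", "node003"])

def allocate(num_hosts: int) -> list:
    """Take the first available hosts in FIFO order (round-robin placement sketch)."""
    if num_hosts > len(free_hosts):
        raise RuntimeError("not enough free hosts in the allocation")
    return [free_hosts.popleft() for _ in range(num_hosts)]

def release(hosts: list) -> None:
    """Return hosts to the back of the queue once a step completes."""
    free_hosts.extend(hosts)
```

Because released hosts rejoin the back of the queue, successive jobs cycle through the allocation rather than reusing the same node.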

doc/tutorials/online_analysis/lattice/online_analysis.ipynb

Lines changed: 0 additions & 6 deletions
@@ -378,7 +378,6 @@
   },
   {
    "cell_type": "code",
-   "id": "6f3ed63d-e324-443d-9b68-b2cf618d31c7",
    "execution_count": 7,
    "metadata": {},
    "outputs": [
@@ -400,15 +399,13 @@
   },
   {
    "cell_type": "markdown",
-   "id": "96c154fe-5ca8-4d89-91f8-8fd4e75cb80e",
    "metadata": {},
    "source": [
     "We then apply the function `probe_points` to the `ux` and `uy` tensors computed in the last time step of the previous simulation. Note that all tensors are already on the DB, thus we can reference them by name. Finally, we download and plot the output (a 2D velocity field), which is stored as `probe_u` on the DB."
    ]
   },
   {
    "cell_type": "code",
-   "id": "36e3b415-dcc1-4d25-9cce-52388146a4bb",
    "execution_count": 8,
    "metadata": {},
    "outputs": [
@@ -435,7 +432,6 @@
   },
   {
    "cell_type": "markdown",
-   "id": "9d7e4966-a0de-480c-9556-936197a5a5d2",
    "metadata": {},
    "source": [
     "### Uploading a function inline\n",
@@ -457,7 +453,6 @@
   },
   {
    "cell_type": "markdown",
-   "id": "1c4daf43-34d0-482a-b9b5-b3b6f1e173c4",
    "metadata": {},
    "source": [
     "We then store the function on the DB under the key `norm_function`."
@@ -475,7 +470,6 @@
   },
   {
    "cell_type": "markdown",
-   "id": "19409ac6-e118-44db-a847-2d905fdf0331",
    "metadata": {},
    "source": [
     "Note that the key we used identifies a functional unit containing the function itself: this is similar to the key used to store the `probe` script above. When we want to run the function, we just call it with `run_script`, by indicating the `script` key as `\"norm_function\"` and the name of the function itself as `\"compute_norm\"`."

smartsim/_core/launcher/dragon/dragonBackend.py

Lines changed: 6 additions & 79 deletions
@@ -214,12 +214,9 @@ def group_infos(self) -> dict[str, ProcessGroupInfo]:
     def _initialize_hosts(self) -> None:
         with self._queue_lock:
             self._hosts: t.List[str] = sorted(
-                node for node in dragon_machine.System().nodes
+                dragon_machine.Node(node).hostname
+                for node in dragon_machine.System().nodes
             )
-            self._nodes = [dragon_machine.Node(node) for node in self._hosts]
-            self._cpus = [node.num_cpus for node in self._nodes]
-            self._gpus = [node.num_gpus for node in self._nodes]
-
             """List of hosts available in allocation"""
             self._free_hosts: t.Deque[str] = collections.deque(self._hosts)
             """List of hosts on which steps can be launched"""
@@ -291,34 +288,6 @@ def current_time(self) -> float:
         """Current time for DragonBackend object, in seconds since the Epoch"""
         return time.time()
 
-    def _can_honor_policy(
-        self, request: DragonRunRequest
-    ) -> t.Tuple[bool, t.Optional[str]]:
-        """Check if the policy can be honored with resources available
-        in the allocation.
-        :param request: DragonRunRequest containing policy information
-        :returns: Tuple indicating if the policy can be honored and
-        an optional error message"""
-        # ensure the policy can be honored
-        if request.policy:
-            if request.policy.cpu_affinity:
-                # make sure some node has enough CPUs
-                available = max(self._cpus)
-                requested = max(request.policy.cpu_affinity)
-
-                if requested >= available:
-                    return False, "Cannot satisfy request, not enough CPUs available"
-
-            if request.policy.gpu_affinity:
-                # make sure some node has enough GPUs
-                available = max(self._gpus)
-                requested = max(request.policy.gpu_affinity)
-
-                if requested >= available:
-                    return False, "Cannot satisfy request, not enough GPUs available"
-
-        return True, None
-
     def _can_honor(self, request: DragonRunRequest) -> t.Tuple[bool, t.Optional[str]]:
         """Check if request can be honored with resources available in the allocation.
@@ -333,11 +302,6 @@ def _can_honor(self, request: DragonRunRequest) -> t.Tuple[bool, t.Optional[str]
         if self._shutdown_requested:
             message = "Cannot satisfy request, server is shutting down."
             return False, message
-
-        honorable, err = self._can_honor_policy(request)
-        if not honorable:
-            return False, err
-
         return True, None
 
     def _allocate_step(
@@ -446,46 +410,6 @@ def infra_ddict(self) -> str:
 
         return str(self._infra_ddict.serialize())
 
-    @staticmethod
-    def create_run_policy(
-        request: DragonRequest, node_name: str
-    ) -> "dragon_policy.Policy":
-        """Create a dragon Policy from the request and node name
-        :param request: DragonRunRequest containing policy information
-        :param node_name: Name of the node on which the process will run
-        :returns: dragon_policy.Policy object mapped from request properties"""
-        if isinstance(request, DragonRunRequest):
-            run_request: DragonRunRequest = request
-
-            affinity = dragon_policy.Policy.Affinity.DEFAULT
-            cpu_affinity: t.List[int] = []
-            gpu_affinity: t.List[int] = []
-
-            # Customize policy only if the client requested it, otherwise use default
-            if run_request.policy is not None:
-                # Affinities are not mutually exclusive. If specified, both are used
-                if run_request.policy.cpu_affinity:
-                    affinity = dragon_policy.Policy.Affinity.SPECIFIC
-                    cpu_affinity = run_request.policy.cpu_affinity
-
-                if run_request.policy.gpu_affinity:
-                    affinity = dragon_policy.Policy.Affinity.SPECIFIC
-                    gpu_affinity = run_request.policy.gpu_affinity
-
-            if affinity != dragon_policy.Policy.Affinity.DEFAULT:
-                return dragon_policy.Policy(
-                    placement=dragon_policy.Policy.Placement.HOST_NAME,
-                    host_name=node_name,
-                    affinity=affinity,
-                    cpu_affinity=cpu_affinity,
-                    gpu_affinity=gpu_affinity,
-                )
-
-        return dragon_policy.Policy(
-            placement=dragon_policy.Policy.Placement.HOST_NAME,
-            host_name=node_name,
-        )
-
     def _start_steps(self) -> None:
         self._heartbeat()
         with self._queue_lock:
@@ -508,7 +432,10 @@ def _start_steps(self) -> None:
 
                 policies = []
                 for node_name in hosts:
-                    local_policy = self.create_run_policy(request, node_name)
+                    local_policy = dragon_policy.Policy(
+                        placement=dragon_policy.Policy.Placement.HOST_NAME,
+                        host_name=node_name,
+                    )
                     policies.extend([local_policy] * request.tasks_per_node)
                 tmp_proc = dragon_process.ProcessTemplate(
                     target=request.exe,
smartsim/_core/launcher/dragon/dragonLauncher.py

Lines changed: 0 additions & 6 deletions
@@ -29,8 +29,6 @@
 import os
 import typing as t
 
-from smartsim._core.schemas.dragonRequests import DragonRunPolicy
-
 from ...._core.launcher.stepMapping import StepMap
 from ....error import LauncherError, SmartSimError
 from ....log import get_logger
@@ -170,9 +168,6 @@ def run(self, step: Step) -> t.Optional[str]:
         merged_env = self._connector.merge_persisted_env(os.environ.copy())
         nodes = int(run_args.get("nodes", None) or 1)
         tasks_per_node = int(run_args.get("tasks-per-node", None) or 1)
-
-        policy = DragonRunPolicy.from_run_args(run_args)
-
         response = _assert_schema_type(
             self._connector.send_request(
                 DragonRunRequest(
@@ -186,7 +181,6 @@ def run(self, step: Step) -> t.Optional[str]:
                     current_env=merged_env,
                     output_file=out,
                     error_file=err,
-                    policy=policy,
                 )
             ),
             DragonRunResponse,

smartsim/_core/launcher/step/dragonStep.py

Lines changed: 1 addition & 9 deletions
@@ -30,11 +30,7 @@
 import sys
 import typing as t
 
-from ...._core.schemas.dragonRequests import (
-    DragonRunPolicy,
-    DragonRunRequest,
-    request_registry,
-)
+from ...._core.schemas.dragonRequests import DragonRunRequest, request_registry
 from ....error.errors import SSUnsupportedError
 from ....log import get_logger
 from ....settings import (
@@ -170,11 +166,8 @@ def _write_request_file(self) -> str:
             nodes = int(run_args.get("nodes", None) or 1)
             tasks_per_node = int(run_args.get("tasks-per-node", None) or 1)
 
-            policy = DragonRunPolicy.from_run_args(run_args)
-
             cmd = step.get_launch_cmd()
             out, err = step.get_output_files()
-
             request = DragonRunRequest(
                 exe=cmd[0],
                 exe_args=cmd[1:],
@@ -186,7 +179,6 @@ def _write_request_file(self) -> str:
                 current_env=os.environ,
                 output_file=out,
                 error_file=err,
-                policy=policy,
             )
             requests.append(request_registry.to_string(request))
         with open(request_file, "w", encoding="utf-8") as script_file:

smartsim/_core/launcher/step/step.py

Lines changed: 1 addition & 2 deletions
@@ -26,7 +26,6 @@
 
 from __future__ import annotations
 
-import copy
 import functools
 import os.path as osp
 import pathlib
@@ -52,7 +51,7 @@ def __init__(self, name: str, cwd: str, step_settings: SettingsBase) -> None:
         self.entity_name = name
         self.cwd = cwd
         self.managed = False
-        self.step_settings = copy.deepcopy(step_settings)
+        self.step_settings = step_settings
         self.meta: t.Dict[str, str] = {}
 
     @property
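The `step.py` change above swaps a deep copy of the settings for a shared reference, so a `Step` now observes any mutation made to its settings after construction. A minimal illustration of that behavioral difference (generic Python, with a hypothetical `Settings` stand-in for `SettingsBase`):

```python
import copy

class Settings:
    """Stand-in for a run-settings object with mutable state."""
    def __init__(self) -> None:
        self.env = {"MYVAR": "old"}

settings = Settings()

shared = settings                   # new behavior: the Step keeps a reference
snapshot = copy.deepcopy(settings)  # old behavior: the Step kept an isolated copy

# Mutating the original after construction...
settings.env["MYVAR"] = "new"

# ...is visible through the shared reference but not through the deep copy.
```

Sharing the reference avoids the cost (and the copyability requirement) of deep-copying settings objects, at the price of aliasing.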

smartsim/_core/schemas/dragonRequests.py

Lines changed: 1 addition & 40 deletions
@@ -26,10 +26,9 @@
 
 import typing as t
 
-from pydantic import BaseModel, Field, NonNegativeInt, PositiveInt, ValidationError
+from pydantic import BaseModel, Field, PositiveInt
 
 import smartsim._core.schemas.utils as _utils
-from smartsim.error.errors import SmartSimError
 
 # Black and Pylint disagree about where to put the `...`
 # pylint: disable=multiple-statements
@@ -40,43 +39,6 @@
 class DragonRequest(BaseModel): ...
 
 
-class DragonRunPolicy(BaseModel):
-    """Policy specifying hardware constraints when running a Dragon job"""
-
-    cpu_affinity: t.List[NonNegativeInt] = Field(default_factory=list)
-    """List of CPU indices to which the job should be pinned"""
-    gpu_affinity: t.List[NonNegativeInt] = Field(default_factory=list)
-    """List of GPU indices to which the job should be pinned"""
-
-    @staticmethod
-    def from_run_args(
-        run_args: t.Dict[str, t.Union[int, str, float, None]]
-    ) -> "DragonRunPolicy":
-        """Create a DragonRunPolicy with hardware constraints passed from
-        a dictionary of run arguments
-        :param run_args: Dictionary of run arguments
-        :returns: DragonRunPolicy instance created from the run arguments"""
-        gpu_args = ""
-        if gpu_arg_value := run_args.get("gpu-affinity", None):
-            gpu_args = str(gpu_arg_value)
-
-        cpu_args = ""
-        if cpu_arg_value := run_args.get("cpu-affinity", None):
-            cpu_args = str(cpu_arg_value)
-
-        # run args converted to a string must be split back into a list[int]
-        gpu_affinity = [int(x.strip()) for x in gpu_args.split(",") if x]
-        cpu_affinity = [int(x.strip()) for x in cpu_args.split(",") if x]
-
-        try:
-            return DragonRunPolicy(
-                cpu_affinity=cpu_affinity,
-                gpu_affinity=gpu_affinity,
-            )
-        except ValidationError as ex:
-            raise SmartSimError("Unable to build DragonRunPolicy") from ex
-
-
 class DragonRunRequestView(DragonRequest):
     exe: t.Annotated[str, Field(min_length=1)]
     exe_args: t.List[t.Annotated[str, Field(min_length=1)]] = []
@@ -95,7 +57,6 @@ class DragonRunRequestView(DragonRequest):
 @request_registry.register("run")
 class DragonRunRequest(DragonRunRequestView):
     current_env: t.Dict[str, t.Optional[str]] = {}
-    policy: t.Optional[DragonRunPolicy] = None
 
     def __str__(self) -> str:
         return str(DragonRunRequestView.parse_obj(self.dict(exclude={"current_env"})))
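The reverted `DragonRunPolicy.from_run_args` turned comma-separated run arguments like `"0,1,2"` into lists of device indices, rejecting negatives via `NonNegativeInt` validation. The parsing step can be sketched without pydantic (a simplified stand-in; `parse_affinity` is a hypothetical name, not SmartSim API):

```python
import typing as t

def parse_affinity(
    run_args: t.Dict[str, t.Union[int, str, float, None]],
    key: str,
) -> t.List[int]:
    """Parse a comma-separated affinity argument (e.g. "0,1,2") into indices."""
    raw = run_args.get(key, None)
    if raw is None:
        return []
    # Run args arrive as strings; split back into a list of ints.
    indices = [int(x.strip()) for x in str(raw).split(",") if x.strip()]
    # Mirror the NonNegativeInt constraint from the removed pydantic model.
    if any(i < 0 for i in indices):
        raise ValueError(f"{key} indices must be non-negative")
    return indices
```

In the removed code this validation lived in the pydantic model, with a `ValidationError` re-raised as a `SmartSimError`; here it is inlined for clarity.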
