Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions smartsim/_core/launcher/dragon/dragonBackend.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ def __init__(self, pid: int) -> None:
self._step_ids = (f"{create_short_id_str()}-{id}" for id in itertools.count())
"""Incremental ID to assign to new steps prior to execution"""

self._initialize_hosts()
self._queued_steps: "collections.OrderedDict[str, DragonRunRequest]" = (
collections.OrderedDict()
)
Expand Down Expand Up @@ -188,11 +187,7 @@ def __init__(self, pid: int) -> None:
else 5
)
"""Time in seconds needed to server to complete shutdown"""

self._view = DragonBackendView(self)
logger.debug(self._view.host_desc)
self._infra_ddict: t.Optional[dragon_ddict.DDict] = None
self._prioritizer = NodePrioritizer(self._nodes, self._queue_lock)

self._nodes: t.List["dragon_machine.Node"] = []
"""Node capability information for hosts in the allocation"""
Expand All @@ -205,6 +200,11 @@ def __init__(self, pid: int) -> None:
self._allocated_hosts: t.Dict[str, t.Set[str]] = {}
"""Mapping with hostnames as keys and a set of running step IDs as the value"""

self._initialize_hosts()
self._view = DragonBackendView(self)
logger.debug(self._view.host_desc)
self._prioritizer = NodePrioritizer(self._nodes, self._queue_lock)

@property
def hosts(self) -> list[str]:
with self._queue_lock:
Expand Down
6 changes: 0 additions & 6 deletions smartsim/_core/launcher/dragon/pqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,6 @@ def remove(
:param tracking_id: a unique task identifier executing on the node
to remove
:raises ValueError: if tracking_id is already assigned to this node"""
if tracking_id and tracking_id not in self.assigned_tasks:
raise ValueError("Attempted removal of untracked item")

self._num_refs = max(self._num_refs - 1, 0)
if tracking_id:
self._assigned_tasks = self._assigned_tasks - {tracking_id}
Expand Down Expand Up @@ -460,8 +457,5 @@ def next_n(
:param hosts: a list of hostnames used to filter the available nodes
:returns: Collection of reserved nodes
:raises ValueError: if the hosts parameter is an empty list"""
if hosts is not None and not hosts:
raise ValueError("No hostnames provided")

heap = self._create_sub_heap(hosts, filter_on)
return self._get_next_n_available_nodes(num_items, heap, tracking_id)
49 changes: 40 additions & 9 deletions tests/test_dragon_run_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ def set_mock_group_infos(
}

monkeypatch.setattr(dragon_backend, "_group_infos", group_infos)
monkeypatch.setattr(dragon_backend, "_allocated_hosts", {hosts[0]: "abc123-1"})
monkeypatch.setattr(dragon_backend, "_allocated_hosts", {hosts[0]: {"abc123-1"}})
monkeypatch.setattr(dragon_backend, "_running_steps", ["abc123-1"])

return group_infos
Expand Down Expand Up @@ -221,8 +221,8 @@ def test_run_request(monkeypatch: pytest.MonkeyPatch) -> None:
assert dragon_backend._running_steps == [step_id]
assert len(dragon_backend._queued_steps) == 0
assert len(dragon_backend.free_hosts) == 1
assert dragon_backend._allocated_hosts[dragon_backend.hosts[0]] == step_id
assert dragon_backend._allocated_hosts[dragon_backend.hosts[1]] == step_id
assert step_id in dragon_backend._allocated_hosts[dragon_backend.hosts[0]]
assert step_id in dragon_backend._allocated_hosts[dragon_backend.hosts[1]]

monkeypatch.setattr(
dragon_backend._group_infos[step_id].process_group, "status", "Running"
Expand All @@ -233,8 +233,8 @@ def test_run_request(monkeypatch: pytest.MonkeyPatch) -> None:
assert dragon_backend._running_steps == [step_id]
assert len(dragon_backend._queued_steps) == 0
assert len(dragon_backend.free_hosts) == 1
assert dragon_backend._allocated_hosts[dragon_backend.hosts[0]] == step_id
assert dragon_backend._allocated_hosts[dragon_backend.hosts[1]] == step_id
assert step_id in dragon_backend._allocated_hosts[dragon_backend.hosts[0]]
assert step_id in dragon_backend._allocated_hosts[dragon_backend.hosts[1]]

dragon_backend._group_infos[step_id].status = SmartSimStatus.STATUS_CANCELLED

Expand Down Expand Up @@ -316,8 +316,8 @@ def test_run_request_with_policy(monkeypatch: pytest.MonkeyPatch) -> None:
assert dragon_backend._running_steps == [step_id]
assert len(dragon_backend._queued_steps) == 0
assert len(dragon_backend._prioritizer.unassigned()) == 1
assert dragon_backend._allocated_hosts[dragon_backend.hosts[0]] == step_id
assert dragon_backend._allocated_hosts[dragon_backend.hosts[1]] == step_id
assert step_id in dragon_backend._allocated_hosts[dragon_backend.hosts[0]]
assert step_id in dragon_backend._allocated_hosts[dragon_backend.hosts[1]]

monkeypatch.setattr(
dragon_backend._group_infos[step_id].process_group, "status", "Running"
Expand All @@ -328,8 +328,8 @@ def test_run_request_with_policy(monkeypatch: pytest.MonkeyPatch) -> None:
assert dragon_backend._running_steps == [step_id]
assert len(dragon_backend._queued_steps) == 0
assert len(dragon_backend._prioritizer.unassigned()) == 1
assert dragon_backend._allocated_hosts[dragon_backend.hosts[0]] == step_id
assert dragon_backend._allocated_hosts[dragon_backend.hosts[1]] == step_id
assert step_id in dragon_backend._allocated_hosts[dragon_backend.hosts[0]]
assert step_id in dragon_backend._allocated_hosts[dragon_backend.hosts[1]]

dragon_backend._group_infos[step_id].status = SmartSimStatus.STATUS_CANCELLED

Expand Down Expand Up @@ -728,3 +728,34 @@ def test_can_honor_hosts_unavailable_hosts_ok(monkeypatch: pytest.MonkeyPatch) -
assert can_honor, error_msg
# confirm failure message indicates number of nodes requested as cause
assert error_msg is None, error_msg


def test_can_honor_hosts_1_hosts_requested(monkeypatch: pytest.MonkeyPatch) -> None:
    """Verify that requesting nodes with an invalid hostname reduces the pool of
    available nodes, but the request is still honored when enough validly named
    nodes remain in the hostlist."""
    dragon_backend = get_mock_backend(monkeypatch, num_cpus=8, num_gpus=0)

    # supply 2 valid and 1 invalid hostname by corrupting the first entry
    actual_hosts = list(dragon_backend._hosts)
    actual_hosts[0] = f"x{actual_hosts[0]}"

    host_list = ",".join(actual_hosts)

    run_req = DragonRunRequest(
        exe="sleep",
        exe_args=["5"],
        path="/a/fake/path",
        nodes=1,  # <----- requesting 1 node; satisfiable by the valid hosts
        hostlist=host_list,  # <--- 2 valid names remain after the bad one is dropped
        tasks=1,
        tasks_per_node=1,
        env={},
        current_env={},
        pmi_enabled=False,
        policy=DragonRunPolicy(),
    )

    can_honor, error_msg = dragon_backend._can_honor(run_req)

    # confirm the request succeeds despite the single invalid hostname
    assert can_honor, error_msg
18 changes: 8 additions & 10 deletions tests/test_node_prioritizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,9 +457,9 @@ def test_node_prioritizer_multi_increment_subheap_assigned() -> None:
assert len(all_tracking_info) == 0


def test_node_prioritizer_empty_subheap_next_w_hosts() -> None:
def test_node_prioritizer_empty_subheap_next_w_no_hosts() -> None:
"""Verify that retrieving multiple nodes via `next_n` API does
not allow an empty host list"""
with an empty host list uses the entire available host list"""

num_cpu_nodes, num_gpu_nodes = 8, 0
cpu_hosts, gpu_hosts = mock_node_hosts(num_cpu_nodes, num_gpu_nodes)
Expand All @@ -476,15 +476,15 @@ def test_node_prioritizer_empty_subheap_next_w_hosts() -> None:

# request n == {num_requested} nodes from set of 3 available
num_requested = 1
with pytest.raises(ValueError) as ex:
p.next(hosts=hostnames)
node = p.next(hosts=hostnames)
assert node

assert "No hostnames provided" == ex.value.args[0]
# assert "No hostnames provided" == ex.value.args[0]


def test_node_prioritizer_empty_subheap_next_n_w_hosts() -> None:
"""Verify that retrieving multiple nodes via `next_n` API does
not allow an empty host list"""
not blow up with an empty host list"""

num_cpu_nodes, num_gpu_nodes = 8, 0
cpu_hosts, gpu_hosts = mock_node_hosts(num_cpu_nodes, num_gpu_nodes)
Expand All @@ -501,10 +501,8 @@ def test_node_prioritizer_empty_subheap_next_n_w_hosts() -> None:

# request n == {num_requested} nodes from set of 3 available
num_requested = 1
with pytest.raises(ValueError) as ex:
p.next_n(num_requested, hosts=hostnames)

assert "No hostnames provided" == ex.value.args[0]
node = p.next_n(num_requested, hosts=hostnames)
assert node is not None


@pytest.mark.parametrize("num_requested", [-100, -1, 0])
Expand Down