diff --git a/doc/changelog.md b/doc/changelog.md index ee41fabf88..f4adf1c091 100644 --- a/doc/changelog.md +++ b/doc/changelog.md @@ -28,6 +28,7 @@ To be released at some future point in time Description +- Add hardware pinning capability when using dragon - Pin NumPy version to 1.x - New launcher support for SGE (and similar derivatives) - Fix test outputs being created in incorrect directory diff --git a/doc/dragon.rst b/doc/dragon.rst index 0bf6a8ea3c..e19b40e4b7 100644 --- a/doc/dragon.rst +++ b/doc/dragon.rst @@ -65,6 +65,34 @@ In the next sections, we detail how Dragon is integrated into SmartSim. For more information on HPC launchers, visit the :ref:`Run Settings` page. +Hardware Pinning +================ + +Dragon also enables users to specify hardware constraints using ``DragonRunSettings``. CPU +and GPU affinity can be specified using the ``DragonRunSettings`` object. The following +example demonstrates how to specify CPU and GPU affinities simultaneously. Note +that affinities are passed as a list of device indices. + +.. code-block:: python + + # Because "dragon" was specified as the launcher during Experiment initialization, + # create_run_settings will return a DragonRunSettings object + rs = exp.create_run_settings(exe="mpi_app", + exe_args=["--option", "value"], + env_vars={"MYVAR": "VALUE"}) + + # Request the first 8 CPUs for this job + rs.set_cpu_affinity(list(range(8))) + + # Request the first two GPUs on the node for this job + rs.set_gpu_affinity([0, 1]) + +.. note:: + + SmartSim launches jobs in the order they are received on the first available + host in a round-robin pattern. To ensure a process is launched on a node with + specific features, configure a hostname constraint. 
+ ================= The Dragon Server ================= diff --git a/doc/tutorials/online_analysis/lattice/online_analysis.ipynb b/doc/tutorials/online_analysis/lattice/online_analysis.ipynb index 412b63dd01..c5f58fa97b 100644 --- a/doc/tutorials/online_analysis/lattice/online_analysis.ipynb +++ b/doc/tutorials/online_analysis/lattice/online_analysis.ipynb @@ -378,6 +378,7 @@ }, { "cell_type": "code", + "id": "6f3ed63d-e324-443d-9b68-b2cf618d31c7", "execution_count": 7, "metadata": {}, "outputs": [ @@ -399,6 +400,7 @@ }, { "cell_type": "markdown", + "id": "96c154fe-5ca8-4d89-91f8-8fd4e75cb80e", "metadata": {}, "source": [ "We then apply the function `probe_points` to the `ux` and `uy` tensors computed in the last time step of the previous simulation. Note that all tensors are already on the DB, thus we can reference them by name. Finally, we download and plot the output (a 2D velocity field), which is stored as `probe_u` on the DB." @@ -406,6 +408,7 @@ }, { "cell_type": "code", + "id": "36e3b415-dcc1-4d25-9cce-52388146a4bb", "execution_count": 8, "metadata": {}, "outputs": [ @@ -432,6 +435,7 @@ }, { "cell_type": "markdown", + "id": "9d7e4966-a0de-480c-9556-936197a5a5d2", "metadata": {}, "source": [ "### Uploading a function inline\n", @@ -453,6 +457,7 @@ }, { "cell_type": "markdown", + "id": "1c4daf43-34d0-482a-b9b5-b3b6f1e173c4", "metadata": {}, "source": [ "We then store the function on the DB under the key `norm_function`." @@ -470,6 +475,7 @@ }, { "cell_type": "markdown", + "id": "19409ac6-e118-44db-a847-2d905fdf0331", "metadata": {}, "source": [ "Note that the key we used identifies a functional unit containing the function itself: this is similar to the key used to store the `probe` script above. When we want to run the function, we just call it with `run_script`, by indicating the `script` key as `\"norm_function\"` and the name of the function itself as `\"compute_norm\"`." 
diff --git a/smartsim/_core/launcher/dragon/dragonBackend.py b/smartsim/_core/launcher/dragon/dragonBackend.py index dcc5c8392b..2938746361 100644 --- a/smartsim/_core/launcher/dragon/dragonBackend.py +++ b/smartsim/_core/launcher/dragon/dragonBackend.py @@ -214,9 +214,12 @@ def group_infos(self) -> dict[str, ProcessGroupInfo]: def _initialize_hosts(self) -> None: with self._queue_lock: self._hosts: t.List[str] = sorted( - dragon_machine.Node(node).hostname - for node in dragon_machine.System().nodes + node for node in dragon_machine.System().nodes ) + self._nodes = [dragon_machine.Node(node) for node in self._hosts] + self._cpus = [node.num_cpus for node in self._nodes] + self._gpus = [node.num_gpus for node in self._nodes] + """List of hosts available in allocation""" self._free_hosts: t.Deque[str] = collections.deque(self._hosts) """List of hosts on which steps can be launched""" @@ -288,6 +291,34 @@ def current_time(self) -> float: """Current time for DragonBackend object, in seconds since the Epoch""" return time.time() + def _can_honor_policy( + self, request: DragonRunRequest + ) -> t.Tuple[bool, t.Optional[str]]: + """Check if the policy can be honored with resources available + in the allocation. 
+ :param request: DragonRunRequest containing policy information + :returns: Tuple indicating if the policy can be honored and + an optional error message""" + # ensure the policy can be honored + if request.policy: + if request.policy.cpu_affinity: + # make sure some node has enough CPUs + available = max(self._cpus) + requested = max(request.policy.cpu_affinity) + + if requested >= available: + return False, "Cannot satisfy request, not enough CPUs available" + + if request.policy.gpu_affinity: + # make sure some node has enough GPUs + available = max(self._gpus) + requested = max(request.policy.gpu_affinity) + + if requested >= available: + return False, "Cannot satisfy request, not enough GPUs available" + + return True, None + def _can_honor(self, request: DragonRunRequest) -> t.Tuple[bool, t.Optional[str]]: """Check if request can be honored with resources available in the allocation. @@ -302,6 +333,11 @@ def _can_honor(self, request: DragonRunRequest) -> t.Tuple[bool, t.Optional[str] if self._shutdown_requested: message = "Cannot satisfy request, server is shutting down." 
return False, message + + honorable, err = self._can_honor_policy(request) + if not honorable: + return False, err + return True, None def _allocate_step( @@ -410,6 +446,46 @@ def infra_ddict(self) -> str: return str(self._infra_ddict.serialize()) + @staticmethod + def create_run_policy( + request: DragonRequest, node_name: str + ) -> "dragon_policy.Policy": + """Create a dragon Policy from the request and node name + :param request: DragonRunRequest containing policy information + :param node_name: Name of the node on which the process will run + :returns: dragon_policy.Policy object mapped from request properties""" + if isinstance(request, DragonRunRequest): + run_request: DragonRunRequest = request + + affinity = dragon_policy.Policy.Affinity.DEFAULT + cpu_affinity: t.List[int] = [] + gpu_affinity: t.List[int] = [] + + # Customize policy only if the client requested it, otherwise use default + if run_request.policy is not None: + # Affinities are not mutually exclusive. If specified, both are used + if run_request.policy.cpu_affinity: + affinity = dragon_policy.Policy.Affinity.SPECIFIC + cpu_affinity = run_request.policy.cpu_affinity + + if run_request.policy.gpu_affinity: + affinity = dragon_policy.Policy.Affinity.SPECIFIC + gpu_affinity = run_request.policy.gpu_affinity + + if affinity != dragon_policy.Policy.Affinity.DEFAULT: + return dragon_policy.Policy( + placement=dragon_policy.Policy.Placement.HOST_NAME, + host_name=node_name, + affinity=affinity, + cpu_affinity=cpu_affinity, + gpu_affinity=gpu_affinity, + ) + + return dragon_policy.Policy( + placement=dragon_policy.Policy.Placement.HOST_NAME, + host_name=node_name, + ) + def _start_steps(self) -> None: self._heartbeat() with self._queue_lock: @@ -432,10 +508,7 @@ def _start_steps(self) -> None: policies = [] for node_name in hosts: - local_policy = dragon_policy.Policy( - placement=dragon_policy.Policy.Placement.HOST_NAME, - host_name=node_name, - ) + local_policy = self.create_run_policy(request, 
node_name) policies.extend([local_policy] * request.tasks_per_node) tmp_proc = dragon_process.ProcessTemplate( target=request.exe, diff --git a/smartsim/_core/launcher/dragon/dragonLauncher.py b/smartsim/_core/launcher/dragon/dragonLauncher.py index 17b47e3090..9078fed54f 100644 --- a/smartsim/_core/launcher/dragon/dragonLauncher.py +++ b/smartsim/_core/launcher/dragon/dragonLauncher.py @@ -29,6 +29,8 @@ import os import typing as t +from smartsim._core.schemas.dragonRequests import DragonRunPolicy + from ...._core.launcher.stepMapping import StepMap from ....error import LauncherError, SmartSimError from ....log import get_logger @@ -168,6 +170,9 @@ def run(self, step: Step) -> t.Optional[str]: merged_env = self._connector.merge_persisted_env(os.environ.copy()) nodes = int(run_args.get("nodes", None) or 1) tasks_per_node = int(run_args.get("tasks-per-node", None) or 1) + + policy = DragonRunPolicy.from_run_args(run_args) + response = _assert_schema_type( self._connector.send_request( DragonRunRequest( @@ -181,6 +186,7 @@ def run(self, step: Step) -> t.Optional[str]: current_env=merged_env, output_file=out, error_file=err, + policy=policy, ) ), DragonRunResponse, diff --git a/smartsim/_core/launcher/step/dragonStep.py b/smartsim/_core/launcher/step/dragonStep.py index 036a9e5654..dd93d7910c 100644 --- a/smartsim/_core/launcher/step/dragonStep.py +++ b/smartsim/_core/launcher/step/dragonStep.py @@ -30,7 +30,11 @@ import sys import typing as t -from ...._core.schemas.dragonRequests import DragonRunRequest, request_registry +from ...._core.schemas.dragonRequests import ( + DragonRunPolicy, + DragonRunRequest, + request_registry, +) from ....error.errors import SSUnsupportedError from ....log import get_logger from ....settings import ( @@ -166,8 +170,11 @@ def _write_request_file(self) -> str: nodes = int(run_args.get("nodes", None) or 1) tasks_per_node = int(run_args.get("tasks-per-node", None) or 1) + policy = DragonRunPolicy.from_run_args(run_args) + cmd = 
step.get_launch_cmd() out, err = step.get_output_files() + request = DragonRunRequest( exe=cmd[0], exe_args=cmd[1:], @@ -179,6 +186,7 @@ def _write_request_file(self) -> str: current_env=os.environ, output_file=out, error_file=err, + policy=policy, ) requests.append(request_registry.to_string(request)) with open(request_file, "w", encoding="utf-8") as script_file: diff --git a/smartsim/_core/launcher/step/step.py b/smartsim/_core/launcher/step/step.py index 2cce6e6107..171254e32a 100644 --- a/smartsim/_core/launcher/step/step.py +++ b/smartsim/_core/launcher/step/step.py @@ -26,6 +26,7 @@ from __future__ import annotations +import copy import functools import os.path as osp import pathlib @@ -51,7 +52,7 @@ def __init__(self, name: str, cwd: str, step_settings: SettingsBase) -> None: self.entity_name = name self.cwd = cwd self.managed = False - self.step_settings = step_settings + self.step_settings = copy.deepcopy(step_settings) self.meta: t.Dict[str, str] = {} @property diff --git a/smartsim/_core/schemas/dragonRequests.py b/smartsim/_core/schemas/dragonRequests.py index 3e384f746a..487ea915a0 100644 --- a/smartsim/_core/schemas/dragonRequests.py +++ b/smartsim/_core/schemas/dragonRequests.py @@ -26,9 +26,10 @@ import typing as t -from pydantic import BaseModel, Field, PositiveInt +from pydantic import BaseModel, Field, NonNegativeInt, PositiveInt, ValidationError import smartsim._core.schemas.utils as _utils +from smartsim.error.errors import SmartSimError # Black and Pylint disagree about where to put the `...` # pylint: disable=multiple-statements @@ -39,6 +40,43 @@ class DragonRequest(BaseModel): ... 
+class DragonRunPolicy(BaseModel): + """Policy specifying hardware constraints when running a Dragon job""" + + cpu_affinity: t.List[NonNegativeInt] = Field(default_factory=list) + """List of CPU indices to which the job should be pinned""" + gpu_affinity: t.List[NonNegativeInt] = Field(default_factory=list) + """List of GPU indices to which the job should be pinned""" + + @staticmethod + def from_run_args( + run_args: t.Dict[str, t.Union[int, str, float, None]] + ) -> "DragonRunPolicy": + """Create a DragonRunPolicy with hardware constraints passed from + a dictionary of run arguments + :param run_args: Dictionary of run arguments + :returns: DragonRunPolicy instance created from the run arguments""" + gpu_args = "" + if gpu_arg_value := run_args.get("gpu-affinity", None): + gpu_args = str(gpu_arg_value) + + cpu_args = "" + if cpu_arg_value := run_args.get("cpu-affinity", None): + cpu_args = str(cpu_arg_value) + + # run args converted to a string must be split back into a list[int] + gpu_affinity = [int(x.strip()) for x in gpu_args.split(",") if x] + cpu_affinity = [int(x.strip()) for x in cpu_args.split(",") if x] + + try: + return DragonRunPolicy( + cpu_affinity=cpu_affinity, + gpu_affinity=gpu_affinity, + ) + except ValidationError as ex: + raise SmartSimError("Unable to build DragonRunPolicy") from ex + + class DragonRunRequestView(DragonRequest): exe: t.Annotated[str, Field(min_length=1)] exe_args: t.List[t.Annotated[str, Field(min_length=1)]] = [] @@ -57,6 +95,7 @@ class DragonRunRequestView(DragonRequest): @request_registry.register("run") class DragonRunRequest(DragonRunRequestView): current_env: t.Dict[str, t.Optional[str]] = {} + policy: t.Optional[DragonRunPolicy] = None def __str__(self) -> str: return str(DragonRunRequestView.parse_obj(self.dict(exclude={"current_env"}))) diff --git a/smartsim/settings/dragonRunSettings.py b/smartsim/settings/dragonRunSettings.py index b8baa4708c..69a91547e7 100644 --- a/smartsim/settings/dragonRunSettings.py +++ 
b/smartsim/settings/dragonRunSettings.py @@ -28,6 +28,8 @@ import typing as t +from typing_extensions import override + from ..log import get_logger from .base import RunSettings @@ -63,6 +65,7 @@ def __init__( **kwargs, ) + @override def set_nodes(self, nodes: int) -> None: """Set the number of nodes @@ -70,9 +73,38 @@ def set_nodes(self, nodes: int) -> None: """ self.run_args["nodes"] = nodes + @override def set_tasks_per_node(self, tasks_per_node: int) -> None: """Set the number of tasks for this job :param tasks_per_node: number of tasks per node """ self.run_args["tasks-per-node"] = tasks_per_node + + @override + def set_node_feature(self, feature_list: t.Union[str, t.List[str]]) -> None: + """Specify the node feature for this job + + :param feature_list: a collection of strings representing the required + node features. Currently supported node features are: "gpu" + """ + if isinstance(feature_list, str): + feature_list = feature_list.strip().split() + elif not all(isinstance(feature, str) for feature in feature_list): + raise TypeError("feature_list must be string or list of strings") + + self.run_args["node-feature"] = ",".join(feature_list) + + def set_cpu_affinity(self, devices: t.List[int]) -> None: + """Set the CPU affinity for this job + + :param devices: list of CPU indices to execute on + """ + self.run_args["cpu-affinity"] = ",".join(str(device) for device in devices) + + def set_gpu_affinity(self, devices: t.List[int]) -> None: + """Set the GPU affinity for this job + + :param devices: list of GPU indices to execute on. + """ + self.run_args["gpu-affinity"] = ",".join(str(device) for device in devices) diff --git a/tests/test_dragon_client.py b/tests/test_dragon_client.py new file mode 100644 index 0000000000..80257b6107 --- /dev/null +++ b/tests/test_dragon_client.py @@ -0,0 +1,192 @@ +# BSD 2-Clause License +# +# Copyright (c) 2021-2024, Hewlett Packard Enterprise +# All rights reserved. 
+# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+import os +import pathlib +import typing as t +from unittest.mock import MagicMock + +import pytest + +from smartsim._core.launcher.step.dragonStep import DragonBatchStep, DragonStep +from smartsim.settings import DragonRunSettings +from smartsim.settings.slurmSettings import SbatchSettings + +# The tests in this file belong to the group_a group +pytestmark = pytest.mark.group_a + + +import smartsim._core.entrypoints.dragon_client as dragon_client +from smartsim._core.schemas.dragonRequests import * +from smartsim._core.schemas.dragonResponses import * + + +@pytest.fixture +def dragon_batch_step(test_dir: str) -> "DragonBatchStep": + """Fixture for creating a default batch of steps for a dragon launcher""" + test_path = pathlib.Path(test_dir) + + batch_step_name = "batch_step" + num_nodes = 4 + batch_settings = SbatchSettings(nodes=num_nodes) + batch_step = DragonBatchStep(batch_step_name, test_dir, batch_settings) + + # ensure the status_dir is set + status_dir = (test_path / ".smartsim" / "logs").as_posix() + batch_step.meta["status_dir"] = status_dir + + # create some steps to verify the requests file output changes + rs0 = DragonRunSettings(exe="sleep", exe_args=["1"]) + rs1 = DragonRunSettings(exe="sleep", exe_args=["2"]) + rs2 = DragonRunSettings(exe="sleep", exe_args=["3"]) + rs3 = DragonRunSettings(exe="sleep", exe_args=["4"]) + + names = "test00", "test01", "test02", "test03" + settings = rs0, rs1, rs2, rs3 + + # create steps with: + # no affinity, cpu affinity only, gpu affinity only, cpu and gpu affinity + cpu_affinities = [[], [0, 1, 2], [], [3, 4, 5, 6]] + gpu_affinities = [[], [], [0, 1, 2], [3, 4, 5, 6]] + + # assign some unique affinities to each run setting instance + for index, rs in enumerate(settings): + if gpu_affinities[index]: + rs.set_node_feature("gpu") + rs.set_cpu_affinity(cpu_affinities[index]) + rs.set_gpu_affinity(gpu_affinities[index]) + + steps = list( + DragonStep(name_, test_dir, rs_) for name_, rs_ in zip(names, settings) + ) + + 
for index, step in enumerate(steps): + # ensure meta is configured... + step.meta["status_dir"] = status_dir + # ... and put all the steps into the batch + batch_step.add_to_batch(steps[index]) + + return batch_step + + +def get_request_path_from_batch_script(launch_cmd: t.List[str]) -> pathlib.Path: + """Helper method for finding the path to a request file from the launch command""" + script_path = pathlib.Path(launch_cmd[-1]) + batch_script = script_path.read_text(encoding="utf-8") + batch_statements = [line for line in batch_script.split("\n") if line] + entrypoint_cmd = batch_statements[-1] + requests_file = pathlib.Path(entrypoint_cmd.split()[-1]) + return requests_file + + +def test_dragon_client_main_no_arg(monkeypatch: pytest.MonkeyPatch): + """Verify the client fails when the path to a submission file is not provided.""" + with pytest.raises(SystemExit): + dragon_client.cleanup = MagicMock() + dragon_client.main([]) + + # arg parser failures occur before resource allocation and should + # not result in resource cleanup being called + assert not dragon_client.cleanup.called + + +def test_dragon_client_main_empty_arg(test_dir: str): + """Verify the client fails when the path to a submission file is empty.""" + + with pytest.raises(ValueError) as ex: + dragon_client.cleanup = MagicMock() + dragon_client.main(["+submit", ""]) + + # verify it's a value error related to submit argument + assert "file not provided" in ex.value.args[0] + + # arg parser failures occur before resource allocation and should + # not result in resource cleanup being called + assert not dragon_client.cleanup.called + + +def test_dragon_client_main_bad_arg(test_dir: str): + """Verify the client returns a failure code when the path to a submission file is + invalid and does not raise an exception""" + path = pathlib.Path(test_dir) / "nonexistent_file.json" + + dragon_client.cleanup = MagicMock() + return_code = dragon_client.main(["+submit", str(path)]) + + # ensure non-zero return code + 
assert return_code != 0 + + # ensure failures do not block resource cleanup + assert dragon_client.cleanup.called + + +def test_dragon_client_main( + dragon_batch_step: DragonBatchStep, monkeypatch: pytest.MonkeyPatch +): + """Verify the client sends every request found in a valid submission file + through the connector and reads the affinity policies correctly""" + launch_cmd = dragon_batch_step.get_launch_cmd() + path = get_request_path_from_batch_script(launch_cmd) + num_requests_in_batch = 4 + num_shutdown_requests = 1 + request_count = num_requests_in_batch + num_shutdown_requests + submit_value = str(path) + + mock_connector = MagicMock() # DragonConnector + mock_connector.is_connected = True + mock_connector.send_request.return_value = DragonRunResponse(step_id="mock_step_id") + # mock can_monitor to exit before the infinite loop checking for shutdown + mock_connector.can_monitor = False + + mock_connector_class = MagicMock() + mock_connector_class.return_value = mock_connector + + # with monkeypatch.context() as ctx: + dragon_client.DragonConnector = mock_connector_class + dragon_client.cleanup = MagicMock() + + return_code = dragon_client.main(["+submit", submit_value]) + + # verify each request in the request file was processed + assert mock_connector.send_request.call_count == request_count + + # we know the batch fixture has a step with no affinity args supplied. skip it + for i in range(1, num_requests_in_batch): + sent_args = mock_connector.send_request.call_args_list[i][0] + request_arg = sent_args[0] + + assert isinstance(request_arg, DragonRunRequest) + + policy = request_arg.policy + + # make sure each policy has been read in correctly with valid affinity indices + assert len(policy.cpu_affinity) == len(set(policy.cpu_affinity)) + assert len(policy.gpu_affinity) == len(set(policy.gpu_affinity)) + + # we get a non-zero due to avoiding the infinite loop. 
consider refactoring + assert return_code == os.EX_IOERR + + # ensure failures do not block resource cleanup + assert dragon_client.cleanup.called diff --git a/tests/test_dragon_launcher.py b/tests/test_dragon_launcher.py index ee0fcb14b7..4fe8bf71b4 100644 --- a/tests/test_dragon_launcher.py +++ b/tests/test_dragon_launcher.py @@ -31,6 +31,7 @@ import sys import time import typing as t +from unittest.mock import MagicMock import pytest import zmq @@ -38,15 +39,74 @@ import smartsim._core.config from smartsim._core._cli.scripts.dragon_install import create_dotenv from smartsim._core.config.config import get_config -from smartsim._core.launcher.dragon.dragonLauncher import DragonConnector +from smartsim._core.launcher.dragon.dragonLauncher import ( + DragonConnector, + DragonLauncher, +) from smartsim._core.launcher.dragon.dragonSockets import ( get_authenticator, get_secure_socket, ) +from smartsim._core.launcher.step.dragonStep import DragonBatchStep, DragonStep from smartsim._core.schemas.dragonRequests import DragonBootstrapRequest -from smartsim._core.schemas.dragonResponses import DragonHandshakeResponse +from smartsim._core.schemas.dragonResponses import ( + DragonHandshakeResponse, + DragonRunResponse, +) from smartsim._core.utils.network import IFConfig, find_free_port from smartsim._core.utils.security import KeyManager +from smartsim.error.errors import LauncherError +from smartsim.settings.dragonRunSettings import DragonRunSettings +from smartsim.settings.slurmSettings import SbatchSettings + + +@pytest.fixture +def dragon_batch_step(test_dir: str) -> DragonBatchStep: + """Fixture for creating a default batch of steps for a dragon launcher""" + test_path = pathlib.Path(test_dir) + + batch_step_name = "batch_step" + num_nodes = 4 + batch_settings = SbatchSettings(nodes=num_nodes) + batch_step = DragonBatchStep(batch_step_name, test_dir, batch_settings) + + # ensure the status_dir is set + status_dir = (test_path / ".smartsim" / "logs").as_posix() + 
batch_step.meta["status_dir"] = status_dir + + # create some steps to verify the requests file output changes + rs0 = DragonRunSettings(exe="sleep", exe_args=["1"]) + rs1 = DragonRunSettings(exe="sleep", exe_args=["2"]) + rs2 = DragonRunSettings(exe="sleep", exe_args=["3"]) + rs3 = DragonRunSettings(exe="sleep", exe_args=["4"]) + + names = "test00", "test01", "test02", "test03" + settings = rs0, rs1, rs2, rs3 + + # create steps with: + # no affinity, cpu affinity only, gpu affinity only, cpu and gpu affinity + cpu_affinities = [[], [0, 1, 2], [], [3, 4, 5, 6]] + gpu_affinities = [[], [], [0, 1, 2], [3, 4, 5, 6]] + + # assign some unique affinities to each run setting instance + for index, rs in enumerate(settings): + if gpu_affinities[index]: + rs.set_node_feature("gpu") + rs.set_cpu_affinity(cpu_affinities[index]) + rs.set_gpu_affinity(gpu_affinities[index]) + + steps = list( + DragonStep(name_, test_dir, rs_) for name_, rs_ in zip(names, settings) + ) + + for index, step in enumerate(steps): + # ensure meta is configured... + step.meta["status_dir"] = status_dir + # ... 
and put all the steps into the batch + batch_step.add_to_batch(steps[index]) + + return batch_step + # The tests in this file belong to the group_a group pytestmark = pytest.mark.group_a @@ -521,3 +581,162 @@ def test_merge_env(monkeypatch: pytest.MonkeyPatch, test_dir: str): # any non-dragon keys that didn't exist avoid unnecessary prepending assert merged_env[non_dragon_key] == non_dragon_value + + +def test_run_step_fail(test_dir: str) -> None: + """Verify that the dragon launcher still returns the step id + when the running step fails""" + test_path = pathlib.Path(test_dir) + status_dir = (test_path / ".smartsim" / "logs").as_posix() + + rs = DragonRunSettings(exe="sleep", exe_args=["1"]) + step0 = DragonStep("step0", test_dir, rs) + step0.meta["status_dir"] = status_dir + + mock_connector = MagicMock() # DragonConnector() + mock_connector.is_connected = True + mock_connector.send_request = MagicMock( + return_value=DragonRunResponse(step_id=step0.name, error_message="mock fail!") + ) + + launcher = DragonLauncher() + launcher._connector = mock_connector + + result = launcher.run(step0) + + # verify the failed step name is in the result + assert step0.name in result + + +def test_run_step_batch_empty(dragon_batch_step: DragonBatchStep) -> None: + """Verify that the dragon launcher behaves when asked to execute + a batch step that has no sub-steps""" + # remove the steps added in the batch fixture + dragon_batch_step.steps.clear() + + mock_step_id = "MOCK-STEPID" + mock_connector = MagicMock() # DragonConnector() + mock_connector.is_connected = True + mock_connector.send_request = MagicMock( + return_value=DragonRunResponse( + step_id=dragon_batch_step.name, error_message="mock fail!" 
+ ) + ) + + launcher = DragonLauncher() + launcher._connector = mock_connector + launcher.task_manager.start_and_wait = MagicMock(return_value=(0, mock_step_id, "")) + + result = launcher.run(dragon_batch_step) + + # verify a step name is returned + assert result + # verify the batch step name is not in the result (renamed to SLURM-*) + assert dragon_batch_step.name not in result + + send_invocation = mock_connector.send_request + + # verify a batch request is not sent through the dragon connector + send_invocation.assert_not_called() + + +def test_run_step_batch_failure(dragon_batch_step: DragonBatchStep) -> None: + """Verify that the dragon launcher raises a LauncherError + when the batch script exits with a non-zero return code""" + mock_connector = MagicMock() # DragonConnector() + mock_connector.is_connected = True + mock_connector.send_request = MagicMock( + return_value=DragonRunResponse( + step_id=dragon_batch_step.name, error_message="mock fail!" + ) + ) + + mock_step_id = "MOCK-STEPID" + error_msg = "DOES_NOT_COMPUTE!" 
+ launcher = DragonLauncher() + launcher._connector = mock_connector + launcher.task_manager.start_and_wait = MagicMock( + return_value=(1, mock_step_id, error_msg) + ) + + # a non-zero return code from the batch script should raise an error + with pytest.raises(LauncherError) as ex: + launcher.run(dragon_batch_step) + + # verify the correct error message is in the exception + assert error_msg in ex.value.args[0] + + +def test_run_step_success(test_dir: str) -> None: + """Verify that the dragon launcher sends the correctly formatted request for a step""" + test_path = pathlib.Path(test_dir) + status_dir = (test_path / ".smartsim" / "logs").as_posix() + + rs = DragonRunSettings(exe="sleep", exe_args=["1"]) + step0 = DragonStep("step0", test_dir, rs) + step0.meta["status_dir"] = status_dir + + mock_connector = MagicMock() # DragonConnector() + mock_connector.is_connected = True + mock_connector.send_request = MagicMock( + return_value=DragonRunResponse(step_id=step0.name) + ) + + launcher = DragonLauncher() + launcher._connector = mock_connector + + result = launcher.run(step0) + + # verify the successfully executed step name is in the result + assert step0.name in result + + # verify the DragonRunRequest sent matches all expectations + send_invocation = mock_connector.send_request + send_invocation.assert_called_once() + + args = send_invocation.call_args[0] # call_args == t.Tuple[args, kwargs] + + dragon_run_request = args[0] + req_name = dragon_run_request.name # name sent to dragon env + assert req_name.startswith(step0.name) + + req_policy_cpu_affinity = dragon_run_request.policy.cpu_affinity + assert not req_policy_cpu_affinity # default should be empty list + + req_policy_gpu_affinity = dragon_run_request.policy.gpu_affinity + assert not req_policy_gpu_affinity # default should be empty list + + +def test_run_step_success_batch( + monkeypatch: pytest.MonkeyPatch, dragon_batch_step: DragonBatchStep +) -> None: + """Verify that the dragon launcher sends the 
correctly formatted request + for a batch step""" + mock_connector = MagicMock() # DragonConnector() + mock_connector.is_connected = True + mock_connector.send_request = MagicMock( + return_value=DragonRunResponse(step_id=dragon_batch_step.name) + ) + + launcher = DragonLauncher() + launcher._connector = mock_connector + launcher.task_manager.start_and_wait = MagicMock(return_value=(0, "success", "")) + + result = launcher.run(dragon_batch_step) + + # verify a step id is returned and that it is not the batch step name + assert dragon_batch_step.name not in result + assert result + + send_invocation = mock_connector.send_request + + # verify a batch request is not sent through the dragon connector + send_invocation.assert_not_called() + launcher.task_manager.start_and_wait.assert_called_once() + + args = launcher.task_manager.start_and_wait.call_args[0] + + # verify the batch script is executed + launch_cmd = dragon_batch_step.get_launch_cmd() + for stmt in launch_cmd: + assert stmt in args[0] # args[0] is the cmd list sent to subprocess.Popen diff --git a/tests/test_dragon_run_policy.py b/tests/test_dragon_run_policy.py new file mode 100644 index 0000000000..1d8d069fab --- /dev/null +++ b/tests/test_dragon_run_policy.py @@ -0,0 +1,371 @@ +# BSD 2-Clause License +# +# Copyright (c) 2021-2024, Hewlett Packard Enterprise +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. 
+# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import pathlib + +import pytest + +from smartsim._core.launcher.step.dragonStep import DragonBatchStep, DragonStep +from smartsim.settings.dragonRunSettings import DragonRunSettings +from smartsim.settings.slurmSettings import SbatchSettings + +try: + from dragon.infrastructure.policy import Policy + + import smartsim._core.entrypoints.dragon as drg + from smartsim._core.launcher.dragon.dragonBackend import DragonBackend + + dragon_loaded = True +except: + dragon_loaded = False + +# The tests in this file belong to the group_b group +pytestmark = pytest.mark.group_b + +from smartsim._core.schemas.dragonRequests import * +from smartsim._core.schemas.dragonResponses import * + + +@pytest.fixture +def dragon_batch_step(test_dir: str) -> "DragonBatchStep": + """Fixture for creating a default batch of steps for a dragon launcher""" + test_path = pathlib.Path(test_dir) + + batch_step_name = "batch_step" + num_nodes = 4 + batch_settings = SbatchSettings(nodes=num_nodes) + batch_step = DragonBatchStep(batch_step_name, test_dir, batch_settings) + + # ensure the status_dir is set + status_dir = (test_path / ".smartsim" / "logs").as_posix() + batch_step.meta["status_dir"] = status_dir + + # create 
some steps to verify the requests file output changes + rs0 = DragonRunSettings(exe="sleep", exe_args=["1"]) + rs1 = DragonRunSettings(exe="sleep", exe_args=["2"]) + rs2 = DragonRunSettings(exe="sleep", exe_args=["3"]) + rs3 = DragonRunSettings(exe="sleep", exe_args=["4"]) + + names = "test00", "test01", "test02", "test03" + settings = rs0, rs1, rs2, rs3 + + # create steps with: + # no affinity, cpu affinity only, gpu affinity only, cpu and gpu affinity + cpu_affinities = [[], [0, 1, 2], [], [3, 4, 5, 6]] + gpu_affinities = [[], [], [0, 1, 2], [3, 4, 5, 6]] + + # assign some unique affinities to each run setting instance + for index, rs in enumerate(settings): + if gpu_affinities[index]: + rs.set_node_feature("gpu") + rs.set_cpu_affinity(cpu_affinities[index]) + rs.set_gpu_affinity(gpu_affinities[index]) + + steps = list( + DragonStep(name_, test_dir, rs_) for name_, rs_ in zip(names, settings) + ) + + for index, step in enumerate(steps): + # ensure meta is configured... + step.meta["status_dir"] = status_dir + # ... and put all the steps into the batch + batch_step.add_to_batch(steps[index]) + + return batch_step + + +@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") +@pytest.mark.parametrize( + "dragon_request", + [ + pytest.param(DragonHandshakeRequest(), id="DragonHandshakeRequest"), + pytest.param(DragonShutdownRequest(), id="DragonShutdownRequest"), + pytest.param( + DragonBootstrapRequest(address="localhost"), id="DragonBootstrapRequest" + ), + ], +) +def test_create_run_policy_non_run_request(dragon_request: DragonRequest) -> None: + """Verify that a default policy is returned when a request is + not attempting to start a new process (e.g. 
a DragonRunRequest)""" + policy = DragonBackend.create_run_policy(dragon_request, "localhost") + + assert policy is not None, "Default policy was not returned" + assert ( + policy.device == Policy.Device.DEFAULT + ), "Default device was not Device.DEFAULT" + assert policy.cpu_affinity == [], "Default cpu affinity was not empty" + assert policy.gpu_affinity == [], "Default gpu affinity was not empty" + + +@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") +def test_create_run_policy_run_request_no_run_policy() -> None: + """Verify that a policy specifying no policy is returned with all default + values (no device, empty cpu & gpu affinity)""" + run_req = DragonRunRequest( + exe="sleep", + exe_args=["5"], + path="/a/fake/path", + nodes=2, + tasks=1, + tasks_per_node=1, + env={}, + current_env={}, + pmi_enabled=False, + # policy= # <--- skipping this + ) + + policy = DragonBackend.create_run_policy(run_req, "localhost") + + assert policy.device == Policy.Device.DEFAULT + assert set(policy.cpu_affinity) == set() + assert policy.gpu_affinity == [] + assert policy.affinity == Policy.Affinity.DEFAULT + + +@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") +def test_create_run_policy_run_request_default_run_policy() -> None: + """Verify that a policy specifying no affinity is returned with + default value for device and empty affinity lists""" + run_req = DragonRunRequest( + exe="sleep", + exe_args=["5"], + path="/a/fake/path", + nodes=2, + tasks=1, + tasks_per_node=1, + env={}, + current_env={}, + pmi_enabled=False, + policy=DragonRunPolicy(), # <--- passing default values + ) + + policy = DragonBackend.create_run_policy(run_req, "localhost") + + assert set(policy.cpu_affinity) == set() + assert set(policy.gpu_affinity) == set() + assert policy.affinity == Policy.Affinity.DEFAULT + + +@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") +def 
test_create_run_policy_run_request_cpu_affinity_no_device() -> None: + """Verify that a input policy specifying a CPU affinity but lacking the device field + produces a Dragon Policy with the CPU device specified""" + affinity = set([0, 2, 4]) + run_req = DragonRunRequest( + exe="sleep", + exe_args=["5"], + path="/a/fake/path", + nodes=2, + tasks=1, + tasks_per_node=1, + env={}, + current_env={}, + pmi_enabled=False, + policy=DragonRunPolicy(cpu_affinity=list(affinity)), # <-- no device spec + ) + + policy = DragonBackend.create_run_policy(run_req, "localhost") + + assert set(policy.cpu_affinity) == affinity + assert policy.gpu_affinity == [] + assert policy.affinity == Policy.Affinity.SPECIFIC + + +@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") +def test_create_run_policy_run_request_cpu_affinity() -> None: + """Verify that a policy specifying CPU affinity is returned as expected""" + affinity = set([0, 2, 4]) + run_req = DragonRunRequest( + exe="sleep", + exe_args=["5"], + path="/a/fake/path", + nodes=2, + tasks=1, + tasks_per_node=1, + env={}, + current_env={}, + pmi_enabled=False, + policy=DragonRunPolicy(cpu_affinity=list(affinity)), + ) + + policy = DragonBackend.create_run_policy(run_req, "localhost") + + assert set(policy.cpu_affinity) == affinity + assert policy.gpu_affinity == [] + assert policy.affinity == Policy.Affinity.SPECIFIC + + +@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") +def test_create_run_policy_run_request_gpu_affinity() -> None: + """Verify that a policy specifying GPU affinity is returned as expected""" + affinity = set([0, 2, 4]) + run_req = DragonRunRequest( + exe="sleep", + exe_args=["5"], + path="/a/fake/path", + nodes=2, + tasks=1, + tasks_per_node=1, + env={}, + current_env={}, + pmi_enabled=False, + policy=DragonRunPolicy(device="gpu", gpu_affinity=list(affinity)), + ) + + policy = DragonBackend.create_run_policy(run_req, "localhost") + + assert 
policy.cpu_affinity == [] + assert set(policy.gpu_affinity) == set(affinity) + assert policy.affinity == Policy.Affinity.SPECIFIC + + +@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") +def test_dragon_run_policy_from_run_args() -> None: + """Verify that a DragonRunPolicy is created from a dictionary of run arguments""" + run_args = { + "gpu-affinity": "0,1,2", + "cpu-affinity": "3,4,5,6", + } + + policy = DragonRunPolicy.from_run_args(run_args) + + assert policy.cpu_affinity == [3, 4, 5, 6] + assert policy.gpu_affinity == [0, 1, 2] + + +def test_dragon_run_policy_from_run_args_empty() -> None: + """Verify that a DragonRunPolicy is created from an empty + dictionary of run arguments""" + run_args = {} + + policy = DragonRunPolicy.from_run_args(run_args) + + assert policy.cpu_affinity == [] + assert policy.gpu_affinity == [] + + +def test_dragon_run_policy_from_run_args_cpu_affinity() -> None: + """Verify that a DragonRunPolicy is created from a dictionary + of run arguments containing a CPU affinity""" + run_args = { + "cpu-affinity": "3,4,5,6", + } + + policy = DragonRunPolicy.from_run_args(run_args) + + assert policy.cpu_affinity == [3, 4, 5, 6] + assert policy.gpu_affinity == [] + + +def test_dragon_run_policy_from_run_args_gpu_affinity() -> None: + """Verify that a DragonRunPolicy is created from a dictionary + of run arguments containing a GPU affinity""" + run_args = { + "gpu-affinity": "0, 1, 2", + } + + policy = DragonRunPolicy.from_run_args(run_args) + + assert policy.cpu_affinity == [] + assert policy.gpu_affinity == [0, 1, 2] + + +def test_dragon_run_policy_from_run_args_invalid_gpu_affinity() -> None: + """Verify that a DragonRunPolicy is NOT created from a dictionary + of run arguments with an invalid GPU affinity""" + run_args = { + "gpu-affinity": "0,-1,2", + } + + with pytest.raises(SmartSimError) as ex: + DragonRunPolicy.from_run_args(run_args) + + assert "DragonRunPolicy" in ex.value.args[0] + + +def 
test_dragon_run_policy_from_run_args_invalid_cpu_affinity() -> None: + """Verify that a DragonRunPolicy is NOT created from a dictionary + of run arguments with an invalid CPU affinity""" + run_args = { + "cpu-affinity": "3,4,5,-6", + } + + with pytest.raises(SmartSimError) as ex: + DragonRunPolicy.from_run_args(run_args) + + assert "DragonRunPolicy" in ex.value.args[0] + + +def test_dragon_run_policy_from_run_args_ignore_empties_gpu() -> None: + """Verify that a DragonRunPolicy is created from a dictionary + of run arguments and ignores empty values in the serialized gpu list""" + run_args = { + "gpu-affinity": "0,,2", + } + + policy = DragonRunPolicy.from_run_args(run_args) + + assert policy.cpu_affinity == [] + assert policy.gpu_affinity == [0, 2] + + +def test_dragon_run_policy_from_run_args_ignore_empties_cpu() -> None: + """Verify that a DragonRunPolicy is created from a dictionary + of run arguments and ignores empty values in the serialized cpu list""" + run_args = { + "cpu-affinity": "3,4,,6,", + } + + policy = DragonRunPolicy.from_run_args(run_args) + + assert policy.cpu_affinity == [3, 4, 6] + assert policy.gpu_affinity == [] + + +def test_dragon_run_policy_from_run_args_null_gpu_affinity() -> None: + """Verify that a DragonRunPolicy is created if a null value is encountered + in the gpu-affinity list""" + run_args = { + "gpu-affinity": None, + "cpu-affinity": "3,4,5,6", + } + + policy = DragonRunPolicy.from_run_args(run_args) + + assert policy.cpu_affinity == [3, 4, 5, 6] + assert policy.gpu_affinity == [] + + +def test_dragon_run_policy_from_run_args_null_cpu_affinity() -> None: + """Verify that a DragonRunPolicy is created if a null value is encountered + in the cpu-affinity list""" + run_args = {"gpu-affinity": "0,1,2", "cpu-affinity": None} + + policy = DragonRunPolicy.from_run_args(run_args) + + assert policy.cpu_affinity == [] + assert policy.gpu_affinity == [0, 1, 2] diff --git a/tests/test_dragon_backend.py b/tests/test_dragon_run_request.py 
similarity index 64% rename from tests/test_dragon_backend.py rename to tests/test_dragon_run_request.py index f284f38d99..94c17c222a 100644 --- a/tests/test_dragon_backend.py +++ b/tests/test_dragon_run_request.py @@ -31,19 +31,17 @@ from unittest.mock import MagicMock import pytest +from pydantic import ValidationError # The tests in this file belong to the group_b group -pytestmark = pytest.mark.group_a +pytestmark = pytest.mark.group_b try: import dragon -except ImportError: - pass -else: - pytest.skip( - reason="Using dragon as launcher, not running Dragon unit tests", - allow_module_level=True, - ) + + dragon_loaded = True +except: + dragon_loaded = False from smartsim._core.config import CONFIG from smartsim._core.schemas.dragonRequests import * @@ -59,10 +57,36 @@ class NodeMock(MagicMock): + def __init__( + self, name: t.Optional[str] = None, num_gpus: int = 2, num_cpus: int = 8 + ) -> None: + super().__init__() + self._mock_id = name + NodeMock._num_gpus = num_gpus + NodeMock._num_cpus = num_cpus + @property def hostname(self) -> str: + if self._mock_id: + return self._mock_id return create_short_id_str() + @property + def num_cpus(self) -> str: + return NodeMock._num_cpus + + @property + def num_gpus(self) -> str: + return NodeMock._num_gpus + + def _set_id(self, value: str) -> None: + self._mock_id = value + + def gpus(self, parent: t.Any = None) -> t.List[str]: + if self._num_gpus: + return [f"{self.hostname}-gpu{i}" for i in range(NodeMock._num_gpus)] + return [] + class GroupStateMock(MagicMock): def Running(self) -> MagicMock: @@ -78,13 +102,19 @@ class ProcessGroupMock(MagicMock): puids = [121, 122] -def get_mock_backend(monkeypatch: pytest.MonkeyPatch) -> "DragonBackend": +def node_mock() -> NodeMock: + return NodeMock() + + +def get_mock_backend( + monkeypatch: pytest.MonkeyPatch, num_gpus: int = 2 +) -> "DragonBackend": process_mock = MagicMock(returncode=0) process_group_mock = MagicMock(**{"Process.return_value": ProcessGroupMock()}) 
process_module_mock = MagicMock() process_module_mock.Process = process_mock - node_mock = NodeMock() + node_mock = NodeMock(num_gpus=num_gpus) system_mock = MagicMock(nodes=["node1", "node2", "node3"]) monkeypatch.setitem( sys.modules, @@ -199,6 +229,7 @@ def set_mock_group_infos( return group_infos +@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") def test_handshake_request(monkeypatch: pytest.MonkeyPatch) -> None: dragon_backend = get_mock_backend(monkeypatch) @@ -209,6 +240,7 @@ def test_handshake_request(monkeypatch: pytest.MonkeyPatch) -> None: assert handshake_resp.dragon_pid == 99999 +@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") def test_run_request(monkeypatch: pytest.MonkeyPatch) -> None: dragon_backend = get_mock_backend(monkeypatch) run_req = DragonRunRequest( @@ -259,6 +291,7 @@ def test_run_request(monkeypatch: pytest.MonkeyPatch) -> None: assert not dragon_backend._running_steps +@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") def test_deny_run_request(monkeypatch: pytest.MonkeyPatch) -> None: dragon_backend = get_mock_backend(monkeypatch) @@ -284,6 +317,78 @@ def test_deny_run_request(monkeypatch: pytest.MonkeyPatch) -> None: assert dragon_backend.group_infos[step_id].status == SmartSimStatus.STATUS_FAILED +def test_run_request_with_empty_policy(monkeypatch: pytest.MonkeyPatch) -> None: + """Verify that a run request can be created with a null policy""" + dragon_backend = get_mock_backend(monkeypatch) + run_req = DragonRunRequest( + exe="sleep", + exe_args=["5"], + path="/a/fake/path", + nodes=2, + tasks=1, + tasks_per_node=1, + env={}, + current_env={}, + pmi_enabled=False, + policy=None, + ) + assert run_req.policy is None + + +@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") +def test_run_request_with_policy(monkeypatch: pytest.MonkeyPatch) -> None: + """Verify that a policy is applied to a run request""" + 
dragon_backend = get_mock_backend(monkeypatch) + run_req = DragonRunRequest( + exe="sleep", + exe_args=["5"], + path="/a/fake/path", + nodes=2, + tasks=1, + tasks_per_node=1, + env={}, + current_env={}, + pmi_enabled=False, + policy=DragonRunPolicy(cpu_affinity=[0, 1]), + ) + + run_resp = dragon_backend.process_request(run_req) + assert isinstance(run_resp, DragonRunResponse) + + step_id = run_resp.step_id + assert dragon_backend._queued_steps[step_id] == run_req + + mock_process_group = MagicMock(puids=[123, 124]) + + dragon_backend._group_infos[step_id].process_group = mock_process_group + dragon_backend._group_infos[step_id].puids = [123, 124] + dragon_backend._start_steps() + + assert dragon_backend._running_steps == [step_id] + assert len(dragon_backend._queued_steps) == 0 + assert len(dragon_backend._free_hosts) == 1 + assert dragon_backend._allocated_hosts[dragon_backend.hosts[0]] == step_id + assert dragon_backend._allocated_hosts[dragon_backend.hosts[1]] == step_id + + monkeypatch.setattr( + dragon_backend._group_infos[step_id].process_group, "status", "Running" + ) + + dragon_backend._update() + + assert dragon_backend._running_steps == [step_id] + assert len(dragon_backend._queued_steps) == 0 + assert len(dragon_backend._free_hosts) == 1 + assert dragon_backend._allocated_hosts[dragon_backend.hosts[0]] == step_id + assert dragon_backend._allocated_hosts[dragon_backend.hosts[1]] == step_id + + dragon_backend._group_infos[step_id].status = SmartSimStatus.STATUS_CANCELLED + + dragon_backend._update() + assert not dragon_backend._running_steps + + +@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") def test_udpate_status_request(monkeypatch: pytest.MonkeyPatch) -> None: dragon_backend = get_mock_backend(monkeypatch) @@ -300,6 +405,7 @@ def test_udpate_status_request(monkeypatch: pytest.MonkeyPatch) -> None: } +@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") def 
test_stop_request(monkeypatch: pytest.MonkeyPatch) -> None: dragon_backend = get_mock_backend(monkeypatch) group_infos = set_mock_group_infos(monkeypatch, dragon_backend) @@ -331,6 +437,7 @@ def test_stop_request(monkeypatch: pytest.MonkeyPatch) -> None: assert len(dragon_backend._free_hosts) == 3 +@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") @pytest.mark.parametrize( "immediate, kill_jobs, frontend_shutdown", [ @@ -389,6 +496,7 @@ def test_shutdown_request( assert dragon_backend._has_cooled_down == kill_jobs +@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") @pytest.mark.parametrize("telemetry_flag", ["0", "1"]) def test_cooldown_is_set(monkeypatch: pytest.MonkeyPatch, telemetry_flag: str) -> None: monkeypatch.setenv("SMARTSIM_FLAG_TELEMETRY", telemetry_flag) @@ -404,6 +512,7 @@ def test_cooldown_is_set(monkeypatch: pytest.MonkeyPatch, telemetry_flag: str) - assert dragon_backend.cooldown_period == expected_cooldown +@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") def test_heartbeat_and_time(monkeypatch: pytest.MonkeyPatch) -> None: dragon_backend = get_mock_backend(monkeypatch) first_heartbeat = dragon_backend.last_heartbeat @@ -412,6 +521,7 @@ def test_heartbeat_and_time(monkeypatch: pytest.MonkeyPatch) -> None: assert dragon_backend.last_heartbeat > first_heartbeat +@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") @pytest.mark.parametrize("num_nodes", [1, 3, 100]) def test_can_honor(monkeypatch: pytest.MonkeyPatch, num_nodes: int) -> None: dragon_backend = get_mock_backend(monkeypatch) @@ -432,6 +542,119 @@ def test_can_honor(monkeypatch: pytest.MonkeyPatch, num_nodes: int) -> None: ) +@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") +@pytest.mark.parametrize("affinity", [[0], [0, 1], list(range(8))]) +def test_can_honor_cpu_affinity( + monkeypatch: pytest.MonkeyPatch, affinity: 
t.List[int] +) -> None: + """Verify that valid CPU affinities are accepted""" + dragon_backend = get_mock_backend(monkeypatch) + run_req = DragonRunRequest( + exe="sleep", + exe_args=["5"], + path="/a/fake/path", + nodes=2, + tasks=1, + tasks_per_node=1, + env={}, + current_env={}, + pmi_enabled=False, + policy=DragonRunPolicy(cpu_affinity=affinity), + ) + + assert dragon_backend._can_honor(run_req)[0] + + +@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") +def test_can_honor_cpu_affinity_out_of_range(monkeypatch: pytest.MonkeyPatch) -> None: + """Verify that invalid CPU affinities are NOT accepted + NOTE: negative values are captured by the Pydantic schema""" + dragon_backend = get_mock_backend(monkeypatch) + run_req = DragonRunRequest( + exe="sleep", + exe_args=["5"], + path="/a/fake/path", + nodes=2, + tasks=1, + tasks_per_node=1, + env={}, + current_env={}, + pmi_enabled=False, + policy=DragonRunPolicy(cpu_affinity=list(range(9))), + ) + + assert not dragon_backend._can_honor(run_req)[0] + + +@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") +@pytest.mark.parametrize("affinity", [[0], [0, 1]]) +def test_can_honor_gpu_affinity( + monkeypatch: pytest.MonkeyPatch, affinity: t.List[int] +) -> None: + """Verify that valid GPU affinities are accepted""" + dragon_backend = get_mock_backend(monkeypatch) + run_req = DragonRunRequest( + exe="sleep", + exe_args=["5"], + path="/a/fake/path", + nodes=2, + tasks=1, + tasks_per_node=1, + env={}, + current_env={}, + pmi_enabled=False, + policy=DragonRunPolicy(gpu_affinity=affinity), + ) + + assert dragon_backend._can_honor(run_req)[0] + + +@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") +def test_can_honor_gpu_affinity_out_of_range(monkeypatch: pytest.MonkeyPatch) -> None: + """Verify that invalid GPU affinities are NOT accepted + NOTE: negative values are captured by the Pydantic schema""" + dragon_backend = 
get_mock_backend(monkeypatch) + run_req = DragonRunRequest( + exe="sleep", + exe_args=["5"], + path="/a/fake/path", + nodes=2, + tasks=1, + tasks_per_node=1, + env={}, + current_env={}, + pmi_enabled=False, + policy=DragonRunPolicy(gpu_affinity=list(range(3))), + ) + + assert not dragon_backend._can_honor(run_req)[0] + + +@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") +def test_can_honor_gpu_device_not_available(monkeypatch: pytest.MonkeyPatch) -> None: + """Verify that a request for a GPU if none exists is not accepted""" + + # create a mock node class that always reports no GPUs available + dragon_backend = get_mock_backend(monkeypatch, num_gpus=0) + + run_req = DragonRunRequest( + exe="sleep", + exe_args=["5"], + path="/a/fake/path", + nodes=2, + tasks=1, + tasks_per_node=1, + env={}, + current_env={}, + pmi_enabled=False, + # request GPU 0 even though the mocked nodes report no GPUs + policy=DragonRunPolicy(gpu_affinity=[0]), + ) + + assert not dragon_backend._can_honor(run_req)[0] + + +@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") def test_get_id(monkeypatch: pytest.MonkeyPatch) -> None: dragon_backend = get_mock_backend(monkeypatch) step_id = next(dragon_backend._step_ids) @@ -440,6 +663,7 @@ def test_get_id(monkeypatch: pytest.MonkeyPatch) -> None: assert step_id != next(dragon_backend._step_ids) +@pytest.mark.skipif(not dragon_loaded, reason="Test is only for Dragon WLM systems") def test_view(monkeypatch: pytest.MonkeyPatch) -> None: dragon_backend = get_mock_backend(monkeypatch) set_mock_group_infos(monkeypatch, dragon_backend) @@ -447,17 +671,21 @@ def test_view(monkeypatch: pytest.MonkeyPatch) -> None: expected_message = textwrap.dedent(f"""\ Dragon server backend update - | Host | Status | - |---------|----------| + | Host | Status | + |--------|----------| | {hosts[0]} | Busy | | {hosts[1]} | Free | | {hosts[2]} | Free | | Step | Status | Hosts | Return codes | Num procs | - 
|----------|--------------|-----------------|----------------|-------------| + |----------|--------------|-------------|----------------|-------------| | abc123-1 | Running | {hosts[0]} | | 1 | | del999-2 | Cancelled | {hosts[1]} | -9 | 1 | | c101vz-3 | Completed | {hosts[1]},{hosts[2]} | 0 | 2 | | 0ghjk1-4 | Failed | {hosts[2]} | -1 | 1 | | ljace0-5 | NeverStarted | | | 0 |""") - assert dragon_backend.status_message == expected_message + # get rid of white space to make the comparison easier + actual_msg = dragon_backend.status_message.replace(" ", "") + expected_message = expected_message.replace(" ", "") + + assert actual_msg == expected_message diff --git a/tests/test_dragon_run_request_nowlm.py b/tests/test_dragon_run_request_nowlm.py new file mode 100644 index 0000000000..afd25aa9d7 --- /dev/null +++ b/tests/test_dragon_run_request_nowlm.py @@ -0,0 +1,105 @@ +# BSD 2-Clause License +# +# Copyright (c) 2021-2024, Hewlett Packard Enterprise +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import pytest +from pydantic import ValidationError + +# The tests in this file belong to the group_a group +pytestmark = pytest.mark.group_a + +from smartsim._core.schemas.dragonRequests import * +from smartsim._core.schemas.dragonResponses import * + + +def test_run_request_with_null_policy(monkeypatch: pytest.MonkeyPatch) -> None: + """Verify that an empty policy does not cause an error""" + # dragon_backend = get_mock_backend(monkeypatch) + run_req = DragonRunRequest( + exe="sleep", + exe_args=["5"], + path="/a/fake/path", + nodes=2, + tasks=1, + tasks_per_node=1, + env={}, + current_env={}, + pmi_enabled=False, + policy=None, + ) + assert run_req.policy is None + + +def test_run_request_with_empty_policy(monkeypatch: pytest.MonkeyPatch) -> None: + """Verify that a non-empty policy is set correctly""" + # dragon_backend = get_mock_backend(monkeypatch) + run_req = DragonRunRequest( + exe="sleep", + exe_args=["5"], + path="/a/fake/path", + nodes=2, + tasks=1, + tasks_per_node=1, + env={}, + current_env={}, + pmi_enabled=False, + policy=DragonRunPolicy(), + ) + assert run_req.policy is not None + assert not run_req.policy.cpu_affinity + assert not run_req.policy.gpu_affinity + + +@pytest.mark.parametrize( + "device,cpu_affinity,gpu_affinity", + [ + pytest.param("cpu", [-1], [], id="cpu_affinity"), + pytest.param("gpu", [], [-1], id="gpu_affinity"), + ], +) +def test_run_request_with_negative_affinity( + device: str, + cpu_affinity: 
t.List[int], + gpu_affinity: t.List[int], +) -> None: + """Verify that invalid affinity values fail validation""" + with pytest.raises(ValidationError) as ex: + DragonRunRequest( + exe="sleep", + exe_args=["5"], + path="/a/fake/path", + nodes=2, + tasks=1, + tasks_per_node=1, + env={}, + current_env={}, + pmi_enabled=False, + policy=DragonRunPolicy( + cpu_affinity=cpu_affinity, gpu_affinity=gpu_affinity + ), + ) + + assert f"{device}_affinity" in str(ex.value.args[0]) + assert "NumberNotGeError" in str(ex.value.args[0]) diff --git a/tests/test_dragon_runsettings.py b/tests/test_dragon_runsettings.py new file mode 100644 index 0000000000..34e8510e82 --- /dev/null +++ b/tests/test_dragon_runsettings.py @@ -0,0 +1,98 @@ +# BSD 2-Clause License +# +# Copyright (c) 2021-2024, Hewlett Packard Enterprise +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import pytest + +from smartsim.settings import DragonRunSettings + +# The tests in this file belong to the group_a group +pytestmark = pytest.mark.group_a + + +def test_dragon_runsettings_nodes(): + """Verify that node count is set correctly""" + rs = DragonRunSettings(exe="sleep", exe_args=["1"]) + + exp_value = 3 + rs.set_nodes(exp_value) + assert rs.run_args["nodes"] == exp_value + + exp_value = 9 + rs.set_nodes(exp_value) + assert rs.run_args["nodes"] == exp_value + + +def test_dragon_runsettings_tasks_per_node(): + """Verify that tasks per node is set correctly""" + rs = DragonRunSettings(exe="sleep", exe_args=["1"]) + + exp_value = 3 + rs.set_tasks_per_node(exp_value) + assert rs.run_args["tasks-per-node"] == exp_value + + exp_value = 7 + rs.set_tasks_per_node(exp_value) + assert rs.run_args["tasks-per-node"] == exp_value + + +def test_dragon_runsettings_cpu_affinity(): + """Verify that the CPU affinity is set correctly""" + rs = DragonRunSettings(exe="sleep", exe_args=["1"]) + + exp_value = [0, 1, 2, 3] + rs.set_cpu_affinity([0, 1, 2, 3]) + assert rs.run_args["cpu-affinity"] == ",".join(str(val) for val in exp_value) + + # ensure the value is not changed when we extend the list + exp_value.extend([4, 5, 6]) + assert rs.run_args["cpu-affinity"] != ",".join(str(val) for val in exp_value) + + rs.set_cpu_affinity(exp_value) + assert rs.run_args["cpu-affinity"] == ",".join(str(val) for val in exp_value) + + # ensure the value is not 
changed when we extend the list + rs.run_args["cpu-affinity"] = "7,8,9" + assert rs.run_args["cpu-affinity"] != ",".join(str(val) for val in exp_value) + + +def test_dragon_runsettings_gpu_affinity(): + """Verify that the GPU affinity is set correctly""" + rs = DragonRunSettings(exe="sleep", exe_args=["1"]) + + exp_value = [0, 1, 2, 3] + rs.set_gpu_affinity([0, 1, 2, 3]) + assert rs.run_args["gpu-affinity"] == ",".join(str(val) for val in exp_value) + + # ensure the value is not changed when we extend the list + exp_value.extend([4, 5, 6]) + assert rs.run_args["gpu-affinity"] != ",".join(str(val) for val in exp_value) + + rs.set_gpu_affinity(exp_value) + assert rs.run_args["gpu-affinity"] == ",".join(str(val) for val in exp_value) + + # ensure the value is not changed when we extend the list + rs.run_args["gpu-affinity"] = "7,8,9" + assert rs.run_args["gpu-affinity"] != ",".join(str(val) for val in exp_value) diff --git a/tests/test_dragon_step.py b/tests/test_dragon_step.py new file mode 100644 index 0000000000..19f408e0bd --- /dev/null +++ b/tests/test_dragon_step.py @@ -0,0 +1,394 @@ +# BSD 2-Clause License +# +# Copyright (c) 2021-2024, Hewlett Packard Enterprise +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import json
+import pathlib
+import shutil
+import sys
+import typing as t
+
+import pytest
+
+from smartsim._core.launcher.step.dragonStep import DragonBatchStep, DragonStep
+from smartsim.settings import DragonRunSettings
+from smartsim.settings.pbsSettings import QsubBatchSettings
+from smartsim.settings.slurmSettings import SbatchSettings
+
+# The tests in this file belong to the group_a group
+pytestmark = pytest.mark.group_a
+
+
+from smartsim._core.schemas.dragonRequests import *
+from smartsim._core.schemas.dragonResponses import *
+
+
+@pytest.fixture
+def dragon_batch_step(test_dir: str) -> DragonBatchStep:
+    """Fixture for creating a default batch of steps for a dragon launcher"""
+    test_path = pathlib.Path(test_dir)
+
+    batch_step_name = "batch_step"
+    num_nodes = 4
+    batch_settings = SbatchSettings(nodes=num_nodes)
+    batch_step = DragonBatchStep(batch_step_name, test_dir, batch_settings)
+
+    # ensure the status_dir is set
+    status_dir = (test_path / ".smartsim" / "logs").as_posix()
+    batch_step.meta["status_dir"] = status_dir
+
+    # create some steps to verify the requests file output changes
+    rs0 = DragonRunSettings(exe="sleep", exe_args=["1"])
+    rs1 = DragonRunSettings(exe="sleep", exe_args=["2"])
+    rs2 = DragonRunSettings(exe="sleep", exe_args=["3"])
+    rs3 = DragonRunSettings(exe="sleep", exe_args=["4"])
+
+    names = "test00", "test01", "test02", "test03"
+    settings = rs0, rs1, rs2, rs3
+
+    # create steps with:
+    # no affinity, cpu affinity only, gpu affinity only, cpu and gpu affinity
+    cpu_affinities = [[], [0, 1, 2], [], [3, 4, 5, 6]]
+    gpu_affinities = [[], [], [0, 1, 2], [3, 4, 5, 6]]
+
+    # assign some unique affinities to each run setting instance
+    for index, rs in enumerate(settings):
+        if gpu_affinities[index]:
+            rs.set_node_feature("gpu")
+        rs.set_cpu_affinity(cpu_affinities[index])
+        rs.set_gpu_affinity(gpu_affinities[index])
+
+    steps = list(
+        DragonStep(name_, test_dir, rs_) for name_, rs_ in zip(names, settings)
+    )
+
+    for index, step in enumerate(steps):
+        # ensure meta is configured...
+        step.meta["status_dir"] = status_dir
+        # ... and put all the steps into the batch
+        batch_step.add_to_batch(steps[index])
+
+    return batch_step
+
+
+def get_request_path_from_batch_script(launch_cmd: t.List[str]) -> pathlib.Path:
+    """Helper method for finding the path to a request file from the launch command"""
+    script_path = pathlib.Path(launch_cmd[-1])
+    batch_script = script_path.read_text(encoding="utf-8")
+    batch_statements = [line for line in batch_script.split("\n") if line]
+    entrypoint_cmd = batch_statements[-1]
+    requests_file = pathlib.Path(entrypoint_cmd.split()[-1])
+    return requests_file
+
+
+def test_dragon_step_creation(test_dir: str) -> None:
+    """Verify that the step is created with the values provided"""
+    rs = DragonRunSettings(exe="sleep", exe_args=["1"])
+
+    original_name = "test"
+    step = DragonStep(original_name, test_dir, rs)
+
+    # confirm the name has been made unique to avoid conflicts
+    assert step.name != original_name
+    assert step.entity_name == original_name
+    assert step.cwd == test_dir
+    assert step.step_settings is not None
+
+
+def test_dragon_step_name_uniqueness(test_dir: str) -> None:
+    """Verify that step name is unique and independent of step content"""
+
+    rs = DragonRunSettings(exe="sleep", exe_args=["1"])
+
+    original_name = "test"
+
+    num_steps = 100
+    steps = [DragonStep(original_name, test_dir, rs) for _ in range(num_steps)]
+
+    # confirm the name has been made unique in each step
+    step_names = {step.name for step in steps}
+    assert len(step_names) == num_steps
+
+
+def test_dragon_step_launch_cmd(test_dir: str) -> None:
+    """Verify the expected launch cmd is generated w/minimal settings"""
+    exp_exe = "sleep"
+    exp_exe_args = "1"
+    rs = DragonRunSettings(exe=exp_exe, exe_args=[exp_exe_args])
+
+    original_name = "test"
+    step = DragonStep(original_name, test_dir, rs)
+
+    launch_cmd = step.get_launch_cmd()
+    assert len(launch_cmd) == 2
+
+    # we'll verify the exe_args and exe name are handled correctly
+    exe, args = launch_cmd
+    assert exp_exe in exe
+    assert exp_exe_args in args
+
+    # also, verify that a string exe_args param instead of list is handled correctly
+    exp_exe_args = "1 2 3"
+    rs = DragonRunSettings(exe=exp_exe, exe_args=exp_exe_args)
+    step = DragonStep(original_name, test_dir, rs)
+    launch_cmd = step.get_launch_cmd()
+    assert len(launch_cmd) == 4  # "/foo/bar/sleep 1 2 3"
+
+
+def test_dragon_step_launch_cmd_multi_arg(test_dir: str) -> None:
+    """Verify the expected launch cmd is generated when multiple arguments
+    are passed to run settings"""
+    exp_exe = "sleep"
+    arg0, arg1, arg2 = "1", "2", "3"
+    rs = DragonRunSettings(exe=exp_exe, exe_args=[arg0, arg1, arg2])
+
+    original_name = "test"
+
+    step = DragonStep(original_name, test_dir, rs)
+
+    launch_cmd = step.get_launch_cmd()
+    assert len(launch_cmd) == 4
+
+    exe, *args = launch_cmd
+    assert exp_exe in exe
+    assert arg0 in args
+    assert arg1 in args
+    assert arg2 in args
+
+
+def test_dragon_step_launch_cmd_no_bash(
+    test_dir: str, monkeypatch: pytest.MonkeyPatch
+) -> None:
+    """Verify that requirement for bash shell is checked"""
+    exp_exe = "sleep"
+    arg0, arg1, arg2 = "1", "2", "3"
+    rs = DragonRunSettings(exe=exp_exe, exe_args=[arg0, arg1, arg2])
+    rs.colocated_db_settings = {"foo": "bar"}  # triggers bash lookup
+
+    original_name = "test"
+    step = DragonStep(original_name, test_dir, rs)
+
+    with pytest.raises(RuntimeError) as ex, monkeypatch.context() as ctx:
+        ctx.setattr(shutil, "which", lambda _: None)
+        step.get_launch_cmd()
+
+    # verify the exception thrown is the one we're looking for
+    assert "Could not find" in ex.value.args[0]
+
+
+def test_dragon_step_colocated_db() -> None:
+    # todo: implement a test for the branch where bash is found and
+    # run_settings.colocated_db_settings is set
+    ...
+
+
+def test_dragon_step_container() -> None:
+    # todo: implement a test for the branch where run_settings.container
+    # is an instance of class `Singularity`
+    ...
+
+
+def test_dragon_step_run_settings_accessor(test_dir: str) -> None:
+    """Verify the run settings passed to the step are copied correctly and
+    are not inadvertently modified outside the step"""
+    exp_exe = "sleep"
+    arg0, arg1, arg2 = "1", "2", "3"
+    rs = DragonRunSettings(exe=exp_exe, exe_args=[arg0, arg1, arg2])
+
+    original_name = "test"
+    step = DragonStep(original_name, test_dir, rs)
+    rs_output = step.run_settings
+
+    assert rs.exe == rs_output.exe
+    assert rs.exe_args == rs_output.exe_args
+
+    # ensure we have a deep copy
+    rs.exe = "foo"
+    assert id(step.run_settings) != id(rs)
+    assert step.run_settings.exe != rs.exe
+
+
+def test_dragon_batch_step_creation(test_dir: str) -> None:
+    """Verify that the batch step is created with the values provided"""
+    batch_step_name = "batch_step"
+    num_nodes = 4
+    batch_settings = SbatchSettings(nodes=num_nodes)
+    batch_step = DragonBatchStep(batch_step_name, test_dir, batch_settings)
+
+    # confirm the name has been made unique to avoid conflicts
+    assert batch_step.name != batch_step_name
+    assert batch_step.entity_name == batch_step_name
+    assert batch_step.cwd == test_dir
+    assert batch_step.batch_settings is not None
+    assert batch_step.managed
+
+
+def test_dragon_batch_step_add_to_batch(test_dir: str) -> None:
+    """Verify that steps are added to the batch correctly"""
+    rs = DragonRunSettings(exe="sleep", exe_args=["1"])
+
+    name0, name1, name2 = "test00", "test01", "test02"
+    step0 = DragonStep(name0, test_dir, rs)
+    step1 = DragonStep(name1, test_dir, rs)
+    step2 = DragonStep(name2, test_dir, rs)
+
+    batch_step_name = "batch_step"
+    num_nodes = 4
+    batch_settings = SbatchSettings(nodes=num_nodes)
+    batch_step = DragonBatchStep(batch_step_name, test_dir, batch_settings)
+
+    assert len(batch_step.steps) == 0
+
+    batch_step.add_to_batch(step0)
+    assert len(batch_step.steps) == 1
+    assert name0 in ",".join({step.name for step in batch_step.steps})
+
+    batch_step.add_to_batch(step1)
+    assert len(batch_step.steps) == 2
+    assert name1 in ",".join({step.name for step in batch_step.steps})
+
+    batch_step.add_to_batch(step2)
+    assert len(batch_step.steps) == 3
+    assert name2 in ",".join({step.name for step in batch_step.steps})
+
+
+def test_dragon_batch_step_get_launch_command_meta_fail(test_dir: str) -> None:
+    """Verify that the batch launch command cannot be generated without
+    having the status directory set in the step metadata"""
+    batch_step_name = "batch_step"
+    num_nodes = 4
+    batch_settings = SbatchSettings(nodes=num_nodes)
+    batch_step = DragonBatchStep(batch_step_name, test_dir, batch_settings)
+
+    with pytest.raises(KeyError) as ex:
+        batch_step.get_launch_cmd()
+
+
+@pytest.mark.parametrize(
+    "batch_settings_class,batch_exe,batch_header,node_spec_tpl",
+    [
+        pytest.param(
+            SbatchSettings, "sbatch", "#SBATCH", "#SBATCH --nodes={0}", id="sbatch"
+        ),
+        pytest.param(QsubBatchSettings, "qsub", "#PBS", "#PBS -l nodes={0}", id="qsub"),
+    ],
+)
+def test_dragon_batch_step_get_launch_command(
+    test_dir: str,
+    batch_settings_class: t.Type,
+    batch_exe: str,
+    batch_header: str,
+    node_spec_tpl: str,
+) -> None:
+    """Verify that the batch launch command is properly generated and
+    the expected side effects are present (writing script file to disk)"""
+    test_path = pathlib.Path(test_dir)
+
+    batch_step_name = "batch_step"
+    num_nodes = 4
+    batch_settings = batch_settings_class(nodes=num_nodes)
+    batch_step = DragonBatchStep(batch_step_name, test_dir, batch_settings)
+
+    # ensure the status_dir is set
+    status_dir = (test_path / ".smartsim" / "logs").as_posix()
+    batch_step.meta["status_dir"] = status_dir
+
+    launch_cmd = batch_step.get_launch_cmd()
+    assert launch_cmd
+
+    full_cmd = " ".join(launch_cmd)
+    assert batch_exe in full_cmd  # verify launcher running the batch
+    assert test_dir in full_cmd  # verify outputs are sent to expected directory
+    assert "batch_step.sh" in full_cmd  # verify batch script name is in the command
+
+    # ...verify that the script file is written when getting the launch command
+    script_path = pathlib.Path(launch_cmd[-1])
+    assert script_path.exists()
+    assert len(script_path.read_bytes()) > 0
+
+    batch_script = script_path.read_text(encoding="utf-8")
+
+    # ...verify the script file has the expected batch script header content
+    assert batch_header in batch_script
+    assert node_spec_tpl.format(num_nodes) in batch_script  # verify node count is set
+
+    # ...verify the script has the expected entrypoint command
+    batch_statements = [line for line in batch_script.split("\n") if line]
+    python_path = sys.executable
+
+    entrypoint_cmd = batch_statements[-1]
+    assert python_path in entrypoint_cmd
+    assert "smartsim._core.entrypoints.dragon_client +submit" in entrypoint_cmd
+
+
+def test_dragon_batch_step_write_request_file_no_steps(test_dir: str) -> None:
+    """Verify that the batch launch command writes an appropriate request file
+    if no steps are attached"""
+    test_path = pathlib.Path(test_dir)
+
+    batch_step_name = "batch_step"
+    num_nodes = 4
+    batch_settings = SbatchSettings(nodes=num_nodes)
+    batch_step = DragonBatchStep(batch_step_name, test_dir, batch_settings)
+
+    # ensure the status_dir is set
+    status_dir = (test_path / ".smartsim" / "logs").as_posix()
+    batch_step.meta["status_dir"] = status_dir
+
+    launch_cmd = batch_step.get_launch_cmd()
+    requests_file = get_request_path_from_batch_script(launch_cmd)
+
+    # no steps have been added yet, so the requests file should be a serialized, empty list
+    assert requests_file.read_text(encoding="utf-8") == "[]"
+
+
+def test_dragon_batch_step_write_request_file(
+    dragon_batch_step: DragonBatchStep,
+) -> None:
+    """Verify that the batch launch command writes an appropriate request file
+    for the set of attached steps"""
+    # expected affinities, in the order the dragon_batch_step fixture assigns them:
+    # no affinity, cpu affinity only, gpu affinity only, cpu and gpu affinity
+    cpu_affinities = [[], [0, 1, 2], [], [3, 4, 5, 6]]
+    gpu_affinities = [[], [], [0, 1, 2], [3, 4, 5, 6]]
+
+    launch_cmd = dragon_batch_step.get_launch_cmd()
+    requests_file = get_request_path_from_batch_script(launch_cmd)
+
+    requests_text = requests_file.read_text(encoding="utf-8")
+    requests_json: t.List[str] = json.loads(requests_text)
+
+    # verify that there is an item in file for each step added to the batch
+    assert len(requests_json) == len(dragon_batch_step.steps)
+
+    for index, req in enumerate(requests_json):
+        req_type, req_data = req.split("|", 1)
+        # the only steps added are to execute apps, requests should be of type "run"
+        assert req_type == "run"
+
+        run_request = DragonRunRequest(**json.loads(req_data))
+        assert run_request
+        assert run_request.policy.cpu_affinity == cpu_affinities[index]
+        assert run_request.policy.gpu_affinity == gpu_affinities[index]