diff --git a/.pylintrc b/.pylintrc index aa378d0399..34580db3b6 100644 --- a/.pylintrc +++ b/.pylintrc @@ -167,7 +167,7 @@ max-module-lines=1000 # Allow the body of a class to be on the same line as the declaration if body # contains single statement. -single-line-class-stmt=no +single-line-class-stmt=yes # Allow the body of an if to be on the same line as the test if there is no # else. diff --git a/smartsim/_core/generation/generator.py b/smartsim/_core/generation/generator.py index e17f43b854..b1d2414160 100644 --- a/smartsim/_core/generation/generator.py +++ b/smartsim/_core/generation/generator.py @@ -191,8 +191,6 @@ def _gen_entity_list_dir(self, entity_lists: t.List[Ensemble]) -> None: mkdir(elist_dir) elist.path = elist_dir - self._gen_entity_dirs(list(elist.applications), entity_list=elist) - def _gen_entity_dirs( self, entities: t.List[Application], diff --git a/smartsim/_core/utils/helpers.py b/smartsim/_core/utils/helpers.py index 70f52bc4e1..a1c0d7aa24 100644 --- a/smartsim/_core/utils/helpers.py +++ b/smartsim/_core/utils/helpers.py @@ -122,7 +122,6 @@ def expand_exe_path(exe: str) -> str: # which returns none if not found in_path = which(exe) - print(f"hmm what is this: {in_path}") if not in_path: if os.path.isfile(exe) and os.access(exe, os.X_OK): return os.path.abspath(exe) diff --git a/smartsim/entity/_mock.py b/smartsim/entity/_mock.py new file mode 100644 index 0000000000..8f1043ed3c --- /dev/null +++ b/smartsim/entity/_mock.py @@ -0,0 +1,46 @@ +# BSD 2-Clause License +# +# Copyright (c) 2021-2024, Hewlett Packard Enterprise +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""This module contains stubs of functionality that is not currently +implemented. + +THIS WHOLE MODULE SHOULD BE REMOVED IN FUTURE!! +""" + +from __future__ import annotations + +import typing as t + + +class Mock: + """Base mock class""" + + def __init__(self, *_: t.Any, **__: t.Any): ... + def __getattr__(self, _: str) -> Mock: + return type(self)() + + def __deepcopy__(self, _: dict[t.Any, t.Any]) -> Mock: + return type(self)() diff --git a/smartsim/entity/ensemble.py b/smartsim/entity/ensemble.py index e5ea26453d..517d331615 100644 --- a/smartsim/entity/ensemble.py +++ b/smartsim/entity/ensemble.py @@ -24,569 +24,91 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -import os.path as osp -import typing as t -from copy import deepcopy -from os import getcwd - -from tabulate import tabulate +from __future__ import annotations -from .._core._install.builder import Device -from .._core.utils.helpers import expand_exe_path -from ..error import ( - EntityExistsError, - SmartSimError, - SSUnsupportedError, - UserStrategyError, -) -from ..log import get_logger -from ..settings import BatchSettings, RunSettings -from .dbobject import FSModel, FSScript -from .entity import SmartSimEntity -from .entityList import EntityList -from .model import Application -from .strategies import create_all_permutations, random_permutations, step_values +import copy +import itertools +import os +import os.path +import typing as t -logger = get_logger(__name__) +from smartsim.entity import _mock, entity, strategies +from smartsim.entity.files import EntityFiles +from smartsim.entity.model import Application +from smartsim.entity.strategies import ParamSet +from smartsim.launchable.job import Job -StrategyFunction = t.Callable[ - [t.List[str], t.List[t.List[str]], int], t.List[t.Dict[str, str]] -] +if t.TYPE_CHECKING: + from smartsim.settings.launchSettings import LaunchSettings -class Ensemble(EntityList[Application]): - """``Ensemble`` is a group of ``Application`` instances that can - be treated as a reference to a single instance. +class Ensemble(entity.CompoundEntity): + """Entity to help parameterize the creation multiple application + instances. """ def __init__( self, name: str, - params: t.Optional[t.Dict[str, t.Any]] = None, - exe: t.Optional[str] = None, - exe_args: t.Optional[t.List[str]] = None, - path: t.Optional[str] = getcwd(), - params_as_args: t.Optional[t.List[str]] = None, - batch_settings: t.Optional[BatchSettings] = None, - run_settings: t.Optional[RunSettings] = None, - perm_strat: str = "all_perm", - **kwargs: t.Any, - ) -> None: - """Initialize an Ensemble of Application instances. - - The kwargs argument can be used to pass custom input - parameters to the permutation strategy. - - :param name: name of the ensemble - :param exe: executable to run - :param exe_args: executable arguments - :param params: parameters to expand into ``Application`` members - :param params_as_args: list of params that should be used as command - line arguments to the ``Application`` member executables and not written - to generator files - :param batch_settings: describes settings for ``Ensemble`` as batch workload - :param run_settings: describes how each ``Application`` should be executed - :param replicas: number of ``Application`` replicas to create - a keyword - argument of kwargs - :param perm_strategy: strategy for expanding ``params`` into - ``Application`` instances from params argument - options are "all_perm", "step", "random" - or a callable function. - :return: ``Ensemble`` instance - """ - self.exe = exe or "" - self.exe_args = exe_args or [] - self.params = params or {} - self.params_as_args = params_as_args or [] - self._key_prefixing_enabled = True - self.batch_settings = batch_settings - self.run_settings = run_settings - self.replicas: str - - super().__init__(name, path=str(path), perm_strat=perm_strat, **kwargs) - - @property - def applications(self) -> t.Collection[Application]: - """An alias for a shallow copy of the ``entities`` attribute""" - return list(self.entities) - - def _initialize_entities(self, **kwargs: t.Any) -> None: - """Initialize all the applications within the ensemble based - on the parameters passed to the ensemble and the permutation - strategy given at init. - - :raises UserStrategyError: if user generation strategy fails - """ - strategy = self._set_strategy(kwargs.pop("perm_strat")) - replicas = kwargs.pop("replicas", None) - self.replicas = replicas - - # if a ensemble has parameters and run settings, create - # the ensemble and assign run_settings to each member - if self.params: - if self.run_settings and self.exe: - param_names, params = self._read_application_parameters() - - # Compute all combinations of application parameters and arguments - n_applications = kwargs.get("n_applications", 0) - all_application_params = strategy(param_names, params, n_applications) - if not isinstance(all_application_params, list): - raise UserStrategyError(strategy) - - for i, param_set in enumerate(all_application_params): - if not isinstance(param_set, dict): - raise UserStrategyError(strategy) - run_settings = deepcopy(self.run_settings) - application_name = "_".join((self.name, str(i))) - application = Application( - name=application_name, - exe=self.exe, - exe_args=self.exe_args, - params=param_set, - path=osp.join(self.path, application_name), - run_settings=run_settings, - params_as_args=self.params_as_args, - ) - application.enable_key_prefixing() - application.params_to_args() - logger.debug( - f"Created ensemble member: {application_name} in {self.name}" - ) - self.add_application(application) - # cannot generate applications without run settings - else: - raise SmartSimError( - "Ensembles without 'params' or 'replicas' argument to " - "expand into members cannot be given run settings" - ) - else: - if self.run_settings and self.exe: - if replicas: - for i in range(replicas): - application_name = "_".join((self.name, str(i))) - application = Application( - name=application_name, - params={}, - exe=self.exe, - exe_args=self.exe_args, - path=osp.join(self.path, application_name), - run_settings=deepcopy(self.run_settings), - ) - application.enable_key_prefixing() - logger.debug( - f"Created ensemble member: {application_name} in {self.name}" - ) - self.add_application(application) - else: - raise SmartSimError( - "Ensembles without 'params' or 'replicas' argument to " - "expand into members cannot be given run settings" - ) - # if no params, no run settings and no batch settings, error because we - # don't know how to run the ensemble - elif not self.batch_settings: - raise SmartSimError( - "Ensemble must be provided batch settings or run settings" - ) - else: - logger.info("Empty ensemble created for batch launch") - - def add_application(self, application: Application) -> None: - """Add a application to this ensemble - - :param application: application instance to be added - :raises TypeError: if application is not an instance of ``Application`` - :raises EntityExistsError: if application already exists in this ensemble - """ - if not isinstance(application, Application): - raise TypeError( - f"Argument to add_application was of type {type(application)}, not Application" - ) - # "in" operator uses application name for __eq__ - if application in self.entities: - raise EntityExistsError( - f"Application {application.name} already exists in ensemble {self.name}" - ) - - if self._fs_models: - self._extend_entity_fs_models(application, self._fs_models) - if self._fs_scripts: - self._extend_entity_fs_scripts(application, self._fs_scripts) - - self.entities.append(application) - - def register_incoming_entity(self, incoming_entity: SmartSimEntity) -> None: - """Register future communication between entities. - - Registers the named data sources that this entity - has access to by storing the key_prefix associated - with that entity - - Only python clients can have multiple incoming connections - - :param incoming_entity: The entity that data will be received from - """ - for application in self.applications: - application.register_incoming_entity(incoming_entity) - - def enable_key_prefixing(self) -> None: - """If called, each application within this ensemble will prefix its key with its - own application name. - """ - for application in self.applications: - application.enable_key_prefixing() - - def query_key_prefixing(self) -> bool: - """Inquire as to whether each application within the ensemble will prefix their keys - - :returns: True if all applications have key prefixing enabled, False otherwise - """ - return all( - application.query_key_prefixing() for application in self.applications - ) - - def attach_generator_files( - self, - to_copy: t.Optional[t.List[str]] = None, - to_symlink: t.Optional[t.List[str]] = None, - to_configure: t.Optional[t.List[str]] = None, - ) -> None: - """Attach files to each application within the ensemble for generation - - Attach files needed for the entity that, upon generation, - will be located in the path of the entity. - - During generation, files "to_copy" are copied into - the path of the entity, and files "to_symlink" are - symlinked into the path of the entity. - - Files "to_configure" are text based application input files where - parameters for the application are set. Note that only applications - support the "to_configure" field. These files must have - fields tagged that correspond to the values the user - would like to change. The tag is settable but defaults - to a semicolon e.g. THERMO = ;10; - - :param to_copy: files to copy - :param to_symlink: files to symlink - :param to_configure: input files with tagged parameters - """ - for application in self.applications: - application.attach_generator_files( - to_copy=to_copy, to_symlink=to_symlink, to_configure=to_configure - ) - - @property - def attached_files_table(self) -> str: - """Return a plain-text table with information about files - attached to applications belonging to this ensemble. - - :returns: A table of all files attached to all applications - """ - if not self.applications: - return "The ensemble is empty, no files to show." - - table = tabulate( - [ - [application.name, application.attached_files_table] - for application in self.applications - ], - headers=["Application name", "Files"], - tablefmt="grid", - ) - - return table - - def print_attached_files(self) -> None: - """Print table of attached files to std out""" - print(self.attached_files_table) - - @staticmethod - def _set_strategy(strategy: str) -> StrategyFunction: - """Set the permutation strategy for generating applications within - the ensemble - - :param strategy: name of the strategy or callable function - :raises SSUnsupportedError: if str name is not supported - :return: strategy function - """ - if strategy == "all_perm": - return create_all_permutations - if strategy == "step": - return step_values - if strategy == "random": - return random_permutations - if callable(strategy): - return strategy - raise SSUnsupportedError( - f"Permutation strategy given is not supported: {strategy}" - ) - - def _read_application_parameters(self) -> t.Tuple[t.List[str], t.List[t.List[str]]]: - """Take in the parameters given to the ensemble and prepare to - create applications for the ensemble - - :raises TypeError: if params are of the wrong type - :return: param names and values for permutation strategy - """ - - if not isinstance(self.params, dict): - raise TypeError( - "Ensemble initialization argument 'params' must be of type dict" - ) - - param_names: t.List[str] = [] - parameters: t.List[t.List[str]] = [] - for name, val in self.params.items(): - param_names.append(name) - - if isinstance(val, list): - val = [str(v) for v in val] - parameters.append(val) - elif isinstance(val, (int, str)): - parameters.append([str(val)]) - else: - raise TypeError( - "Incorrect type for ensemble parameters\n" - + "Must be list, int, or string." - ) - return param_names, parameters - - def add_ml_model( - self, - name: str, - backend: str, - model: t.Optional[bytes] = None, - model_path: t.Optional[str] = None, - device: str = Device.CPU.value.upper(), - devices_per_node: int = 1, - first_device: int = 0, - batch_size: int = 0, - min_batch_size: int = 0, - min_batch_timeout: int = 0, - tag: str = "", - inputs: t.Optional[t.List[str]] = None, - outputs: t.Optional[t.List[str]] = None, + exe: str | os.PathLike[str], + exe_args: t.Sequence[str] | None = None, + exe_arg_parameters: t.Mapping[str, t.Sequence[t.Sequence[str]]] | None = None, + path: str | os.PathLike[str] | None = None, + files: EntityFiles | None = None, + file_parameters: t.Mapping[str, t.Sequence[str]] | None = None, + permutation_strategy: str | strategies.PermutationStrategyType = "all_perm", + max_permutations: int = -1, + replicas: int = 1, ) -> None: - """A TF, TF-lite, PT, or ONNX model to load into the fs at runtime - - Each ML Model added will be loaded into a - feature store (converged or not) prior to the execution - of every entity belonging to this ensemble - - One of either model (in memory representation) or model_path (file) - must be provided - - :param name: key to store model under - :param model: model in memory - :param model_path: serialized model - :param backend: name of the backend (TORCH, TF, TFLITE, ONNX) - :param device: name of device for execution - :param devices_per_node: number of GPUs per node in multiGPU nodes - :param first_device: first device in multi-GPU nodes to use for execution, - defaults to 0; ignored if devices_per_node is 1 - :param batch_size: batch size for execution - :param min_batch_size: minimum batch size for model execution - :param min_batch_timeout: time to wait for minimum batch size - :param tag: additional tag for model information - :param inputs: model inputs (TF only) - :param outputs: model outupts (TF only) - """ - fs_model = FSModel( - name=name, - backend=backend, - model=model, - model_file=model_path, - device=device, - devices_per_node=devices_per_node, - first_device=first_device, - batch_size=batch_size, - min_batch_size=min_batch_size, - min_batch_timeout=min_batch_timeout, - tag=tag, - inputs=inputs, - outputs=outputs, - ) - dupe = next( - ( - fs_model.name - for ensemble_ml_model in self._fs_models - if ensemble_ml_model.name == fs_model.name - ), - None, + self.name = name + self.exe = os.fspath(exe) + self.exe_args = list(exe_args) if exe_args else [] + self.exe_arg_parameters = ( + copy.deepcopy(exe_arg_parameters) if exe_arg_parameters else {} ) - if dupe: - raise SSUnsupportedError( - f'An ML Model with name "{fs_model.name}" already exists' - ) - self._fs_models.append(fs_model) - for entity in self.applications: - self._extend_entity_fs_models(entity, [fs_model]) - - def add_script( - self, - name: str, - script: t.Optional[str] = None, - script_path: t.Optional[str] = None, - device: str = Device.CPU.value.upper(), - devices_per_node: int = 1, - first_device: int = 0, - ) -> None: - """TorchScript to launch with every entity belonging to this ensemble - - Each script added to the application will be loaded into an - feature store (converged or not) prior to the execution - of every entity belonging to this ensemble - - Device selection is either "GPU" or "CPU". If many devices are - present, a number can be passed for specification e.g. "GPU:1". - - Setting ``devices_per_node=N``, with N greater than one will result - in the application being stored in the first N devices of type ``device``. - - One of either script (in memory string representation) or script_path (file) - must be provided + self.path = os.fspath(path) if path is not None else os.getcwd() + # ^^^^^^^^^^^ + # TODO: Copied from the original implementation, but I'm not sure that + # I like this default. Shouldn't it be something under an + # experiment directory? If so, how it injected?? + self.files = copy.deepcopy(files) if files else EntityFiles() + self.file_parameters = dict(file_parameters) if file_parameters else {} + self.permutation_strategy = permutation_strategy + self.max_permutations = max_permutations + self.replicas = replicas - :param name: key to store script under - :param script: TorchScript code - :param script_path: path to TorchScript code - :param device: device for script execution - :param devices_per_node: number of devices on each host - :param first_device: first device to use on each host + def _create_applications(self) -> tuple[Application, ...]: + """Concretize the ensemble attributes into a collection of + application instances. """ - fs_script = FSScript( - name=name, - script=script, - script_path=script_path, - device=device, - devices_per_node=devices_per_node, - first_device=first_device, + permutation_strategy = strategies.resolve(self.permutation_strategy) + combinations = permutation_strategy( + self.file_parameters, self.exe_arg_parameters, self.max_permutations ) - dupe = next( - ( - fs_script.name - for ensemble_script in self._fs_scripts - if ensemble_script.name == fs_script.name - ), - None, + combinations = combinations if combinations else [ParamSet({}, {})] + permutations_ = itertools.chain.from_iterable( + itertools.repeat(permutation, self.replicas) for permutation in combinations ) - if dupe: - raise SSUnsupportedError( - f'A Script with name "{fs_script.name}" already exists' + return tuple( + Application( + name=f"{self.name}-{i}", + exe=self.exe, + run_settings=_mock.Mock(), + # ^^^^^^^^^^^^^^^^^^^^^^^ + # FIXME: remove this constructor arg! It should not exist!! + exe_args=self.exe_args, + path=os.path.join(self.path, self.name), + files=self.files, + params=permutation.params, + params_as_args=permutation.exe_args, # type: ignore[arg-type] + # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + # FIXME: this is the wrong type on Application! ) - self._fs_scripts.append(fs_script) - for entity in self.applications: - self._extend_entity_fs_scripts(entity, [fs_script]) - - def add_function( - self, - name: str, - function: t.Optional[str] = None, - device: str = Device.CPU.value.upper(), - devices_per_node: int = 1, - first_device: int = 0, - ) -> None: - """TorchScript function to launch with every entity belonging to this ensemble - - Each script function to the application will be loaded into a - non-converged feature store prior to the execution - of every entity belonging to this ensemble. - - For converged feature stores, the :meth:`add_script` method should be used. - - Device selection is either "GPU" or "CPU". If many devices are - present, a number can be passed for specification e.g. "GPU:1". - - Setting ``devices_per_node=N``, with N greater than one will result - in the script being stored in the first N devices of type ``device``; - alternatively, setting ``first_device=M`` will result in the script - being stored on nodes M through M + N - 1. - - :param name: key to store function under - :param function: TorchScript code - :param device: device for script execution - :param devices_per_node: number of devices on each host - :param first_device: first device to use on each host - """ - fs_script = FSScript( - name=name, - script=function, - device=device, - devices_per_node=devices_per_node, - first_device=first_device, - ) - dupe = next( - ( - fs_script.name - for ensemble_script in self._fs_scripts - if ensemble_script.name == fs_script.name - ), - None, + for i, permutation in enumerate(permutations_) ) - if dupe: - raise SSUnsupportedError( - f'A Script with name "{fs_script.name}" already exists' - ) - self._fs_scripts.append(fs_script) - for entity in self.applications: - self._extend_entity_fs_scripts(entity, [fs_script]) - @staticmethod - def _extend_entity_fs_models( - application: Application, fs_models: t.List[FSModel] - ) -> None: - """ - Ensures that the Machine Learning model names being added to the Ensemble - are unique. - - This static method checks if the provided ML model names already exist in - the Ensemble. An SSUnsupportedError is raised if any duplicate names are - found. Otherwise, it appends the given list of FSModel to the Ensemble. - - :param application: SmartSim Application object. - :param fs_models: List of FSModels to append to the Ensemble. - """ - for add_ml_model in fs_models: - dupe = next( - ( - fs_model.name - for fs_model in application.fs_models - if fs_model.name == add_ml_model.name - ), - None, - ) - if dupe: - raise SSUnsupportedError( - f'An ML Model with name "{add_ml_model.name}" already exists' - ) - application.add_ml_model_object(add_ml_model) - - @staticmethod - def _extend_entity_fs_scripts( - application: Application, fs_scripts: t.List[FSScript] - ) -> None: - """ - Ensures that the script/function names being added to the Ensemble are unique. - - This static method checks if the provided script/function names already exist - in the Ensemble. An SSUnsupportedError is raised if any duplicate names - are found. Otherwise, it appends the given list of FSScripts to the - Ensemble. - - :param application: SmartSim Application object. - :param fs_scripts: List of FSScripts to append to the Ensemble. - """ - for add_script in fs_scripts: - dupe = next( - ( - add_script.name - for fs_script in application.fs_scripts - if fs_script.name == add_script.name - ), - None, - ) - if dupe: - raise SSUnsupportedError( - f'A Script with name "{add_script.name}" already exists' - ) - application.add_script_object(add_script) + def as_jobs(self, settings: LaunchSettings) -> tuple[Job, ...]: + apps = self._create_applications() + if not apps: + raise ValueError("There are no members as part of this ensemble") + return tuple(Job(app, settings) for app in apps) diff --git a/smartsim/entity/entity.py b/smartsim/entity/entity.py index bf6398ca3d..6416a8b2b4 100644 --- a/smartsim/entity/entity.py +++ b/smartsim/entity/entity.py @@ -24,9 +24,16 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +from __future__ import annotations + +import abc import typing as t +from smartsim.launchable.jobGroup import JobGroup + if t.TYPE_CHECKING: + from smartsim.launchable.job import Job + from smartsim.settings.launchSettings import LaunchSettings from smartsim.types import TODO RunSettings = TODO @@ -117,3 +124,18 @@ def set_path(self, path: str) -> None: def __repr__(self) -> str: return self.name + + +class CompoundEntity(abc.ABC): + """An interface to create different types of collections of launchables + from a single set of launch settings. + + Objects that implement this interface describe how to turn their entities + into a collection of jobs and this interface will handle coercion into + other collections for jobs with slightly different launching behavior. + """ + + @abc.abstractmethod + def as_jobs(self, settings: LaunchSettings) -> t.Collection[Job]: ... + def as_job_group(self, settings: LaunchSettings) -> JobGroup: + return JobGroup(list(self.as_jobs(settings))) diff --git a/smartsim/entity/model.py b/smartsim/entity/model.py index b0db991495..4304ee95bd 100644 --- a/smartsim/entity/model.py +++ b/smartsim/entity/model.py @@ -39,25 +39,35 @@ from .._core.utils.helpers import cat_arg_and_value, expand_exe_path from ..error import EntityExistsError, SSUnsupportedError from ..log import get_logger -from ..settings import BatchSettings, RunSettings from .dbobject import FSModel, FSScript from .entity import SmartSimEntity from .files import EntityFiles +if t.TYPE_CHECKING: + from smartsim.types import TODO + + RunSettings = TODO + BatchSettings = TODO + + logger = get_logger(__name__) +# TODO: Remove this supression when we strip fileds/functionality +# (run-settings/batch_settings/params_as_args/etc)! +# pylint: disable-next=too-many-public-methods class Application(SmartSimEntity): def __init__( self, name: str, exe: str, - run_settings: RunSettings, + run_settings: "RunSettings", params: t.Optional[t.Dict[str, str]] = None, exe_args: t.Optional[t.List[str]] = None, path: t.Optional[str] = getcwd(), params_as_args: t.Optional[t.List[str]] = None, - batch_settings: t.Optional[BatchSettings] = None, + batch_settings: t.Optional["BatchSettings"] = None, + files: t.Optional[EntityFiles] = None, ): """Initialize a ``Application`` @@ -73,19 +83,20 @@ def __init__( be added to run_settings :param batch_settings: Launcher settings for running the individual application as a batch job + :param files: Files to have available to the application """ super().__init__(name, str(path), run_settings) self.exe = [expand_exe_path(exe)] # self.exe = [exe] if run_settings.container else [expand_exe_path(exe)] self.exe_args = exe_args or [] - self.params = params or {} + self.params = params.copy() if params else {} self.params_as_args = params_as_args self.incoming_entities: t.List[SmartSimEntity] = [] self._key_prefixing_enabled = False self.batch_settings = batch_settings self._fs_models: t.List[FSModel] = [] self._fs_scripts: t.List[FSScript] = [] - self.files: t.Optional[EntityFiles] = None + self.files = copy.deepcopy(files) if files else None @property def exe_args(self) -> t.Union[str, t.List[str]]: @@ -127,8 +138,7 @@ def colocated(self) -> bool: """ if self.run_settings is None: return False - else: - return bool(self.run_settings.colocated_fs_settings) + return bool(self.run_settings.colocated_fs_settings) def add_exe_args(self, args: t.Union[str, t.List[str]]) -> None: """Add executable arguments to executable @@ -163,7 +173,9 @@ def enable_key_prefixing(self) -> None: self._key_prefixing_enabled = True def disable_key_prefixing(self) -> None: - """If called, the entity will not prefix its keys with its own application name""" + """If called, the entity will not prefix its keys with its own + application name + """ self._key_prefixing_enabled = False def query_key_prefixing(self) -> bool: @@ -256,9 +268,10 @@ def colocate_fs_uds( ) -> None: """Colocate an FeatureStore instance with this Application over UDS. - This method will initialize settings which add an unsharded - feature store to this Application instance. Only this Application will be able to communicate - with this colocated feature store by using Unix Domain sockets. + This method will initialize settings which add an unsharded feature + store to this Application instance. Only this Application will be able + to communicate with this colocated feature store by using Unix Domain + sockets. Extra parameters for the fs can be passed through kwargs. This includes many performance, caching and inference settings. @@ -281,8 +294,10 @@ def colocate_fs_uds( :param fs_cpus: number of cpus to use for FeatureStore :param custom_pinning: CPUs to pin the FeatureStore to. Passing an empty iterable disables pinning - :param debug: launch Application with extra debug information about the colocated fs - :param kwargs: additional keyword arguments to pass to the FeatureStore feature store + :param debug: launch Application with extra debug information about the + colocated fs + :param kwargs: additional keyword arguments to pass to the FeatureStore + feature store """ if not re.match(r"^[a-zA-Z0-9.:\,_\-/]*$", unix_socket): @@ -317,9 +332,10 @@ def colocate_fs_tcp( ) -> None: """Colocate an FeatureStore instance with this Application over TCP/IP. - This method will initialize settings which add an unsharded - feature store to this Application instance. Only this Application will be able to communicate - with this colocated feature store by using the loopback TCP interface. + This method will initialize settings which add an unsharded feature + store to this Application instance. Only this Application will be able + to communicate with this colocated feature store by using the loopback + TCP interface. Extra parameters for the fs can be passed through kwargs. This includes many performance, caching and inference settings. @@ -342,8 +358,10 @@ def colocate_fs_tcp( :param fs_cpus: number of cpus to use for FeatureStore :param custom_pinning: CPUs to pin the FeatureStore to. Passing an empty iterable disables pinning - :param debug: launch Application with extra debug information about the colocated fs - :param kwargs: additional keyword arguments to pass to the FeatureStore feature store + :param debug: launch Application with extra debug information about the + colocated fs + :param kwargs: additional keyword arguments to pass to the FeatureStore + feature store """ tcp_options = {"port": port, "ifname": ifname} @@ -377,7 +395,8 @@ def _set_colocated_fs_settings( if hasattr(self.run_settings, "mpmd") and len(self.run_settings.mpmd) > 0: raise SSUnsupportedError( - "Applications colocated with feature stores cannot be run as a mpmd workload" + "Applications colocated with feature stores cannot be run as a " + "mpmd workload" ) if hasattr(self.run_settings, "_prep_colocated_fs"): @@ -489,8 +508,9 @@ def params_to_args(self) -> None: for param in self.params_as_args: if not param in self.params: raise ValueError( - f"Tried to convert {param} to command line argument for Application " - f"{self.name}, but its value was not found in application params" + f"Tried to convert {param} to command line argument for " + f"application {self.name}, but its value was not found " + "in application params" ) if self.run_settings is None: raise ValueError( @@ -526,7 +546,8 @@ def add_ml_model( :param name: key to store model under :param backend: name of the backend (TORCH, TF, TFLITE, ONNX) - :param model: A model in memory (only supported for non-colocated feature stores) + :param model: A model in memory (only supported for non-colocated + feature stores) :param model_path: serialized model :param device: name of device for execution :param devices_per_node: The number of GPU devices available on the host. @@ -685,7 +706,8 @@ def add_script_object(self, fs_script: FSScript) -> None: if fs_script.func and self.colocated: if not isinstance(fs_script.func, str): err_msg = ( - "Functions can not be set from memory for colocated feature stores.\n" + "Functions can not be set from memory for colocated " + "feature stores.\n" f"Please convert the function named {fs_script.name} " "to a string or store it as a text file and add it to the " "SmartSim Application with add_script." @@ -697,7 +719,8 @@ def _check_fs_objects_colo(self) -> None: for fs_model in self._fs_models: if not fs_model.is_file: err_msg = ( - "ML model can not be set from memory for colocated feature stores.\n" + "ML model can not be set from memory for colocated " + "feature stores.\n" f"Please store the ML model named {fs_model.name} in binary " "format and add it to the SmartSim Application as file." ) diff --git a/smartsim/entity/strategies.py b/smartsim/entity/strategies.py index f6f57b329c..e3a2527a52 100644 --- a/smartsim/entity/strategies.py +++ b/smartsim/entity/strategies.py @@ -25,44 +25,238 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # Generation Strategies + +from __future__ import annotations + +import functools +import itertools import random import typing as t -from itertools import product +from dataclasses import dataclass, field + +from smartsim.error import errors + + +@dataclass(frozen=True) +class ParamSet: + """ + Represents a set of file parameters and execution arguments as parameters. + """ + + params: dict[str, str] = field(default_factory=dict) + exe_args: dict[str, list[str]] = field(default_factory=dict) + + +# Type alias for the shape of a permutation strategy callable +PermutationStrategyType = t.Callable[ + [t.Mapping[str, t.Sequence[str]], t.Mapping[str, t.Sequence[t.Sequence[str]]], int], + list[ParamSet], +] + +# Map of globally registered strategy names to registered strategy callables +_REGISTERED_STRATEGIES: t.Final[dict[str, PermutationStrategyType]] = {} + + +def _register(name: str) -> t.Callable[ + [PermutationStrategyType], + PermutationStrategyType, +]: + """Create a decorator to globally register a permutation strategy under a + given name. + + :param name: The name under which to register a strategy + :return: A decorator to register a permutation strategy function + """ + + def _impl(fn: PermutationStrategyType) -> PermutationStrategyType: + """Add a strategy function to the globally registered strategies under + the `name` caught in the closure. + + :param fn: A permutation strategy + :returns: The original strategy, unaltered + :raises ValueError: A strategy under name caught in the closure has + already been registered + """ + if name in _REGISTERED_STRATEGIES: + msg = f"A strategy with the name '{name}' has already been registered" + raise ValueError(msg) + _REGISTERED_STRATEGIES[name] = fn + return fn + + return _impl + +def resolve(strategy: str | PermutationStrategyType) -> PermutationStrategyType: + """Look-up or sanitize a permutation strategy: -# create permutations of all parameters -# single application if parameters only have one value + - When `strategy` is a `str` it will look for a globally registered + strategy function by that name. + + - When `strategy` is a `callable` it is will return a sanitized + strategy function. + + :param strategy: The name of a registered strategy or a custom + permutation strategy + :return: A valid permutation strategy callable + """ + if callable(strategy): + return _make_sanitized_custom_strategy(strategy) + try: + return _REGISTERED_STRATEGIES[strategy] + except KeyError: + raise ValueError( + f"Failed to find an ensembling strategy by the name of '{strategy}'." + f"All known strategies are:\n{', '.join(_REGISTERED_STRATEGIES)}" + ) from None + + +def _make_sanitized_custom_strategy( + fn: PermutationStrategyType, +) -> PermutationStrategyType: + """Take a callable that satisfies the shape of a permutation strategy and + return a sanitized version for future callers. + + The sanitized version of the permutation strategy will intercept any + exceptions raised by the original permutation and re-raise a + `UserStrategyError`. + + The sanitized version will also check the type of the value returned from + the original callable, and if it does conform to the expected return type, + a `UserStrategyError` will be raised. + + :param fn: A custom user strategy function + :return: A sanitized version of the custom strategy function + """ + + @functools.wraps(fn) + def _impl( + params: t.Mapping[str, t.Sequence[str]], + exe_args: t.Mapping[str, t.Sequence[t.Sequence[str]]], + n_permutations: int = -1, + ) -> list[ParamSet]: + try: + permutations = fn(params, exe_args, n_permutations) + except Exception as e: + raise errors.UserStrategyError(str(fn)) from e + if not isinstance(permutations, list) or not all( + isinstance(permutation, ParamSet) for permutation in permutations + ): + raise errors.UserStrategyError(str(fn)) + return permutations + + return _impl + + +@_register("all_perm") def create_all_permutations( - param_names: t.List[str], - param_values: t.List[t.List[str]], - _n_applications: int = 0, -) -> t.List[t.Dict[str, str]]: - perms = list(product(*param_values)) - all_permutations = [] - for permutation in perms: - temp_application = dict(zip(param_names, permutation)) - all_permutations.append(temp_application) - return all_permutations + params: t.Mapping[str, t.Sequence[str]], + exe_arg: t.Mapping[str, t.Sequence[t.Sequence[str]]], + n_permutations: int = -1, +) -> list[ParamSet]: + """Take two mapping parameters to possible values and return a sequence of + all possible permutations of those parameters. + For example calling: + .. highlight:: python + .. code-block:: python + create_all_permutations({"SPAM": ["a", "b"], + "EGGS": ["c", "d"]}, + {"EXE": [["a"], ["b", "c"]], + "ARGS": [["d"], ["e", "f"]]}, + 1 + ) + Would result in the following permutations (not necessarily in this order): + .. highlight:: python + .. code-block:: python + [ParamSet(params={'SPAM': 'a', 'EGGS': 'c'}, + exe_args={'EXE': ['a'], 'ARGS': ['d']})] + :param file_params: A mapping of file parameter names to possible values + :param exe_arg_params: A mapping of exe arg parameter names to possible values + :param n_permutations: The maximum number of permutations to sample from + the sequence of all permutations + :return: A sequence of ParamSets of all possible permutations + """ + file_params_permutations = itertools.product(*params.values()) + param_zip = ( + dict(zip(params, permutation)) for permutation in file_params_permutations + ) + + exe_arg_params_permutations = itertools.product(*exe_arg.values()) + exe_arg_params_permutations_ = ( + tuple(map(list, sequence)) for sequence in exe_arg_params_permutations + ) + exe_arg_zip = ( + dict(zip(exe_arg, permutation)) for permutation in exe_arg_params_permutations_ + ) + combinations = itertools.product(param_zip, exe_arg_zip) + param_set: t.Iterable[ParamSet] = ( + ParamSet(file_param, exe_arg) for file_param, exe_arg in combinations + ) + if n_permutations >= 0: + param_set = itertools.islice(param_set, n_permutations) + return list(param_set) + +@_register("step") def step_values( - param_names: t.List[str], - param_values: t.List[t.List[str]], - _n_applications: int = 0, -) -> t.List[t.Dict[str, str]]: - permutations = [] - for param_value in zip(*param_values): - permutations.append(dict(zip(param_names, param_value))) - return permutations + params: t.Mapping[str, t.Sequence[str]], + exe_args: t.Mapping[str, t.Sequence[t.Sequence[str]]], + n_permutations: int = -1, +) -> list[ParamSet]: + """Take two mapping parameters to possible values and return a sequence of + stepped values until a possible values sequence runs out of possible + values. + For example calling: + .. highlight:: python + .. code-block:: python + step_values({"SPAM": ["a", "b"], + "EGGS": ["c", "d"]}, + {"EXE": [["a"], ["b", "c"]], + "ARGS": [["d"], ["e", "f"]]}, + 1 + ) + Would result in the following permutations: + .. highlight:: python + .. code-block:: python + [ParamSet(params={'SPAM': 'a', 'EGGS': 'c'}, + exe_args={'EXE': ['a'], 'ARGS': ['d']})] + :param file_params: A mapping of file parameter names to possible values + :param exe_arg_params: A mapping of exe arg parameter names to possible values + :param n_permutations: The maximum number of permutations to sample from + the sequence of step permutations + :return: A sequence of ParamSets of stepped values + """ + param_zip: t.Iterable[tuple[str, ...]] = zip(*params.values()) + param_zip_ = (dict(zip(params, step)) for step in param_zip) + exe_arg_zip: t.Iterable[tuple[t.Sequence[str], ...]] = zip(*exe_args.values()) + exe_arg_zip_ = (map(list, sequence) for sequence in exe_arg_zip) + exe_arg_zip__ = (dict(zip(exe_args, step)) for step in exe_arg_zip_) -def random_permutations( - param_names: t.List[str], param_values: t.List[t.List[str]], n_applications: int = 0 -) -> t.List[t.Dict[str, str]]: - permutations = create_all_permutations(param_names, param_values) + param_set: t.Iterable[ParamSet] = ( + ParamSet(file_param, exe_arg) + for file_param, exe_arg in zip(param_zip_, exe_arg_zip__) + ) + if n_permutations >= 0: + param_set = itertools.islice(param_set, n_permutations) + return list(param_set) - # sample from available permutations if n_applications is specified - if n_applications and n_applications < len(permutations): - permutations = random.sample(permutations, n_applications) +@_register("random") +def random_permutations( + params: t.Mapping[str, t.Sequence[str]], + exe_args: t.Mapping[str, t.Sequence[t.Sequence[str]]], + n_permutations: int = -1, +) -> list[ParamSet]: + """Take two mapping parameters to possible values and return a sequence of + length `n_permutations` sampled randomly from all possible permutations + :param file_params: A mapping of file parameter names to possible values + :param exe_arg_params: A mapping of exe arg parameter names to possible values + :param n_permutations: The maximum number of permutations to sample from + the sequence of all permutations + :return: A sequence of ParamSets of sampled permutations + """ + permutations = create_all_permutations(params, exe_args, -1) + if 0 <= n_permutations < len(permutations): + permutations = random.sample(permutations, n_permutations) return permutations diff --git a/smartsim/experiment.py b/smartsim/experiment.py index 610c14ce86..fa80be037a 100644 --- a/smartsim/experiment.py +++ b/smartsim/experiment.py @@ -282,8 +282,6 @@ def stop( try: for entity in stop_manifest.applications: self._control.stop_entity(entity) - for entity_list in stop_manifest.ensembles: - self._control.stop_entity_list(entity_list) fss = stop_manifest.fss for fs in fss: self._control.stop_fs(fs) @@ -586,9 +584,6 @@ def create_entity_dir( for ensemble in start_manifest.ensembles: create_entity_dir(ensemble) - for member in ensemble.applications: - create_entity_dir(member) - def __str__(self) -> str: return self.name diff --git a/smartsim/launchable/job.py b/smartsim/launchable/job.py index a26b92604e..f440ead0b5 100644 --- a/smartsim/launchable/job.py +++ b/smartsim/launchable/job.py @@ -24,13 +24,18 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +from __future__ import annotations + +import typing as t from copy import deepcopy from smartsim._core.commands.launchCommands import LaunchCommands -from smartsim.entity.entity import SmartSimEntity from smartsim.launchable.basejob import BaseJob from smartsim.settings import LaunchSettings +if t.TYPE_CHECKING: + from smartsim.entity.entity import SmartSimEntity + class Job(BaseJob): """A Job holds a reference to a SmartSimEntity and associated diff --git a/smartsim/launchable/mpmdjob.py b/smartsim/launchable/mpmdjob.py index 7581a7e1c2..7426d91368 100644 --- a/smartsim/launchable/mpmdjob.py +++ b/smartsim/launchable/mpmdjob.py @@ -30,7 +30,6 @@ import typing as t from copy import deepcopy -from smartsim.entity.entity import SmartSimEntity from smartsim.error.errors import SSUnsupportedError from smartsim.launchable.basejob import BaseJob from smartsim.launchable.mpmdpair import MPMDPair @@ -38,6 +37,7 @@ if t.TYPE_CHECKING: from smartsim._core.commands.launchCommands import LaunchCommands + from smartsim.entity.entity import SmartSimEntity def _check_launcher(mpmd_pairs: t.List[MPMDPair]) -> None: diff --git a/smartsim/launchable/mpmdpair.py b/smartsim/launchable/mpmdpair.py index 2b6ce36392..3df8a0ee51 100644 --- a/smartsim/launchable/mpmdpair.py +++ b/smartsim/launchable/mpmdpair.py @@ -24,11 +24,16 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +from __future__ import annotations + import copy +import typing as t -from smartsim.entity.entity import SmartSimEntity from smartsim.settings.launchSettings import LaunchSettings +if t.TYPE_CHECKING: + from smartsim.entity.entity import SmartSimEntity + class MPMDPair: """Class to store MPMD Pairs""" diff --git a/tests/test_ensemble.py b/tests/test_ensemble.py new file mode 100644 index 0000000000..3f170dfcb2 --- /dev/null +++ b/tests/test_ensemble.py @@ -0,0 +1,255 @@ +# BSD 2-Clause License +# +# Copyright (c) 2021-2024, Hewlett Packard Enterprise +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import itertools +import typing as t + +import pytest + +from smartsim.entity import _mock +from smartsim.entity.ensemble import Ensemble +from smartsim.entity.strategies import ParamSet +from smartsim.settings.launchSettings import LaunchSettings + +pytestmark = pytest.mark.group_a + +_2x2_PARAMS = {"SPAM": ["a", "b"], "EGGS": ["c", "d"]} +_2x2_EXE_ARG = {"EXE": [["a"], ["b", "c"]], "ARGS": [["d"], ["e", "f"]]} + + +def user_created_function( + file_params: t.Mapping[str, t.Sequence[str]], + exe_arg_params: t.Mapping[str, t.Sequence[t.Sequence[str]]], + n_permutations: int = 0, +) -> list[ParamSet]: + return [ParamSet({}, {})] + + +@pytest.fixture +def mock_launcher_settings(wlmutils): + return LaunchSettings(wlmutils.get_test_launcher(), {}, {}) + + +def test_ensemble_user_created_strategy(mock_launcher_settings, test_dir): + jobs = Ensemble( + "test_ensemble", + "echo", + ("hello", "world"), + path=test_dir, + permutation_strategy=user_created_function, + ).as_jobs(mock_launcher_settings) + assert len(jobs) == 1 + + +def test_ensemble_without_any_members_raises_when_cast_to_jobs( + mock_launcher_settings, test_dir +): + with pytest.raises(ValueError): + Ensemble( + "test_ensemble", + "echo", + ("hello", "world"), + path=test_dir, + file_parameters=_2x2_PARAMS, + permutation_strategy="random", + max_permutations=30, + replicas=0, + ).as_jobs(mock_launcher_settings) + + +def test_strategy_error_raised_if_a_strategy_that_dne_is_requested(test_dir): + with pytest.raises(ValueError): + Ensemble( + "test_ensemble", + "echo", + ("hello",), + path=test_dir, + permutation_strategy="THIS-STRATEGY-DNE", + )._create_applications() + + +@pytest.mark.parametrize( + "params", + ( + pytest.param({"SPAM": ["eggs"]}, id="Non-Empty Params"), + pytest.param({}, id="Empty Params"), + pytest.param(None, id="Nullish Params"), + ), +) +def test_replicated_applications_have_eq_deep_copies_of_parameters(params, test_dir): + apps = list( + Ensemble( + "test_ensemble", + "echo", + ("hello",), + path=test_dir, + replicas=4, + file_parameters=params, + )._create_applications() + ) + assert len(apps) >= 2 # Sanitiy check to make sure the test is valid + assert all(app_1.params == app_2.params for app_1 in apps for app_2 in apps) + assert all( + app_1.params is not app_2.params + for app_1 in apps + for app_2 in apps + if app_1 is not app_2 + ) + + +# fmt: off +@pytest.mark.parametrize( + " params, exe_arg_params, max_perms, replicas, expected_num_jobs", + (pytest.param(_2x2_PARAMS, _2x2_EXE_ARG, 30, 1, 16 , id="Set max permutation high"), + pytest.param(_2x2_PARAMS, _2x2_EXE_ARG, -1, 1, 16 , id="Set max permutation negative"), + pytest.param(_2x2_PARAMS, _2x2_EXE_ARG, 0, 1, 1 , id="Set max permutation zero"), + pytest.param(_2x2_PARAMS, None, 4, 1, 4 , id="No exe arg params or Replicas"), + pytest.param( None, _2x2_EXE_ARG, 4, 1, 4 , id="No Parameters or Replicas"), + pytest.param( None, None, 4, 1, 1 , id="No Parameters, Exe_Arg_Param or Replicas"), + pytest.param(_2x2_PARAMS, _2x2_EXE_ARG, 1, 1, 1 , id="Set max permutation to lowest"), + pytest.param(_2x2_PARAMS, _2x2_EXE_ARG, 6, 2, 12 , id="Set max permutation, set replicas"), + pytest.param( {}, _2x2_EXE_ARG, 6, 2, 8 , id="Set params as dict, set max permutations and replicas"), + pytest.param(_2x2_PARAMS, {}, 6, 2, 8 , id="Set params as dict, set max permutations and replicas"), + pytest.param( {}, {}, 6, 2, 2 , id="Set params as dict, set max permutations and replicas") +)) +# fmt: on +def test_all_perm_strategy( + # Parameterized + params, + exe_arg_params, + max_perms, + replicas, + expected_num_jobs, + # Other fixtures + mock_launcher_settings, + test_dir, +): + jobs = Ensemble( + "test_ensemble", + "echo", + ("hello", "world"), + path=test_dir, + file_parameters=params, + exe_arg_parameters=exe_arg_params, + permutation_strategy="all_perm", + max_permutations=max_perms, + replicas=replicas, + ).as_jobs(mock_launcher_settings) + assert len(jobs) == expected_num_jobs + + +def test_all_perm_strategy_contents(): + jobs = Ensemble( + "test_ensemble", + "echo", + ("hello", "world"), + file_parameters=_2x2_PARAMS, + exe_arg_parameters=_2x2_EXE_ARG, + permutation_strategy="all_perm", + max_permutations=16, + replicas=1, + ).as_jobs(mock_launcher_settings) + assert len(jobs) == 16 + + +# fmt: off +@pytest.mark.parametrize( + " params, exe_arg_params, max_perms, replicas, expected_num_jobs", + (pytest.param(_2x2_PARAMS, _2x2_EXE_ARG, 30, 1, 2 , id="Set max permutation high"), + pytest.param(_2x2_PARAMS, _2x2_EXE_ARG, -1, 1, 2 , id="Set max permutation negtive"), + pytest.param(_2x2_PARAMS, _2x2_EXE_ARG, 0, 1, 1 , id="Set max permutation zero"), + pytest.param(_2x2_PARAMS, None, 4, 1, 1 , id="No exe arg params or Replicas"), + pytest.param( None, _2x2_EXE_ARG, 4, 1, 1 , id="No Parameters or Replicas"), + pytest.param( None, None, 4, 1, 1 , id="No Parameters, Exe_Arg_Param or Replicas"), + pytest.param(_2x2_PARAMS, _2x2_EXE_ARG, 1, 1, 1 , id="Set max permutation to lowest"), + pytest.param(_2x2_PARAMS, _2x2_EXE_ARG, 6, 2, 4 , id="Set max permutation, set replicas"), + pytest.param( {}, _2x2_EXE_ARG, 6, 2, 2 , id="Set params as dict, set max permutations and replicas"), + pytest.param(_2x2_PARAMS, {}, 6, 2, 2 , id="Set params as dict, set max permutations and replicas"), + pytest.param( {}, {}, 6, 2, 2 , id="Set params as dict, set max permutations and replicas") +)) +# fmt: on +def test_step_strategy( + # Parameterized + params, + exe_arg_params, + max_perms, + replicas, + expected_num_jobs, + # Other fixtures + mock_launcher_settings, + test_dir, +): + jobs = Ensemble( + "test_ensemble", + "echo", + ("hello", "world"), + path=test_dir, + file_parameters=params, + exe_arg_parameters=exe_arg_params, + permutation_strategy="step", + max_permutations=max_perms, + replicas=replicas, + ).as_jobs(mock_launcher_settings) + assert len(jobs) == expected_num_jobs + + +# fmt: off +@pytest.mark.parametrize( + " params, exe_arg_params, max_perms, replicas, expected_num_jobs", + (pytest.param(_2x2_PARAMS, _2x2_EXE_ARG, 30, 1, 16 , id="Set max permutation high"), + pytest.param(_2x2_PARAMS, _2x2_EXE_ARG, -1, 1, 16 , id="Set max permutation negative"), + pytest.param(_2x2_PARAMS, _2x2_EXE_ARG, 0, 1, 1 , id="Set max permutation zero"), + pytest.param(_2x2_PARAMS, None, 4, 1, 4 , id="No exe arg params or Replicas"), + pytest.param( None, _2x2_EXE_ARG, 4, 1, 4 , id="No Parameters or Replicas"), + pytest.param( None, None, 4, 1, 1 , id="No Parameters, Exe_Arg_Param or Replicas"), + pytest.param(_2x2_PARAMS, _2x2_EXE_ARG, 1, 1, 1 , id="Set max permutation to lowest"), + pytest.param(_2x2_PARAMS, _2x2_EXE_ARG, 6, 2, 12 , id="Set max permutation, set replicas"), + pytest.param( {}, _2x2_EXE_ARG, 6, 2, 8 , id="Set params as dict, set max permutations and replicas"), + pytest.param(_2x2_PARAMS, {}, 6, 2, 8 , id="Set params as dict, set max permutations and replicas"), + pytest.param( {}, {}, 6, 2, 2 , id="Set params as dict, set max permutations and replicas") +)) +# fmt: on +def test_random_strategy( + # Parameterized + params, + exe_arg_params, + max_perms, + replicas, + expected_num_jobs, + # Other fixtures + mock_launcher_settings, +): + jobs = Ensemble( + "test_ensemble", + "echo", + ("hello", "world"), + file_parameters=params, + exe_arg_parameters=exe_arg_params, + permutation_strategy="random", + max_permutations=max_perms, + replicas=replicas, + ).as_jobs(mock_launcher_settings) + assert len(jobs) == expected_num_jobs diff --git a/tests/test_permutation_strategies.py b/tests/test_permutation_strategies.py new file mode 100644 index 0000000000..b14514c99b --- /dev/null +++ b/tests/test_permutation_strategies.py @@ -0,0 +1,203 @@ +# BSD 2-Clause License +# +# Copyright (c) 2021-2024, Hewlett Packard Enterprise +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import dataclasses + +import pytest + +from smartsim.entity import strategies +from smartsim.entity.strategies import ParamSet +from smartsim.error import errors + +pytestmark = pytest.mark.group_a + + +def test_strategy_registration(monkeypatch): + monkeypatch.setattr(strategies, "_REGISTERED_STRATEGIES", {}) + assert strategies._REGISTERED_STRATEGIES == {} + + new_strat = lambda params, exe_args, nmax: [] + decorator = strategies._register("new_strat") + assert strategies._REGISTERED_STRATEGIES == {} + + ret_val = decorator(new_strat) + assert ret_val is new_strat + assert strategies._REGISTERED_STRATEGIES == {"new_strat": new_strat} + + +def test_strategies_cannot_be_overwritten(monkeypatch): + monkeypatch.setattr( + strategies, + "_REGISTERED_STRATEGIES", + {"some-strategy": lambda params, exe_args, nmax: []}, + ) + with pytest.raises(ValueError): + strategies._register("some-strategy")(lambda params, exe_args, nmax: []) + + +def test_strategy_retreval(monkeypatch): + new_strat_a = lambda params, exe_args, nmax: [] + new_strat_b = lambda params, exe_args, nmax: [] + + monkeypatch.setattr( + strategies, + "_REGISTERED_STRATEGIES", + {"new_strat_a": new_strat_a, "new_strat_b": new_strat_b}, + ) + assert strategies.resolve("new_strat_a") == new_strat_a + assert strategies.resolve("new_strat_b") == new_strat_b + + +def test_user_strategy_error_raised_when_attempting_to_get_unknown_strat(): + with pytest.raises(ValueError): + strategies.resolve("NOT-REGISTERED") + + +def broken_strategy(p, n, e): + raise Exception("This custom strategy raised an error") + + +@pytest.mark.parametrize( + "strategy", + ( + pytest.param(broken_strategy, id="Strategy raises during execution"), + pytest.param(lambda params, exe_args, nmax: 123, id="Does not return a list"), + pytest.param( + lambda params, exe_args, nmax: [1, 2, 3], + id="Does not return a list of ParamSet", + ), + ), +) +def test_custom_strategy_raises_user_strategy_error_if_something_goes_wrong(strategy): + with pytest.raises(errors.UserStrategyError): + strategies.resolve(strategy)({"SPAM": ["EGGS"]}, {"HELLO": [["WORLD"]]}, 123) + + +@pytest.mark.parametrize( + "strategy, expected_output", + ( + pytest.param( + strategies.create_all_permutations, + ( + [ + ParamSet( + params={"SPAM": "a", "EGGS": "c"}, exe_args={"EXE": ["a"]} + ), + ParamSet( + params={"SPAM": "a", "EGGS": "c"}, + exe_args={"EXE": ["b", "c"]}, + ), + ParamSet( + params={"SPAM": "a", "EGGS": "d"}, exe_args={"EXE": ["a"]} + ), + ParamSet( + params={"SPAM": "a", "EGGS": "d"}, + exe_args={"EXE": ["b", "c"]}, + ), + ParamSet( + params={"SPAM": "b", "EGGS": "c"}, exe_args={"EXE": ["a"]} + ), + ParamSet( + params={"SPAM": "b", "EGGS": "c"}, + exe_args={"EXE": ["b", "c"]}, + ), + ParamSet( + params={"SPAM": "b", "EGGS": "d"}, exe_args={"EXE": ["a"]} + ), + ParamSet( + params={"SPAM": "b", "EGGS": "d"}, + exe_args={"EXE": ["b", "c"]}, + ), + ] + ), + id="All Permutations", + ), + pytest.param( + strategies.step_values, + ( + [ + ParamSet( + params={"SPAM": "a", "EGGS": "c"}, exe_args={"EXE": ["a"]} + ), + ParamSet( + params={"SPAM": "b", "EGGS": "d"}, + exe_args={"EXE": ["b", "c"]}, + ), + ] + ), + id="Step Values", + ), + pytest.param( + strategies.random_permutations, + ( + [ + ParamSet( + params={"SPAM": "a", "EGGS": "c"}, exe_args={"EXE": ["a"]} + ), + ParamSet( + params={"SPAM": "a", "EGGS": "c"}, + exe_args={"EXE": ["b", "c"]}, + ), + ParamSet( + params={"SPAM": "a", "EGGS": "d"}, exe_args={"EXE": ["a"]} + ), + ParamSet( + params={"SPAM": "a", "EGGS": "d"}, + exe_args={"EXE": ["b", "c"]}, + ), + ParamSet( + params={"SPAM": "b", "EGGS": "c"}, exe_args={"EXE": ["a"]} + ), + ParamSet( + params={"SPAM": "b", "EGGS": "c"}, + exe_args={"EXE": ["b", "c"]}, + ), + ParamSet( + params={"SPAM": "b", "EGGS": "d"}, exe_args={"EXE": ["a"]} + ), + ParamSet( + params={"SPAM": "b", "EGGS": "d"}, + exe_args={"EXE": ["b", "c"]}, + ), + ] + ), + id="Uncapped Random Permutations", + ), + ), +) +def test_strategy_returns_expected_set(strategy, expected_output): + params = {"SPAM": ["a", "b"], "EGGS": ["c", "d"]} + exe_args = {"EXE": [["a"], ["b", "c"]]} + output = list(strategy(params, exe_args, 50)) + assert len(output) == len(expected_output) + assert all(item in expected_output for item in output) + assert all(item in output for item in expected_output) + + +def test_param_set_is_frozen(): + param = ParamSet("set1", "set2") + with pytest.raises(dataclasses.FrozenInstanceError): + param.exe_args = "change"