From a4cf1336f1b80ebd82381d4ffd087d124a25e223 Mon Sep 17 00:00:00 2001 From: Satish Pasumarthi Date: Thu, 7 Jul 2022 19:19:37 -0700 Subject: [PATCH] feature: Heterogeneous cluster changes --- CODE_OF_CONDUCT.md | 4 +- setup.py | 2 +- src/sagemaker_training/environment.py | 246 +++++++++++++++++++++-- src/sagemaker_training/params.py | 1 + src/sagemaker_training/runner.py | 4 +- src/sagemaker_training/smdataparallel.py | 3 + src/sagemaker_training/trainer.py | 6 +- test/__init__.py | 19 +- test/unit/test_environment.py | 24 ++- test/unit/test_smdataparallel.py | 116 +++++++++++ test/unit/test_trainer.py | 2 + 11 files changed, 394 insertions(+), 33 deletions(-) diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md index 3b6446687..5b627cfa6 100644 --- a/CODE_OF_CONDUCT.md +++ b/CODE_OF_CONDUCT.md @@ -1,4 +1,4 @@ ## Code of Conduct -This project has adopted the [Amazon Open Source Code of Conduct](https://aws.github.io/code-of-conduct). -For more information see the [Code of Conduct FAQ](https://aws.github.io/code-of-conduct-faq) or contact +This project has adopted the [Amazon Open Source Code of Conduct](https://aws.github.io/code-of-conduct). +For more information see the [Code of Conduct FAQ](https://aws.github.io/code-of-conduct-faq) or contact opensource-codeofconduct@amazon.com with any additional questions or comments. 
diff --git a/setup.py b/setup.py index c954f92d4..cc0e6bd7b 100644 --- a/setup.py +++ b/setup.py @@ -40,7 +40,7 @@ def read_version(): "werkzeug>=0.15.5", "paramiko>=2.4.2", "psutil>=5.6.7", - "protobuf>=3.19,<3.20", + "protobuf>=3.9.2,<3.20", "scipy>=1.2.2", ] diff --git a/src/sagemaker_training/environment.py b/src/sagemaker_training/environment.py index 899e99414..8069e84c7 100644 --- a/src/sagemaker_training/environment.py +++ b/src/sagemaker_training/environment.py @@ -161,8 +161,18 @@ def _create_training_directories(): _write_json({}, input_data_config_file_dir) host_name = socket.gethostname() - - resources_dict = {"current_host": host_name, "hosts": [host_name]} + resources_dict = { + "current_host": host_name, + "hosts": [host_name], + "current_instance_group": "homogeneousCluster", + "instance_groups": [ + { + "instance_group_name": "homogeneousCluster", + "instance_type": "local", + "hosts": [host_name], + } + ], + } _write_json(resources_dict, resource_config_file_dir) @@ -232,9 +242,14 @@ def read_resource_config(): # type: () -> dict It has the following keys: - current_host: The name of the current container on the container network. For example, 'algo-1'. - - hosts: The list of names of all containers on the container + - current_instance_type: Type of EC2 instance + - hosts: The list of names of all nodes on the container network, sorted lexicographically. For example, `['algo-1', 'algo-2', 'algo-3']` for a three-node cluster. + - current_instance_group: Name of the current instance group + - instance_groups: List of instance group dicts containing info about + instance_type, hosts list and group name + - network_interface_name: Name of network interface exposed to container """ return _read_json(resource_config_file_dir) @@ -383,6 +398,10 @@ class Environment(mapping.MappingMixin): # pylint:disable=too-many-public-metho - hosts: The list of names of all containers on the container network, sorted lexicographically. 
For example, `['algo-1', 'algo-2', 'algo-3']` for a three-node cluster. + - current_instance_group: Name of the current instance group + - instance_groups: List of instance group dicts containing info about + instance_type, hosts list and group name + - network_interface_name: Name of network interface exposed to container input_data_config (dict[string, object]): the contents from /opt/ml/input/config/inputdataconfig.json. For example, suppose that you specify three data channels (train, evaluation, and @@ -447,11 +466,16 @@ def __init__(self, resource_config=None, input_data_config=None, hyperparameters resource_config (dict[string, object]): The contents from /opt/ml/input/config/resourceconfig.json. It has the following keys: - - current_host: The name of the current container on the container network. - For example, 'algo-1'. - - hosts: The list of names of all containers on the container network, - sorted lexicographically. For example, `['algo-1', 'algo-2', 'algo-3']` - for a three-node cluster. + - current_host: The name of the current container on the container + network. For example, 'algo-1'. + - current_instance_type: Type of EC2 instance + - hosts: The list of names of all nodes on the container + network, sorted lexicographically. For example, + `['algo-1', 'algo-2', 'algo-3']` for a three-node cluster. + - current_instance_group: Name of the current instance group + - instance_groups: List of instance group dicts containing info about + instance_type, hosts list and group name + - network_interface_name: Name of network interface exposed to container input_data_config (dict[string, object]): The contents from /opt/ml/input/config/inputdataconfig.json. 
For example, suppose that you specify three data channels (train, evaluation, and @@ -486,7 +510,6 @@ def __init__(self, resource_config=None, input_data_config=None, hyperparameters module_dir = os.environ.get(params.SUBMIT_DIR_ENV, code_dir) log_level = int(os.environ.get(params.LOG_LEVEL_ENV, logging.INFO)) - self._current_host = current_host self._num_gpus = num_gpus() self._num_cpus = num_cpus() self._module_name = module_name @@ -499,8 +522,14 @@ def __init__(self, resource_config=None, input_data_config=None, hyperparameters input_data_config = input_data_config or read_input_data_config() all_hyperparameters = hyperparameters or read_hyperparameters() - current_host = resource_config["current_host"] hosts = resource_config["hosts"] + current_instance_type = resource_config.get("current_instance_type", "local") + current_instance_group = resource_config.get("current_group_name", "homogeneousCluster") + current_host = resource_config["current_host"] + + self._current_host = current_host + self._current_instance_type = current_instance_type + self._current_instance_group = current_instance_group split_result = mapping.split_by_criteria( all_hyperparameters, @@ -523,6 +552,7 @@ def __init__(self, resource_config=None, input_data_config=None, hyperparameters os.environ[params.CURRENT_HOST_ENV] = current_host os.environ[params.REGION_NAME_ENV] = sagemaker_region or "" + # hosts comprises of instances from all the groups self._hosts = hosts # eth0 is the default network interface defined by SageMaker with VPC support and @@ -538,7 +568,6 @@ def __init__(self, resource_config=None, input_data_config=None, hyperparameters self._output_data_dir = output_data_dir self._output_intermediate_dir = output_intermediate_dir self._channel_input_dirs = {channel: channel_path(channel) for channel in input_data_config} - self._current_host = current_host # override base class attributes if self._module_name is None: @@ -559,12 +588,92 @@ def __init__(self, resource_config=None, 
input_data_config=None, hyperparameters self._output_dir = output_dir self._job_name = os.environ.get(params.TRAINING_JOB_ENV.upper(), None) - self._master_hostname = list(hosts)[0] + # Heterogeneous cluster changes - get the instance group related information + current_instance_group_hosts = self.get_current_instance_group_hosts() + instance_groups = self.get_instance_groups() + instance_groups_dict = self.get_instance_groups_dict() + distribution_instance_groups = self._additional_framework_parameters.get( + "sagemaker_distribution_instance_groups", + self.get_distribution_instance_groups_from_resource_config(), + ) + self._distribution_instance_groups = distribution_instance_groups + distribution_hosts = self.get_distribution_hosts() + + self._current_instance_group_hosts = current_instance_group_hosts + self._instance_groups = instance_groups + self._instance_groups_dict = instance_groups_dict + self._distribution_hosts = distribution_hosts + is_hetero = bool(len(self._instance_groups) > 1) + self._is_hetero = is_hetero + master_hostname = self.get_master_hostname() + self._master_hostname = master_hostname self._is_master = current_host == self._master_hostname + self._distribution_enabled = bool( + self._current_instance_group in self._distribution_instance_groups + ) mp_parameters = os.environ.get(params.SM_HP_MP_PARAMETERS) self._is_modelparallel_enabled = mp_parameters and mp_parameters != "{}" + @property + def current_instance_type(self): + """ + Return current instance type + """ + return self._current_instance_type + + @property + def is_hetero(self): + """ + Return if current mode is hetero + """ + return self._is_hetero + + @property + def current_instance_group(self): + """ + Return name of the current instance group + """ + return self._current_instance_group + + @property + def instance_groups(self): + """ + Return list of all instance groups + """ + return self._instance_groups + + @property + def instance_groups_dict(self): + """ + Return dict 
of all instance groups + """ + return self._instance_groups_dict + + @property + def current_instance_group_hosts(self): + """ + Return hosts in the current instance group + """ + return self._current_instance_group_hosts + + @property + def distribution_hosts(self): + """ + Return list of hosts on which distribution will be applied + """ + return self._distribution_hosts + + @property + def distribution_instance_groups(self): + """Return list of instance groups which have distribution""" + return self._distribution_instance_groups + + @property + def master_hostname(self): + """Return master hostname""" + return self._master_hostname + @property def model_dir(self): # type: () -> str """The directory where models should be saved. @@ -657,11 +766,6 @@ def is_master(self): # type: () -> bool """Returns True if host is master.""" return self._is_master - @property - def master_hostname(self): # type: () -> str - """Returns the hostname of the master node.""" - return self._master_hostname - @property def job_name(self): # type: () -> str """The name of the current training job. 
@@ -683,6 +787,94 @@ def additional_framework_parameters(self): # type: () -> dict """ return self._additional_framework_parameters + def get_distribution_instance_groups_from_resource_config(self): + """If non heterogeneous cluster mode is used, instance_groups inside distribution is a noop + We populate the sagemaker_distribution_instance_groups with current instance group name ~ + homogeneousCluster + """ + distribution_instance_groups = [] + current_instance_group = self.resource_config.get( + "current_group_name", "homogeneousCluster" + ) + if ( + self._additional_framework_parameters.get("sagemaker_mpi_enabled", False) + or self._additional_framework_parameters.get( + "sagemaker_parameter_server_enabled", False + ) + or self._additional_framework_parameters.get( + "sagemaker_distributed_dataparallel_enabled", False + ) + or self._additional_framework_parameters.get( + "sagemaker_multi_worker_mirrored_strategy_enabled", False + ) + ): + distribution_instance_groups.append(current_instance_group) + return distribution_instance_groups + + def get_current_instance_group(self): + """ + Get the current instance group name + """ + return self.resource_config["current_instance_group"] + + def get_distribution_hosts(self): + """ + Get the list of all hosts in all distribution instance groups + """ + distribution_hosts = [] + instance_groups_config = self._resource_config.get("instance_groups", []) + if instance_groups_config: + for group in instance_groups_config: + if group["instance_group_name"] in self._distribution_instance_groups: + distribution_hosts.extend(group["hosts"]) + else: + # local mode + distribution_hosts = self.hosts.copy() + return distribution_hosts + + def get_current_instance_group_hosts(self): + """ + Get the list of hosts in the current instance group + """ + instance_groups_config = self._resource_config.get("instance_groups", []) + for group in instance_groups_config: + if self._current_instance_group == group["instance_group_name"]: + return 
group["hosts"] + return [] + + def get_instance_groups(self): # type: () -> list + """ + List of instance groups provided for the job + """ + instance_groups = [] + instance_groups_config = self._resource_config.get("instance_groups", []) + # log missing instance groups and return empty list + if not instance_groups_config: + logger.info("instance_groups entry not present in resource_config") + + for group in instance_groups_config: + instance_groups.append(group["instance_group_name"]) + return instance_groups + + def get_instance_groups_dict(self): + """ + Dictionary of instance groups with group_names as keys + """ + instance_groups_dict = {} + instance_groups_config = self._resource_config.get("instance_groups", []) + for group in instance_groups_config: + instance_groups_dict[group["instance_group_name"]] = group + return instance_groups_dict + + def get_master_hostname(self): + """ + Get the master hostname from the list of hosts in the distribution instance groups + """ + if self._distribution_hosts: + return list(self._distribution_hosts)[0] + # if no distribution found + return list(self._hosts)[0] + def sagemaker_s3_output(self): # type: () -> str """S3 output directory location provided by the user. 
@@ -717,6 +909,13 @@ def to_env_vars(self): "output_data_dir": self.output_data_dir, "channels": sorted(self.channel_input_dirs.keys()), "current_host": self.current_host, + "current_instance_type": self.current_instance_type, + "current_instance_group": self.current_instance_group, + "current_instance_group_hosts": self.current_instance_group_hosts, + "instance_groups": self.instance_groups, + "instance_groups_dict": self.instance_groups_dict, + "distribution_instance_groups": self.distribution_instance_groups, + "is_hetero": self.is_hetero, "module_name": self.module_name, "log_level": self.log_level, "framework_module": self.framework_module, @@ -836,10 +1035,15 @@ def resource_config(self): # type: () -> dict It has the following keys: - current_host: The name of the current container on the container - network. For example, 'algo-1'. - - hosts: The list of names of all containers on the container network, - sorted lexicographically. For example, - `["algo-1", "algo-2", "algo-3"]` for a three-node cluster. + network. For example, 'algo-1'. + - current_instance_type: Type of EC2 instance + - hosts: The list of names of all nodes on the container + network, sorted lexicographically. For example, + `['algo-1', 'algo-2', 'algo-3']` for a three-node cluster. 
+ - current_instance_group: Name of the current instance group + - instance_groups: List of instance group dicts containing info about + instance_type, hosts list and group name + - network_interface_name: Name of network interface exposed to container Returns: dict[str, str or list(str)] diff --git a/src/sagemaker_training/params.py b/src/sagemaker_training/params.py index 54808c004..f4e437e7e 100644 --- a/src/sagemaker_training/params.py +++ b/src/sagemaker_training/params.py @@ -63,3 +63,4 @@ "sagemaker_distributed_dataparallel_custom_mpi_options" ) # type: str SM_HP_MP_PARAMETERS = "SM_HP_MP_PARAMETERS" +DISTRIBUTION_INSTANCE_GROUPS = "sagemaker_distribution_instance_groups" # type: list diff --git a/src/sagemaker_training/runner.py b/src/sagemaker_training/runner.py index 9af4f4e8f..c44f57eb8 100644 --- a/src/sagemaker_training/runner.py +++ b/src/sagemaker_training/runner.py @@ -77,7 +77,7 @@ def _get_by_runner_type( env_vars, processes_per_host, env.master_hostname, - env.hosts, + env.distribution_hosts, custom_mpi_options, env.network_interface_name, ) @@ -94,7 +94,7 @@ def _get_by_runner_type( env_vars, processes_per_host, env.master_hostname, - env.hosts, + env.distribution_hosts, custom_mpi_options, env.network_interface_name, num_processes=num_processes, diff --git a/src/sagemaker_training/smdataparallel.py b/src/sagemaker_training/smdataparallel.py index de5d08d97..e40b136ca 100644 --- a/src/sagemaker_training/smdataparallel.py +++ b/src/sagemaker_training/smdataparallel.py @@ -208,6 +208,9 @@ def _get_instance_type(self): instance_type = sm_training_env.get("additional_framework_parameters").get( "sagemaker_instance_type" ) + if not instance_type: + # Heterogeneous mode + instance_type = sm_training_env.get("current_instance_type", None) logger.info("instance type: %s" % instance_type) return instance_type diff --git a/src/sagemaker_training/trainer.py b/src/sagemaker_training/trainer.py index 14fd3083b..62c450daa 100644 --- 
a/src/sagemaker_training/trainer.py +++ b/src/sagemaker_training/trainer.py @@ -89,7 +89,11 @@ def train(): logging_config.configure_logger(env.log_level) mpi_enabled = env.additional_framework_parameters.get(params.MPI_ENABLED) - runner_type = runner.RunnerType.MPI if mpi_enabled else runner.RunnerType.Process + runner_type = ( + runner.RunnerType.MPI + if mpi_enabled and (env.current_instance_group in env.distribution_instance_groups) + else runner.RunnerType.Process + ) entry_point.run( env.module_dir, diff --git a/test/__init__.py b/test/__init__.py index cf479dbab..512c62329 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -77,16 +77,17 @@ def prepare( current_host="algo-1", hosts=None, network_interface_name="ethwe", + current_instance_group="homogeneousCluster", local=False, ): - # type: (UserModule, dict, list, str, list, str, bool) -> None + # type: (UserModule, dict, list, str, list, str, str, bool) -> None hosts = hosts or ["algo-1"] if not local: user_module.upload() create_hyperparameters_config(hyperparameters, user_module.url) - create_resource_config(current_host, hosts, network_interface_name) + create_resource_config(current_host, hosts, current_instance_group, network_interface_name) create_input_data_config(channels) @@ -98,21 +99,29 @@ def hyperparameters(**kwargs): # type: (...) 
-> dict def create_resource_config( - current_host="algo-1", hosts=None, network_interface_name="ethwe" -): # type: (str, list, str) -> None + current_host="algo-1", + hosts=None, + current_instance_group="homogeneousCluster", + network_interface_name="ethwe", +): # type: (str, list, str, str) -> None if network_interface_name: write_json( dict( current_host=current_host, hosts=hosts or ["algo-1"], + current_instance_group=current_instance_group, network_interface_name=network_interface_name, ), environment.resource_config_file_dir, ) else: write_json( - dict(current_host=current_host, hosts=hosts or ["algo-1"]), + dict( + current_host=current_host, + current_instance_group=current_instance_group, + hosts=hosts or ["algo-1"], + ), environment.resource_config_file_dir, ) diff --git a/test/unit/test_environment.py b/test/unit/test_environment.py index 4c6ca50a8..54c12ba8e 100644 --- a/test/unit/test_environment.py +++ b/test/unit/test_environment.py @@ -25,7 +25,18 @@ builtins_open = "__builtin__.open" if six.PY2 else "builtins.open" -RESOURCE_CONFIG = dict(current_host="algo-1", hosts=["algo-1", "algo-2", "algo-3"]) +RESOURCE_CONFIG = dict( + current_host="algo-1", + hosts=["algo-1", "algo-2", "algo-3"], + current_group_name="train1", + current_instance_type="ml.p3.16xlarge", + instance_groups=[ + dict( + instance_group_name="train1", instance_type="ml.p3.16xlarge", hosts=["algo-1", "algo-2"] + ), + dict(instance_group_name="train2", instance_type="ml.p3.8xlarge", hosts=["algo-3"]), + ], +) INPUT_DATA_CONFIG = { "train": { @@ -184,6 +195,9 @@ def test_training_env(training_env): assert training_env.network_interface_name == "eth0" assert training_env.job_name == "training-job-42" assert training_env.additional_framework_parameters == {"sagemaker_parameter_server_num": 2} + assert training_env.current_instance_group == "train1" + assert training_env.current_instance_type == "ml.p3.16xlarge" + assert training_env.instance_groups == ["train1", "train2"] def 
test_env_mapping_properties(training_env): @@ -213,6 +227,14 @@ def test_env_mapping_properties(training_env): "is_master", "master_hostname", "is_modelparallel_enabled", + "instance_groups", + "instance_groups_dict", + "current_instance_type", + "current_instance_group", + "current_instance_group_hosts", + "distribution_hosts", + "distribution_instance_groups", + "is_hetero", } diff --git a/test/unit/test_smdataparallel.py b/test/unit/test_smdataparallel.py index cdc9db124..e8344cb06 100644 --- a/test/unit/test_smdataparallel.py +++ b/test/unit/test_smdataparallel.py @@ -270,6 +270,122 @@ def test_smdataparallel_run_single_node_python( path_exists.assert_called_with("/usr/sbin/sshd") +@patch("asyncio.gather", new_callable=AsyncMock) +@patch("os.path.exists") +@patch("sagemaker_training.process.python_executable", return_value="usr/bin/python3") +@patch("paramiko.SSHClient", new_callable=MockSSHClient) +@patch("paramiko.AutoAddPolicy") +@patch("asyncio.create_subprocess_shell") +@patch("sagemaker_training.environment.Environment") +def test_hc_smdataparallel_run_single_node_python( + training_env, + async_shell, + policy, + ssh_client, + python_executable, + path_exists, + async_gather, + event_loop, +): + with patch.dict(os.environ, clear=True): + hosts = ["algo-1"] + master_hostname = hosts[0] + num_hosts = len(hosts) + num_processes_per_host = 8 + num_processes = num_processes_per_host * num_hosts + host_list = hosts + network_interface_name = "ethw3" + smdataparallel_flag = "SMDATAPARALLEL_USE_SINGLENODE=1" + + smdataparallel_runner = smdataparallel.SMDataParallelRunner( + user_entry_point="train.py", + args=["-v", "--lr", "35"], + env_vars={ + "SM_TRAINING_ENV": '{"additional_framework_parameters":{"sagemaker_distributed_dataparallel_enabled":"true"},\ + "current_instance_type": "ml.p4d.24xlarge"}' + }, + processes_per_host=num_processes_per_host, + master_hostname=master_hostname, + hosts=hosts, + custom_mpi_options="--verbose", + 
network_interface_name=network_interface_name, + ) + + _, _, process = smdataparallel_runner.run(wait=False) + cmd = [ + "mpirun", + "--host", + ",".join(host_list), + "-np", + str(num_processes), + "--allow-run-as-root", + "--tag-output", + "--oversubscribe", + "-mca", + "btl_tcp_if_include", + network_interface_name, + "-mca", + "oob_tcp_if_include", + network_interface_name, + "-mca", + "plm_rsh_no_tree_spawn", + "1", + "-mca", + "pml", + "ob1", + "-mca", + "btl", + "^openib", + "-mca", + "orte_abort_on_non_zero_status", + "1", + "-mca", + "btl_vader_single_copy_mechanism", + "none", + "-mca", + "plm_rsh_num_concurrent", + str(num_hosts), + "-x", + "NCCL_SOCKET_IFNAME=%s" % network_interface_name, + "-x", + "NCCL_DEBUG=INFO", + "-x", + "LD_LIBRARY_PATH", + "-x", + "PATH", + "-x", + smdataparallel_flag, + "-x", + "FI_PROVIDER=efa", + "-x", + "RDMAV_FORK_SAFE=1", + "-x", + "LD_PRELOAD=%s" % inspect.getfile(gethostname), + "--verbose", + "-x", + "FI_EFA_USE_DEVICE_RDMA=1", + "smddprun", + "usr/bin/python3", + "-m", + "mpi4py", + "train.py", + "-v", + "--lr", + "35", + ] + async_shell.assert_called_with( + " ".join(cmd), + cwd=environment.code_dir, + env=ANY, + stdout=asyncio.subprocess.PIPE, + stderr=None, + ) + async_shell.assert_called_once() + async_gather.assert_called_once() + assert process == async_shell.return_value + path_exists.assert_called_with("/usr/sbin/sshd") + + @patch("sagemaker_training.logging_config.log_script_invocation") def test_connection(log): with pytest.raises(Exception): diff --git a/test/unit/test_trainer.py b/test/unit/test_trainer.py index 59d0383a7..da0bf752b 100644 --- a/test/unit/test_trainer.py +++ b/test/unit/test_trainer.py @@ -35,6 +35,8 @@ def sagemaker_s3_output(self): class ScriptEnvironment(Environment): framework_module = None + current_instance_group = "Test1" + distribution_instance_groups = ["Test1"] def sagemaker_s3_output(self): return "s3://bucket"