From 9abe28e650440624e15cf0e0a68f008c1d21270b Mon Sep 17 00:00:00 2001 From: Andrew Tritt Date: Thu, 3 Dec 2020 18:25:52 -0500 Subject: [PATCH 01/12] add ClusterEnvironment for LSF systems --- .../cluster_environments/lsf_environment.py | 71 +++++++++++++++++++ 1 file changed, 71 insertions(+) create mode 100644 pytorch_lightning/cluster_environments/lsf_environment.py diff --git a/pytorch_lightning/cluster_environments/lsf_environment.py b/pytorch_lightning/cluster_environments/lsf_environment.py new file mode 100644 index 0000000000000..2b66e0b964cec --- /dev/null +++ b/pytorch_lightning/cluster_environments/lsf_environment.py @@ -0,0 +1,71 @@ +# Copyright The PyTorch Lightning team. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import re +from pytorch_lightning import _logger as log +from pytorch_lightning.cluster_environments.cluster_environment import ClusterEnvironment + + +class LSFEnvironment(ClusterEnvironment): + + def __init__(self): + super().__init__() + + def master_address(self): + # figure out the root node addr + try: + root_node = sorted(set(x for x in open(os.environ['LSB_DJOB_HOSTFILE'], 'r') + if 'batch' not in x and 'login' not in x))[0][:-1] + except Exception: + root_node = "127.0.0.1" + + os.environ["MASTER_ADDR"] = root_node + log.debug(f"MASTER_ADDR: {os.environ['MASTER_ADDR']}") + return root_node + + def master_port(self): + # ----------------------- + # LSF JOB = PORT number + # ----------------------- + # this way every process knows what port to use + try: + # use the last 4 numbers in the job id as the id + default_port = os.environ["LSB_JOB_ID"] + default_port = default_port[-4:] + + # all ports should be in the 10k+ range + default_port = int(default_port) + 15000 + + except Exception: + default_port = 12910 + + # ----------------------- + # PORT NUMBER = MASTER_PORT + # ----------------------- + # in case the user passed it in + try: + default_port = os.environ["MASTER_PORT"] + except Exception: + os.environ["MASTER_PORT"] = str(default_port) + + log.debug(f"MASTER_PORT: {os.environ['MASTER_PORT']}") + + return default_port + + def world_size(self): + return self._world_size + + def local_rank(self): + return int(os.environ['JSM_NAMESPACE_LOCAL_RANK']) From f2e44c1c4b89a2e71a50adeeb454d0e108f08f77 Mon Sep 17 00:00:00 2001 From: Andrew Tritt Date: Thu, 3 Dec 2020 19:34:16 -0500 Subject: [PATCH 02/12] update init file --- pytorch_lightning/cluster_environments/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pytorch_lightning/cluster_environments/__init__.py b/pytorch_lightning/cluster_environments/__init__.py index 5f03ec07fe311..72f3f46e7715a 100644 --- a/pytorch_lightning/cluster_environments/__init__.py +++ b/pytorch_lightning/cluster_environments/__init__.py @@ -13,4 +13,5 @@ # limitations under the License. 
from pytorch_lightning.cluster_environments.cluster_environment import ClusterEnvironment from pytorch_lightning.cluster_environments.slurm_environment import SLURMEnvironment +from pytorch_lightning.cluster_environments.lsf_environment import LSFEnvironment from pytorch_lightning.cluster_environments.torchelastic_environment import TorchElasticEnvironment From 615f08dddf7e5f2be3b038a619630ca1e62ac025 Mon Sep 17 00:00:00 2001 From: Andrew Tritt Date: Thu, 3 Dec 2020 19:43:56 -0500 Subject: [PATCH 03/12] add available cluster environments --- docs/source/accelerators.rst | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/docs/source/accelerators.rst b/docs/source/accelerators.rst index ee801f2dee28b..629a43d014e8e 100644 --- a/docs/source/accelerators.rst +++ b/docs/source/accelerators.rst @@ -180,3 +180,27 @@ TPU Accelerator .. autoclass:: pytorch_lightning.accelerators.tpu_accelerator.TPUAccelerator :noindex: + +------------ + +***************************** +Available ClusterEnvironments +***************************** + +SLURM Environment +================= + +.. autoclass:: pytorch_lightning.cluster_environments.slurm_environment.SLURMEnvironment + :noindex + +LSF Environment +=============== + +.. autoclass:: pytorch_lightning.cluster_environments.lsf_environment.LSFEnvironment + :noindex + +TorchElastic Environment +======================== + +.. autoclass:: pytorch_lightning.cluster_environments.torchelastic_environment.TorchElasticEnvironment + :noindex From 86f2fa18be694bb85bfbe8b5869427d50c880e4c Mon Sep 17 00:00:00 2001 From: Andrew Tritt Date: Wed, 9 Dec 2020 17:21:28 -0500 Subject: [PATCH 04/12] clean up LSFEnvironment --- .../cluster_environments/lsf_environment.py | 139 +++++++++++++----- 1 file changed, 101 insertions(+), 38 deletions(-) diff --git a/pytorch_lightning/cluster_environments/lsf_environment.py b/pytorch_lightning/cluster_environments/lsf_environment.py index 2b66e0b964cec..e83177f213cb8 100644 --- a/pytorch_lightning/cluster_environments/lsf_environment.py +++ b/pytorch_lightning/cluster_environments/lsf_environment.py @@ -14,58 +14,121 @@ import os import re +import warnings from pytorch_lightning import _logger as log from pytorch_lightning.cluster_environments.cluster_environment import ClusterEnvironment class LSFEnvironment(ClusterEnvironment): + """An environment for running on clusters managed by the LSF resource manager. + + It is expected that any execution using this ClusterEnvironment was executed + using the Job Step Manager i.e. jsrun. + + This plugin expects the following environment variables: + + LSB_JOBID + The LSF assigned job ID + + LSB_HOSTS + The hosts used in the job. This string is expected to have the format "batch ...." + + JSM_NAMESPACE_LOCAL_RANK + The node local rank for the task. This environment variable is set by jsrun + + JSM_NAMESPACE_SIZE + The world size for the task. 
This environment variable is set by jsrun + """ def __init__(self): - super().__init__() + self._master_address = self._get_master_address() + self._master_port = self._get_master_port() + self._local_rank = self._get_local_rank() + self._world_size = self._get_world_size() - def master_address(self): - # figure out the root node addr - try: - root_node = sorted(set(x for x in open(os.environ['LSB_DJOB_HOSTFILE'], 'r') - if 'batch' not in x and 'login' not in x))[0][:-1] - except Exception: - root_node = "127.0.0.1" - - os.environ["MASTER_ADDR"] = root_node + # set environment variables needed for initializing torch distributed process group + os.environ["MASTER_ADDR"] = str(self._master_address) log.debug(f"MASTER_ADDR: {os.environ['MASTER_ADDR']}") - return root_node + os.environ["MASTER_PORT"] = str(self._master_port) + log.debug(f"MASTER_PORT: {os.environ['MASTER_PORT']}") - def master_port(self): - # ----------------------- - # LSF JOB = PORT number - # ----------------------- - # this way every process knows what port to use - try: - # use the last 4 numbers in the job id as the id - default_port = os.environ["LSB_JOB_ID"] - default_port = default_port[-4:] - - # all ports should be in the 10k+ range - default_port = int(default_port) + 15000 - - except Exception: - default_port = 12910 - - # ----------------------- - # PORT NUMBER = MASTER_PORT - # ----------------------- - # in case the user passed it in - try: - default_port = os.environ["MASTER_PORT"] - except Exception: - os.environ["MASTER_PORT"] = str(default_port) + def _get_master_address(self): + """A helper for getting the master address""" + var = "LSB_HOSTS" + hosts = os.environ.get(var) + if not hosts: + raise ValueError("Could not find hosts -- expected in environment variable %s" % var) + hosts = hosts.split() + if len(hosts) < 2: + raise ValueError("Cannot parse hosts from LSB_HOSTS environment variable -- " + "expected format \"batch ...\"") + return hosts[1] - log.debug(f"MASTER_PORT: {os.environ['MASTER_PORT']}") + def _get_master_port(self): + """A helper for getting the master port + + Use the LSF job ID so all ranks can compute the master port + """ + # check for user-specified master port + port = os.environ.get("MASTER_PORT") + if not port: + var = "LSB_JOBID" + jobid = os.environ.get(var) + if not jobid: + raise ValueError("Could not find job id -- expected in environment variable %s" % var) + else: + port = int(jobid) + # all ports should be in the 10k+ range + port = int(port) % 1000 + 10000 + log.debug("calculated master port") + else: + log.debug("using externally specified master port") + return port + + def _get_local_rank(self): + """A helper function for getting the local rank + + Read this from the environment variable JSM_NAMESPACE_LOCAL_RANK + """ + var = "JSM_NAMESPACE_LOCAL_RANK" + local_rank = os.environ.get(var) + if local_rank is None: + raise ValueError("Cannot determine local rank -- expected in %s " + "-- make sure you run your executable with jsrun" % var) + return int(local_rank) + + def _get_world_size(self): + """A helper function for getting the world size - return default_port + Read this from the environment variable JSM_NAMESPACE_SIZE + """ + var = "JSM_NAMESPACE_SIZE" + world_size = os.environ.get(var) + if world_size is None: + raise ValueError("Cannot determine local rank -- expected in %s " + "-- make sure you run your executable with jsrun" % var) + return int(world_size) + + def master_address(self): + """ + Master address is read from a list of hosts contained in the 
environment variable *LSB_HOSTS* + """ + return self._master_address + + def master_port(self): + """ + Master port is calculated from the LSF job ID + """ + return self._master_port def world_size(self): + """ + World size is read from the environment variable JSM_NAMESPACE_SIZE + """ return self._world_size def local_rank(self): - return int(os.environ['JSM_NAMESPACE_LOCAL_RANK']) + """ + World size is read from the environment variable JSM_NAMESPACE_LOCAL_RANK + """ + return self._local_rank From b72b42d28f32ae532a55b46831e369b7b4e0f76d Mon Sep 17 00:00:00 2001 From: Andrew Tritt Date: Wed, 9 Dec 2020 17:23:37 -0500 Subject: [PATCH 05/12] add ddp_hpc as a distributed backend --- pytorch_lightning/accelerators/accelerator_connector.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pytorch_lightning/accelerators/accelerator_connector.py b/pytorch_lightning/accelerators/accelerator_connector.py index a22a8fb3702ee..0ccbaee7ba5e9 100644 --- a/pytorch_lightning/accelerators/accelerator_connector.py +++ b/pytorch_lightning/accelerators/accelerator_connector.py @@ -263,6 +263,13 @@ def select_accelerator(self): ddp_plugin=self.trainer.plugin_connector.ddp_plugin ) + elif self.trainer.distributed_backend == "ddp_hpc": + accelerator_backend = accelerators.DDPHPCAccelerator( + self.trainer, + cluster_env, + ddp_plugin=self.trainer.plugin_connector.ddp_plugin + ) + elif self.trainer.use_dp: accelerator_backend = accelerators.DataParallelAccelerator(self.trainer, cluster_env) @@ -315,7 +322,7 @@ def set_distributed_mode(self): elif self.trainer.num_gpus > 1: self.trainer.use_dp = True - elif self.trainer.distributed_backend in ("ddp", "ddp_spawn"): + elif self.trainer.distributed_backend in ("ddp", "ddp_spawn", "ddp_hpc"): if self.trainer.num_gpus == 0: if self.trainer.num_nodes > 1 or self.trainer.num_processes > 1: self.trainer.use_ddp = True # ddp_cpu From 6a9a4ca80bf5355e0a0370a8c39710b26465e99c Mon Sep 17 00:00:00 2001 From: Andrew Tritt Date: Wed, 9 Dec 2020 17:25:04 -0500 Subject: [PATCH 06/12] clean up SLURMEnvironment --- pytorch_lightning/cluster_environments/slurm_environment.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pytorch_lightning/cluster_environments/slurm_environment.py b/pytorch_lightning/cluster_environments/slurm_environment.py index 6df1cf680c57f..aa78709e12b4e 100644 --- a/pytorch_lightning/cluster_environments/slurm_environment.py +++ b/pytorch_lightning/cluster_environments/slurm_environment.py @@ -20,9 +20,6 @@ class SLURMEnvironment(ClusterEnvironment): - def __init__(self): - super().__init__() - def master_address(self): # figure out the root node addr try: @@ -46,7 +43,7 @@ def master_port(self): default_port = default_port[-4:] # all ports should be in the 10k+ range - default_port = int(default_port) + 15000 + default_port = int(default_port) + 10000 except Exception: default_port = 12910 From 94e4d4b885f968eda3a001b299b1ae2d60ccd1ca Mon Sep 17 00:00:00 2001 From: Andrew Tritt Date: Wed, 9 Dec 2020 17:41:28 -0500 Subject: [PATCH 07/12] remove extra blank line --- pytorch_lightning/accelerators/accelerator.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/pytorch_lightning/accelerators/accelerator.py b/pytorch_lightning/accelerators/accelerator.py index b5301dd686762..01b4d1abe5205 100644 --- a/pytorch_lightning/accelerators/accelerator.py +++ b/pytorch_lightning/accelerators/accelerator.py @@ -254,6 +254,3 @@ def block_ddp_plugin_sync_behaviour(self): """ cm = 
self.ddp_plugin.block_backward_sync(self.trainer.model) if self.ddp_plugin else None yield cm - - - From 113e787b3b003978d78fa694fbeb9e97f20eb9f1 Mon Sep 17 00:00:00 2001 From: Andrew Tritt Date: Wed, 9 Dec 2020 21:19:12 -0500 Subject: [PATCH 08/12] init device for DDPHPCAccelerator We need to do this so we don't send the model to the same device from multiple ranks --- pytorch_lightning/accelerators/ddp_hpc_accelerator.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py index 5f09189e8b42c..58bc5a375d400 100644 --- a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py @@ -129,6 +129,9 @@ def ddp_train(self, process_idx, model): # set warning rank rank_zero_only.rank = self.trainer.global_rank + # Initialize cuda device + self.init_device(process_idx) + # set up server using proc 0's ip address # try to init for 20 times at max in case ports are taken # where to store ip_table From d12d65231c260d9263aa95a4ef4458e0694e17fb Mon Sep 17 00:00:00 2001 From: Andrew Tritt Date: Fri, 11 Dec 2020 14:25:27 -0500 Subject: [PATCH 09/12] committing current state --- pytorch_lightning/accelerators/accelerator.py | 5 +- .../accelerators/accelerator_connector.py | 227 +++++++++++------- .../accelerators/ddp2_accelerator.py | 3 +- .../accelerators/ddp_accelerator.py | 3 +- .../accelerators/ddp_cpu_hpc_accelerator.py | 2 +- .../accelerators/ddp_cpu_spawn_accelerator.py | 2 +- .../accelerators/ddp_hpc_accelerator.py | 13 +- .../cluster_environments/lsf_environment.py | 46 +++- .../cluster_environments/slurm_environment.py | 3 + .../torchelastic_environment.py | 7 +- pytorch_lightning/plugins/ddp_plugin.py | 29 ++- .../plugins/ddp_sequential_plugin.py | 4 +- pytorch_lightning/plugins/plugin_connector.py | 4 +- pytorch_lightning/trainer/trainer.py | 16 +- 14 files changed, 253 insertions(+), 111 deletions(-) diff --git a/pytorch_lightning/accelerators/accelerator.py b/pytorch_lightning/accelerators/accelerator.py index 01b4d1abe5205..de6a27601374f 100644 --- a/pytorch_lightning/accelerators/accelerator.py +++ b/pytorch_lightning/accelerators/accelerator.py @@ -144,14 +144,13 @@ def setup_optimizers(self, model): self.trainer.optimizer_frequencies = optimizer_frequencies def init_ddp_connection( - self, global_rank: int, world_size: int, is_slurm_managing_tasks: bool = True + self, global_rank: int, world_size: int ) -> None: self.ddp_plugin.init_ddp_connection( self.trainer, self.cluster_environment, global_rank, - world_size, - is_slurm_managing_tasks, + world_size ) def sync_tensor(self, diff --git a/pytorch_lightning/accelerators/accelerator_connector.py b/pytorch_lightning/accelerators/accelerator_connector.py index 8412bee2ddff9..5f2399f8c52c6 100644 --- a/pytorch_lightning/accelerators/accelerator_connector.py +++ b/pytorch_lightning/accelerators/accelerator_connector.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import os +import sys import torch @@ -123,13 +124,14 @@ def on_trainer_init( # link up SLURM # TODO: this should be taken out of here... 
but depends too much on DDP - self.trainer.slurm_connector.on_trainer_init(self.trainer.num_nodes) - self.trainer.node_rank = self.determine_ddp_node_rank() - self.trainer.local_rank = self.determine_local_rank() - self.trainer.global_rank = 0 + #self.trainer.slurm_connector.on_trainer_init(self.trainer.num_nodes) + ##self.trainer.is_slurm_managing_tasks = True + #self.trainer.node_rank = self.determine_ddp_node_rank() + #self.trainer.local_rank = self.determine_local_rank() + #self.trainer.global_rank = 0 # NVIDIA setup - self.set_nvidia_flags(self.trainer.is_slurm_managing_tasks, self.trainer.data_parallel_device_ids) + self.set_nvidia_flags(self.trainer.data_parallel_device_ids) self.trainer.on_colab_kaggle = os.getenv('COLAB_GPU') or os.getenv('KAGGLE_URL_BASE') @@ -151,12 +153,13 @@ def _map_deprecated_dist_backend(self, accelerator, distributed_backend): return distributed_backend def _select_environment(self): - if self.trainer.plugin_connector.cloud_environment: - env = self.trainer.plugin_connector.cloud_environment - elif self.trainer.is_slurm_managing_tasks: - env = SLURMEnvironment() - elif self._is_using_torchelastic(): - env = TorchElasticEnvironment() + if self.trainer.plugin_connector.cluster_environment: + env = self.trainer.plugin_connector.cluster_environment + print("RETURNING self.trainer.plugin_connector.cluster_environment", env) + #elif self.trainer.is_slurm_managing_tasks: + # env = SLURMEnvironment() + #elif self._is_using_torchelastic(): + # env = TorchElasticEnvironment() else: env = TorchElasticEnvironment() return env @@ -182,91 +185,33 @@ def select_accelerator(self): # ---------------------------------- # choose an accelerator for the user # ---------------------------------- - use_slurm_ddp = self.trainer.use_ddp and self.trainer.is_slurm_managing_tasks + #use_slurm_ddp = self.trainer.use_ddp and self.trainer.is_slurm_managing_tasks # torchelastic or general non_slurm ddp - te_flags_passed = 'WORLD_SIZE' in os.environ and ('GROUP_RANK' in os.environ or 'NODE_RANK' in os.environ) - use_torchelastic_ddp = self.trainer.use_ddp and te_flags_passed + #te_flags_passed = 'WORLD_SIZE' in os.environ and ('GROUP_RANK' in os.environ or 'NODE_RANK' in os.environ) + #use_torchelastic_ddp = self.trainer.use_ddp and te_flags_passed + + #use_ddp_spawn = self.trainer.use_ddp and self.trainer.distributed_backend == "ddp_spawn" + #use_ddp_cpu_spawn = self.trainer.use_ddp and self.trainer.distributed_backend == "ddp_cpu" - use_ddp_spawn = self.trainer.use_ddp and self.trainer.distributed_backend == "ddp_spawn" - use_ddp_cpu_spawn = self.trainer.use_ddp and self.trainer.distributed_backend == "ddp_cpu" + #use_ddp_hpc = self.trainer.use_ddp and self.trainer.distributed_backend == "ddp_hpc" - use_ddp_cpu_torch_elastic = use_ddp_cpu_spawn and self._is_using_torchelastic() - use_ddp_cpu_slurm = use_ddp_cpu_spawn and self.trainer.is_slurm_managing_tasks + #use_ddp_cpu_torch_elastic = use_ddp_cpu_spawn and self._is_using_torchelastic() + #use_ddp_cpu_slurm = use_ddp_cpu_spawn and self.trainer.is_slurm_managing_tasks # ddp script mode uses the same flags as TE # TODO: decouple from TE - if os.environ.get('PL_IN_DDP_SUBPROCESS', False): - use_torchelastic_ddp = False + #if os.environ.get('PL_IN_DDP_SUBPROCESS', False): + # use_torchelastic_ddp = False cluster_env = self._select_environment() - # choose the appropriate accelerator backend if self.trainer.use_ddp2: accelerator_backend = accelerators.DDP2Accelerator( self.trainer, cluster_env, self.trainer.plugin_connector.ddp_plugin ) - 
- elif use_ddp_cpu_slurm: - accelerator_backend = accelerators.DDPCPUHPCAccelerator( - self.trainer, - cluster_env, - self.trainer.plugin_connector.ddp_plugin - ) - - elif use_slurm_ddp: - accelerator_backend = accelerators.DDPHPCAccelerator( - self.trainer, - cluster_env, - self.trainer.plugin_connector.ddp_plugin - ) - - elif use_ddp_cpu_torch_elastic: - accelerator_backend = accelerators.DDPCPUHPCAccelerator( - self.trainer, - cluster_env, - self.trainer.plugin_connector.ddp_plugin - ) - - elif use_torchelastic_ddp: - accelerator_backend = accelerators.DDPHPCAccelerator( - self.trainer, - cluster_env, - self.trainer.plugin_connector.ddp_plugin - ) - - elif use_ddp_spawn: - accelerator_backend = accelerators.DDPSpawnAccelerator( - self.trainer, - nprocs=self.trainer.num_processes, - cluster_environment=cluster_env, - ddp_plugin=self.trainer.plugin_connector.ddp_plugin - ) - - elif use_ddp_cpu_spawn: - accelerator_backend = accelerators.DDPCPUSpawnAccelerator( - self.trainer, - nprocs=self.trainer.num_processes, - cluster_environment=cluster_env, - ddp_plugin=self.trainer.plugin_connector.ddp_plugin - ) - - elif self.trainer.distributed_backend == "ddp": - accelerator_backend = accelerators.DDPAccelerator( - self.trainer, - cluster_env, - ddp_plugin=self.trainer.plugin_connector.ddp_plugin - ) - - elif self.trainer.distributed_backend == "ddp_hpc": - accelerator_backend = accelerators.DDPHPCAccelerator( - self.trainer, - cluster_env, - ddp_plugin=self.trainer.plugin_connector.ddp_plugin - ) - elif self.trainer.use_dp: accelerator_backend = accelerators.DataParallelAccelerator(self.trainer, cluster_env) @@ -281,11 +226,124 @@ def select_accelerator(self): elif self.trainer.distributed_backend is None: accelerator_backend = accelerators.CPUAccelerator(self.trainer, cluster_env) + + elif self.trainer.use_ddp: #self.trainer.distributed_backend in ('ddp', 'ddp_spawn'): + spawn = self.trainer.distributed_backend == "ddp_spawn" + use_cpu = not self.trainer.gpus is None + + acc_args = [self.trainer] + acc_kwargs = {'cluster_environment': cluster_env, 'ddp_plugin': self.trainer.plugin_connector.ddp_plugin} + ddp_cls = None + if use_cpu: + if spawn: + ddp_cls = accelerators.DDPCPUSpawnAccelerator + ddp_kwargs['nprocs'] = self.trainer.num_processes + else: + ddp_cls = accelerators.DDPCPUHPCAccelerator + else: + if spawn: + ddp_cls = accelerators.DDPSpawnAccelerator + ddp_kwargs['nprocs'] = self.trainer.num_processes + else: + ddp_cls = accelerators.DDPHPCAccelerator + accelerator_backend = ddp_cls(*acc_args, **acc_kwargs) else: raise MisconfigurationException( f'Trainer(accelerator={self.trainer.distributed_backend} is not a supported backend' ) + ## choose the appropriate accelerator backend + #if self.trainer.use_ddp2: # Done + # accelerator_backend = accelerators.DDP2Accelerator( + # self.trainer, + # cluster_env, + # self.trainer.plugin_connector.ddp_plugin + # ) + + #elif use_ddp_cpu_slurm: # Done + # accelerator_backend = accelerators.DDPCPUHPCAccelerator( + # self.trainer, + # cluster_env, + # self.trainer.plugin_connector.ddp_plugin + # ) + + #elif use_slurm_ddp: # Done + # accelerator_backend = accelerators.DDPHPCAccelerator( + # self.trainer, + # cluster_env, + # self.trainer.plugin_connector.ddp_plugin + # ) + + #elif use_ddp_cpu_torch_elastic: # Done + # accelerator_backend = accelerators.DDPCPUHPCAccelerator( + # self.trainer, + # cluster_env, + # self.trainer.plugin_connector.ddp_plugin + # ) + + #elif use_torchelastic_ddp: # Done + # accelerator_backend = 
accelerators.DDPHPCAccelerator( + # self.trainer, + # cluster_env, + # self.trainer.plugin_connector.ddp_plugin + # ) + + #elif use_ddp_spawn: # Done + # accelerator_backend = accelerators.DDPSpawnAccelerator( + # self.trainer, + # nprocs=self.trainer.num_processes, + # cluster_environment=cluster_env, + # ddp_plugin=self.trainer.plugin_connector.ddp_plugin + # ) + + #elif use_ddp_cpu_spawn: # Done + # accelerator_backend = accelerators.DDPCPUSpawnAccelerator( + # self.trainer, + # nprocs=self.trainer.num_processes, + # cluster_environment=cluster_env, + # ddp_plugin=self.trainer.plugin_connector.ddp_plugin + # ) + + #elif self.trainer.distributed_backend == "ddp": + # accelerator_backend = accelerators.DDPAccelerator( + # self.trainer, + # cluster_env, + # ddp_plugin=self.trainer.plugin_connector.ddp_plugin + # ) + + #elif self.trainer.distributed_backend == "ddp_hpc": # Done + # print("USING accelerators.DDPHPCAccelerator", file=sys.stderr) + # accelerator_backend = accelerators.DDPHPCAccelerator( + # self.trainer, + # cluster_env, + # ddp_plugin=self.trainer.plugin_connector.ddp_plugin + # ) + + #elif self.trainer.use_dp: + # accelerator_backend = accelerators.DataParallelAccelerator(self.trainer, cluster_env) + + #elif self.trainer.use_horovod: + # accelerator_backend = accelerators.HorovodAccelerator(self.trainer, cluster_env) + + #elif self.trainer.use_single_gpu: + # accelerator_backend = accelerators.GPUAccelerator(self.trainer, cluster_env) + + #elif self.trainer.use_tpu: + # accelerator_backend = accelerators.TPUAccelerator(self.trainer, cluster_env) + + #elif self.trainer.distributed_backend is None: + # accelerator_backend = accelerators.CPUAccelerator(self.trainer, cluster_env) + #else: + # raise MisconfigurationException( + # f'Trainer(accelerator={self.trainer.distributed_backend} is not a supported backend' + # ) + + rank = os.environ['JSM_NAMESPACE_RANK'] + size = os.environ['JSM_NAMESPACE_SIZE'] + + rank_id = '%s/%s' % (rank, size) + print('select_accelerator %s' % rank_id, "using acclerator type %s, cluster environment type %s" % (type(accelerator_backend), type(cluster_env)), file=sys.stderr) + return accelerator_backend def set_distributed_mode(self): @@ -389,7 +447,7 @@ def has_horovodrun(): """Returns True if running with `horovodrun` using Gloo or OpenMPI.""" return 'OMPI_COMM_WORLD_RANK' in os.environ or 'HOROVOD_RANK' in os.environ - def set_nvidia_flags(self, is_slurm_managing_tasks, data_parallel_device_ids): + def set_nvidia_flags(self, data_parallel_device_ids): if data_parallel_device_ids is None: return @@ -397,14 +455,19 @@ def set_nvidia_flags(self, is_slurm_managing_tasks, data_parallel_device_ids): os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" all_gpu_ids = ",".join([str(x) for x in range(torch.cuda.device_count())]) devices = os.environ.get("CUDA_VISIBLE_DEVICES", all_gpu_ids) - log.info(f'LOCAL_RANK: {self.trainer.local_rank} - CUDA_VISIBLE_DEVICES: [{devices}]') + #log.info(f'LOCAL_RANK: {self.trainer.local_rank} - CUDA_VISIBLE_DEVICES: [{devices}]') def determine_local_rank(self): + rank_id = "RANK_ID-%s/%s" % (os.environ['JSM_NAMESPACE_RANK'], os.environ['JSM_NAMESPACE_SIZE']) + print("determine_local_rank", rank_id, "is_slurm_managing_tasks =", self.trainer.is_slurm_managing_tasks) if self.trainer.is_slurm_managing_tasks: return int(os.environ['SLURM_LOCALID']) return int(os.environ.get('LOCAL_RANK', 0)) def determine_ddp_node_rank(self): + rank_id = "RANK_ID-%s/%s" % (os.environ['JSM_NAMESPACE_RANK'], os.environ['JSM_NAMESPACE_SIZE']) + 
print("determine_ddp_node_rank", rank_id, "is_slurm_managing_tasks =", self.trainer.is_slurm_managing_tasks) + if self.trainer.is_slurm_managing_tasks: return int(os.environ['SLURM_NODEID']) diff --git a/pytorch_lightning/accelerators/ddp2_accelerator.py b/pytorch_lightning/accelerators/ddp2_accelerator.py index 2e3e39a5cd4d4..d1e5e38c1eaeb 100644 --- a/pytorch_lightning/accelerators/ddp2_accelerator.py +++ b/pytorch_lightning/accelerators/ddp2_accelerator.py @@ -145,8 +145,7 @@ def ddp_train(self, process_idx, mp_queue, model): model.trainer = self.trainer self.init_ddp_connection( self.trainer.global_rank, - self.trainer.world_size, - self.trainer.is_slurm_managing_tasks + self.trainer.world_size ) if isinstance(self.ddp_plugin, RPCPlugin): diff --git a/pytorch_lightning/accelerators/ddp_accelerator.py b/pytorch_lightning/accelerators/ddp_accelerator.py index 9789247ac24ce..02f7eb2b0eb9e 100644 --- a/pytorch_lightning/accelerators/ddp_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_accelerator.py @@ -242,8 +242,7 @@ def ddp_train(self, process_idx, model): model.trainer = self.trainer self.init_ddp_connection( self.trainer.global_rank, - self.trainer.world_size, - self.trainer.is_slurm_managing_tasks + self.trainer.world_size ) if isinstance(self.ddp_plugin, RPCPlugin): diff --git a/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py b/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py index ed94a8fdae220..8e4807ff1088f 100644 --- a/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py @@ -34,7 +34,7 @@ def __init__(self, trainer, cluster_environment=None, ddp_plugin=None): super().__init__(trainer, cluster_environment, ddp_plugin) self.nickname = 'ddp_cpu' - def model_to_device(self, model, process_idx): + def model_to_device(self, model): model.cpu() def get_device_ids(self): diff --git a/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py b/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py index f109f555f575e..295a034ca5471 100644 --- a/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py @@ -207,7 +207,7 @@ def set_world_ranks(self, process_idx): self.trainer.global_rank = self.trainer.node_rank * self.trainer.num_processes + process_idx self.trainer.world_size = self.trainer.num_nodes * self.trainer.num_processes - def model_to_device(self, model, process_idx): + def model_to_device(self, model): model.cpu() def get_device_ids(self): diff --git a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py index 58bc5a375d400..bdc66fb50fa89 100644 --- a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py @@ -32,6 +32,7 @@ from hydra.utils import get_original_cwd, to_absolute_path +import sys class DDPHPCAccelerator(Accelerator): def __init__(self, trainer, cluster_environment=None, ddp_plugin=None): @@ -59,9 +60,16 @@ def train(self): self.ddp_train(process_idx=self.task_idx, model=model) def set_world_ranks(self, process_idx): + rank = os.environ['JSM_NAMESPACE_RANK'] + size = os.environ['JSM_NAMESPACE_SIZE'] + + rank_id = '%s/%s' % (rank, size) + print("set_world_ranks", rank_id, "process_idx=%s, trainer.node_rank=%s, trainer.num_processes=%s, trainer.num_nodes=%s" % (process_idx, self.trainer.node_rank, self.trainer.num_processes, self.trainer.num_nodes), file=sys.stderr) 
self.trainer.local_rank = process_idx - self.trainer.global_rank = self.trainer.node_rank * self.trainer.num_processes + process_idx + #self.trainer.global_rank = self.trainer.node_rank * self.trainer.num_processes + process_idx + self.trainer.global_rank = self.cluster_environment.node_rank() * self.trainer.num_processes + process_idx self.trainer.world_size = self.trainer.num_nodes * self.trainer.num_processes + print("set_world_ranks", rank_id, "trainer.global_rank=%s, trainer.world_size=%s" % (self.trainer.global_rank, self.trainer.world_size), file=sys.stderr) def init_device(self, process_idx): self.trainer.root_gpu = process_idx @@ -138,8 +146,7 @@ def ddp_train(self, process_idx, model): model.trainer = self.trainer self.init_ddp_connection( self.trainer.global_rank, - self.trainer.world_size, - self.trainer.is_slurm_managing_tasks + self.trainer.world_size ) if isinstance(self.ddp_plugin, RPCPlugin): diff --git a/pytorch_lightning/cluster_environments/lsf_environment.py b/pytorch_lightning/cluster_environments/lsf_environment.py index e83177f213cb8..e95547fa72813 100644 --- a/pytorch_lightning/cluster_environments/lsf_environment.py +++ b/pytorch_lightning/cluster_environments/lsf_environment.py @@ -14,6 +14,7 @@ import os import re +import socket import warnings from pytorch_lightning import _logger as log from pytorch_lightning.cluster_environments.cluster_environment import ClusterEnvironment @@ -44,7 +45,9 @@ def __init__(self): self._master_address = self._get_master_address() self._master_port = self._get_master_port() self._local_rank = self._get_local_rank() + self._global_rank = self._get_global_rank() self._world_size = self._get_world_size() + self._node_rank = self._get_node_rank() # set environment variables needed for initializing torch distributed process group os.environ["MASTER_ADDR"] = str(self._master_address) @@ -52,8 +55,7 @@ def __init__(self): os.environ["MASTER_PORT"] = str(self._master_port) log.debug(f"MASTER_PORT: {os.environ['MASTER_PORT']}") - def _get_master_address(self): - """A helper for getting the master address""" + def _read_hosts(self): var = "LSB_HOSTS" hosts = os.environ.get(var) if not hosts: @@ -62,6 +64,11 @@ def _get_master_address(self): if len(hosts) < 2: raise ValueError("Cannot parse hosts from LSB_HOSTS environment variable -- " "expected format \"batch ...\"") + return hosts + + def _get_master_address(self): + """A helper for getting the master address""" + hosts = self._read_hosts() return hosts[1] def _get_master_port(self): @@ -85,6 +92,18 @@ def _get_master_port(self): log.debug("using externally specified master port") return port + def _get_global_rank(self): + """A helper function for getting the global rank + + Read this from the environment variable JSM_NAMESPACE_LOCAL_RANK + """ + var = "JSM_NAMESPACE_RANK" + global_rank = os.environ.get(var) + if global_rank is None: + raise ValueError("Cannot determine global rank -- expected in %s " + "-- make sure you run your executable with jsrun" % var) + return int(global_rank) + def _get_local_rank(self): """A helper function for getting the local rank @@ -109,6 +128,16 @@ def _get_world_size(self): "-- make sure you run your executable with jsrun" % var) return int(world_size) + def _get_node_rank(self): + hosts = self._read_hosts() + count = dict() + for host in hosts: + if 'batch' in host or 'login' in host: + continue + if host not in count: + count[host] = len(count) + return count[socket.gethostname()] + def master_address(self): """ Master address is read from a list of 
hosts contained in the environment variable *LSB_HOSTS* @@ -132,3 +161,16 @@ def local_rank(self): World size is read from the environment variable JSM_NAMESPACE_LOCAL_RANK """ return self._local_rank + + def node_rank(self): + """ + Node rank is determined by the position of the current hostname in the list of hosts stored in LSB_HOSTS + """ + return self._node_rank + + def global_rank(self): + """ + World size is read from the environment variable JSM_NAMESPACE_LOCAL_RANK + """ + return self._global_rank + diff --git a/pytorch_lightning/cluster_environments/slurm_environment.py b/pytorch_lightning/cluster_environments/slurm_environment.py index aa78709e12b4e..daec768717185 100644 --- a/pytorch_lightning/cluster_environments/slurm_environment.py +++ b/pytorch_lightning/cluster_environments/slurm_environment.py @@ -78,3 +78,6 @@ def _resolve_root_node_address(self, root_node): root_node = name + number return root_node + + def node_rank(self): + return int(os.environ['SLURM_NODEID']) diff --git a/pytorch_lightning/cluster_environments/torchelastic_environment.py b/pytorch_lightning/cluster_environments/torchelastic_environment.py index a4d769518d252..868653f1ad304 100644 --- a/pytorch_lightning/cluster_environments/torchelastic_environment.py +++ b/pytorch_lightning/cluster_environments/torchelastic_environment.py @@ -45,7 +45,10 @@ def master_port(self): return port def world_size(self): - return os.environ.get('WORLD_SIZE') + return int(os.environ.get('WORLD_SIZE')) def local_rank(self): - return int(os.environ['LOCAL_RANK']) + return int(os.environ.get('LOCAL_RANK')) + + def node_rank(self): + return int(os.environ.get('GROUP_RANK')) diff --git a/pytorch_lightning/plugins/ddp_plugin.py b/pytorch_lightning/plugins/ddp_plugin.py index 281074cb37813..22f637f6aa7d9 100644 --- a/pytorch_lightning/plugins/ddp_plugin.py +++ b/pytorch_lightning/plugins/ddp_plugin.py @@ -1,3 +1,4 @@ +import sys import os from contextlib import contextmanager from typing import Any, Dict, List, Optional, Union @@ -63,6 +64,11 @@ def configure_ddp(self, model, device_ids): self._ddp_kwargs["find_unused_parameters"] = self._ddp_kwargs.get( "find_unused_parameters", True ) + rank = os.environ['JSM_NAMESPACE_RANK'] + size = os.environ['JSM_NAMESPACE_SIZE'] + + rank_id = '%s/%s' % (rank, size) + print("LightningDistributedDataParallel", rank_id, device_ids, self._ddp_kwargs, file=sys.stderr) model = LightningDistributedDataParallel( model, device_ids=device_ids, @@ -75,21 +81,40 @@ def init_ddp_connection( trainer, cluster_environment, global_rank: int, - world_size: int, - is_slurm_managing_tasks: bool = True, + world_size: int ) -> None: os.environ["MASTER_ADDR"] = str(cluster_environment.master_address()) os.environ["MASTER_PORT"] = str(cluster_environment.master_port()) os.environ["WORLD_SIZE"] = str(cluster_environment.world_size()) + torch_backend = "nccl" if trainer.on_gpu else "gloo" + rank = os.environ['JSM_NAMESPACE_RANK'] + size = os.environ['JSM_NAMESPACE_SIZE'] + + msg = dict( + NCCL_SOCKET_IFNAME = os.environ.get('NCCL_SOCKET_IFNAME', "NO_IFNAME"), + MASTER_ADDR = os.environ["MASTER_ADDR"], + MASTER_PORT = os.environ["MASTER_PORT"], + WORLD_SIZE = os.environ["WORLD_SIZE"] + ) + ipg = dict(backend=torch_backend, rank=global_rank, world_size=world_size) + + rank_id = '%s/%s' % (rank, size) + + os.environ['NCCL_SOCKET_IFNAME'] = 'ib0' + if not torch_distrib.is_initialized(): log.info( f"initializing ddp: GLOBAL_RANK: {global_rank}, MEMBER: {global_rank + 1}/{world_size}" ) + print("init_ddp_connection 
INITIALIZING %s %s %s" % (rank_id, str(ipg), str(msg)), file=sys.stderr) torch_distrib.init_process_group( torch_backend, rank=global_rank, world_size=world_size ) + print("init_ddp_connection FINISHED INITIALIZING %s %s %s" % (rank_id, str(ipg), str(msg)), file=sys.stderr) + else: + print("init_ddp_connection ALREADY INITIALIZED %s %s %s" % (rank_id, str(ipg), str(msg)), file=sys.stderr) def on_before_forward(self, model: LightningModule, *args): """ diff --git a/pytorch_lightning/plugins/ddp_sequential_plugin.py b/pytorch_lightning/plugins/ddp_sequential_plugin.py index 010f0ea1648a8..c8c0d020e940f 100644 --- a/pytorch_lightning/plugins/ddp_sequential_plugin.py +++ b/pytorch_lightning/plugins/ddp_sequential_plugin.py @@ -109,7 +109,6 @@ def init_ddp_connection( cluster_environment, global_rank: int, world_size: int, - is_slurm_managing_tasks: bool = True, ) -> None: trainer.prepared_for_backwards = False self._check_arguments(trainer) @@ -119,8 +118,7 @@ def init_ddp_connection( trainer=trainer, cluster_environment=cluster_environment, global_rank=global_rank, - world_size=world_size, - is_slurm_managing_tasks=is_slurm_managing_tasks + world_size=world_size ) super().init_rpc_connection( global_rank=global_rank, diff --git a/pytorch_lightning/plugins/plugin_connector.py b/pytorch_lightning/plugins/plugin_connector.py index d66c25173cc77..ba3ed222eaadf 100644 --- a/pytorch_lightning/plugins/plugin_connector.py +++ b/pytorch_lightning/plugins/plugin_connector.py @@ -30,7 +30,7 @@ def __init__(self, trainer): self.trainer = trainer self.plugins = [] self.ddp_plugin = DDPPlugin() - self.cloud_environment = None + self.cluster_environment = None self.amp_plugin = NativeAMPPlugin(trainer) self.apex_plugin = ApexPlugin(trainer) @@ -99,7 +99,7 @@ def __attach_cluster(self, limit=1): raise MisconfigurationException(m) # set the cluster - self.cloud_environment = plugin + self.cluster_environment = plugin def _convert_str_custom_plugins(self, plugins: Union[str, list]): """ diff --git a/pytorch_lightning/trainer/trainer.py b/pytorch_lightning/trainer/trainer.py index 31a64d00ccb60..2d7e18da1cf18 100644 --- a/pytorch_lightning/trainer/trainer.py +++ b/pytorch_lightning/trainer/trainer.py @@ -405,6 +405,7 @@ def __init__( # last thing are the plugins which override whatever the trainer used by default self.plugin_connector.on_trainer_init(plugins) + print('ClusterEnvironment', self.plugin_connector.cluster_environment) # Callback system self.on_init_end() @@ -434,6 +435,15 @@ def fit( # bookkeeping self._state = TrainerState.RUNNING + # ---------------------------- + # SET UP TRAINING + # ---------------------------- + self.accelerator_backend = self.accelerator_connector.select_accelerator() + self.accelerator_backend.setup(model) + self.local_rank = self.accelerator_backend.cluster_environment.local_rank() + self.node_rank = self.accelerator_backend.cluster_environment.node_rank() + self.global_rank = self.accelerator_backend.cluster_environment.global_rank() + # ---------------------------- # LINK DATA # ---------------------------- @@ -447,12 +457,6 @@ def fit( # we reuse fit in .test() but change its behavior using this flag self.testing = os.environ.get('PL_TESTING_MODE', self.testing) - # ---------------------------- - # SET UP TRAINING - # ---------------------------- - self.accelerator_backend = self.accelerator_connector.select_accelerator() - self.accelerator_backend.setup(model) - # ---------------------------- # INSPECT THESE FOR MAIN LOOPS # ---------------------------- From 
b53d153200053ebfeb33b7f2e9fa074c870cc3bc Mon Sep 17 00:00:00 2001 From: Andrew Tritt Date: Fri, 11 Dec 2020 17:38:28 -0500 Subject: [PATCH 10/12] add additional methods to ClusterEnvironments --- .../cluster_environments/cluster_environment.py | 6 ++++++ pytorch_lightning/cluster_environments/slurm_environment.py | 3 +++ .../cluster_environments/torchelastic_environment.py | 3 +++ 3 files changed, 12 insertions(+) diff --git a/pytorch_lightning/cluster_environments/cluster_environment.py b/pytorch_lightning/cluster_environments/cluster_environment.py index 5196e44411082..e82b1c310d2bd 100644 --- a/pytorch_lightning/cluster_environments/cluster_environment.py +++ b/pytorch_lightning/cluster_environments/cluster_environment.py @@ -31,3 +31,9 @@ def world_size(self): def local_rank(self): pass + + def global_rank(self): + pass + + def node_rank(self): + pass diff --git a/pytorch_lightning/cluster_environments/slurm_environment.py b/pytorch_lightning/cluster_environments/slurm_environment.py index daec768717185..729190965b46b 100644 --- a/pytorch_lightning/cluster_environments/slurm_environment.py +++ b/pytorch_lightning/cluster_environments/slurm_environment.py @@ -81,3 +81,6 @@ def _resolve_root_node_address(self, root_node): def node_rank(self): return int(os.environ['SLURM_NODEID']) + + def global_rank(self): + return int(os.environ['SLURM_PROCID']) diff --git a/pytorch_lightning/cluster_environments/torchelastic_environment.py b/pytorch_lightning/cluster_environments/torchelastic_environment.py index 868653f1ad304..2fcd0b32f7aa1 100644 --- a/pytorch_lightning/cluster_environments/torchelastic_environment.py +++ b/pytorch_lightning/cluster_environments/torchelastic_environment.py @@ -52,3 +52,6 @@ def local_rank(self): def node_rank(self): return int(os.environ.get('GROUP_RANK')) + + def global_rank(self): + return int(os.environ['RANK']) From 0b6edfe1da002ee943a7e52c9545f3a639e2b24a Mon Sep 17 00:00:00 2001 From: Andrew Tritt Date: Fri, 11 Dec 2020 18:04:21 -0500 Subject: [PATCH 11/12] add NVIDIA mixin for setting up CUDA envars --- .../accelerators/ddp_hpc_accelerator.py | 15 +++++----- .../accelerators/ddp_spawn_accelerator.py | 7 ++++- .../accelerators/gpu_accelerator.py | 7 ++++- .../accelerators/nvidia_mixin.py | 30 +++++++++++++++++++ pytorch_lightning/trainer/trainer.py | 3 +- 5 files changed, 50 insertions(+), 12 deletions(-) create mode 100644 pytorch_lightning/accelerators/nvidia_mixin.py diff --git a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py index eba548d278c05..188fb5a68b2c6 100644 --- a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py @@ -21,6 +21,7 @@ from pytorch_lightning import _logger as log from pytorch_lightning.accelerators.accelerator import Accelerator, ReduceOp +from pytorch_lightning.accelerators.nvidia_mixin import NVIDIAMixin from pytorch_lightning.cluster_environments import ClusterEnvironment from pytorch_lightning.core.lightning import LightningModule from pytorch_lightning.distributed.dist import LightningDistributed @@ -35,7 +36,7 @@ import sys -class DDPHPCAccelerator(Accelerator): +class DDPHPCAccelerator(Accelerator, NVIDIAMixin): def __init__(self, trainer, @@ -58,23 +59,21 @@ def __init__(self, def setup(self, model): self.trainer.model = model + # ---------------------------- + # NVIDIA FLAGS + # ---------------------------- + self.set_nvidia_flags(self.trainer.data_parallel_device_ids) self.task_idx = 
self.cluster_environment.local_rank() + def train(self): model = self.trainer.model self.ddp_train(process_idx=self.task_idx, model=model) def set_world_ranks(self, process_idx): - rank = os.environ['JSM_NAMESPACE_RANK'] - size = os.environ['JSM_NAMESPACE_SIZE'] - - rank_id = '%s/%s' % (rank, size) - print("set_world_ranks", rank_id, "process_idx=%s, trainer.node_rank=%s, trainer.num_processes=%s, trainer.num_nodes=%s" % (process_idx, self.trainer.node_rank, self.trainer.num_processes, self.trainer.num_nodes), file=sys.stderr) self.trainer.local_rank = process_idx - #self.trainer.global_rank = self.trainer.node_rank * self.trainer.num_processes + process_idx self.trainer.global_rank = self.cluster_environment.node_rank() * self.trainer.num_processes + process_idx self.trainer.world_size = self.trainer.num_nodes * self.trainer.num_processes - print("set_world_ranks", rank_id, "trainer.global_rank=%s, trainer.world_size=%s" % (self.trainer.global_rank, self.trainer.world_size), file=sys.stderr) def init_device(self, process_idx): self.trainer.root_gpu = process_idx diff --git a/pytorch_lightning/accelerators/ddp_spawn_accelerator.py b/pytorch_lightning/accelerators/ddp_spawn_accelerator.py index a49e17fc0b31d..5c8e0d50d29df 100644 --- a/pytorch_lightning/accelerators/ddp_spawn_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_spawn_accelerator.py @@ -22,6 +22,7 @@ from pytorch_lightning import _logger as log from pytorch_lightning.accelerators.accelerator import Accelerator, ReduceOp +from pytorch_lightning.accelerators.nvidia_mixin import NVIDIAMixin from pytorch_lightning.cluster_environments import ClusterEnvironment from pytorch_lightning.core.lightning import LightningModule from pytorch_lightning.distributed import LightningDistributed @@ -44,7 +45,7 @@ from hydra.utils import get_original_cwd, to_absolute_path -class DDPSpawnAccelerator(Accelerator): +class DDPSpawnAccelerator(Accelerator, NVIDIAMixin): def __init__(self, trainer, @@ -68,6 +69,10 @@ def __init__(self, def setup(self, model): os.environ['MASTER_PORT'] = os.environ.get('MASTER_PORT', str(find_free_network_port())) + # ---------------------------- + # NVIDIA FLAGS + # ---------------------------- + self.set_nvidia_flags(self.trainer.data_parallel_device_ids) # pass in a state q smp = mp.get_context('spawn') diff --git a/pytorch_lightning/accelerators/gpu_accelerator.py b/pytorch_lightning/accelerators/gpu_accelerator.py index 1310777e0d890..e101b4367f976 100644 --- a/pytorch_lightning/accelerators/gpu_accelerator.py +++ b/pytorch_lightning/accelerators/gpu_accelerator.py @@ -17,12 +17,13 @@ from pytorch_lightning import _logger as log from pytorch_lightning.accelerators.accelerator import Accelerator, ReduceOp +from pytorch_lightning.accelerators.nvidia_mixin import NVIDIAMixin from pytorch_lightning.cluster_environments import ClusterEnvironment from pytorch_lightning.distributed.dist import LightningDistributed from pytorch_lightning.utilities import AMPType -class GPUAccelerator(Accelerator): +class GPUAccelerator(Accelerator, NVIDIAMixin): amp_backend: AMPType def __init__(self, trainer, cluster_environment: Optional[ClusterEnvironment] = None): @@ -40,6 +41,10 @@ def __init__(self, trainer, cluster_environment: Optional[ClusterEnvironment] = self.nickname = None def setup(self, model): + # ---------------------------- + # NVIDIA FLAGS + # ---------------------------- + self.set_nvidia_flags(self.trainer.data_parallel_device_ids) # call setup self.trainer.call_setup_hook(model) diff --git 
a/pytorch_lightning/accelerators/nvidia_mixin.py b/pytorch_lightning/accelerators/nvidia_mixin.py new file mode 100644 index 0000000000000..4530a182833a8 --- /dev/null +++ b/pytorch_lightning/accelerators/nvidia_mixin.py @@ -0,0 +1,30 @@ +# Copyright The PyTorch Lightning team. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License +import os + +import torch + +from pytorch_lightning import _logger as log + +class NVIDIAMixin: + + def set_nvidia_flags(self, data_parallel_device_ids): + if data_parallel_device_ids is None: + return + + # set the correct cuda visible devices (using pci order) + os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" + all_gpu_ids = ",".join([str(x) for x in range(torch.cuda.device_count())]) + devices = os.environ.get("CUDA_VISIBLE_DEVICES", all_gpu_ids) + log.info(f'LOCAL_RANK: {self.trainer.local_rank} - CUDA_VISIBLE_DEVICES: [{devices}]') diff --git a/pytorch_lightning/trainer/trainer.py b/pytorch_lightning/trainer/trainer.py index 6247945efa232..39a5129a1077b 100644 --- a/pytorch_lightning/trainer/trainer.py +++ b/pytorch_lightning/trainer/trainer.py @@ -405,7 +405,6 @@ def __init__( # last thing are the plugins which override whatever the trainer used by default self.plugin_connector.on_trainer_init(plugins) - print('ClusterEnvironment', self.plugin_connector.cluster_environment) # Callback system self.on_init_end() @@ -439,10 +438,10 @@ def fit( # SET UP TRAINING # ---------------------------- self.accelerator_backend = self.accelerator_connector.select_accelerator() - self.accelerator_backend.setup(model) self.local_rank = self.accelerator_backend.cluster_environment.local_rank() self.node_rank = self.accelerator_backend.cluster_environment.node_rank() self.global_rank = self.accelerator_backend.cluster_environment.global_rank() + self.accelerator_backend.setup(model) # ---------------------------- # LINK DATA From f7d87f6da311cda1656217fd71f2941649cc85d4 Mon Sep 17 00:00:00 2001 From: Andrew Tritt Date: Fri, 11 Dec 2020 19:49:21 -0500 Subject: [PATCH 12/12] remove troubleshooting prints --- .../accelerators/accelerator_connector.py | 161 +----------------- pytorch_lightning/plugins/ddp_plugin.py | 24 --- 2 files changed, 3 insertions(+), 182 deletions(-) diff --git a/pytorch_lightning/accelerators/accelerator_connector.py b/pytorch_lightning/accelerators/accelerator_connector.py index 5f2399f8c52c6..01e82203bd0ae 100644 --- a/pytorch_lightning/accelerators/accelerator_connector.py +++ b/pytorch_lightning/accelerators/accelerator_connector.py @@ -122,17 +122,6 @@ def on_trainer_init( self.trainer.world_size = 1 self.trainer.interactive_ddp_procs = [] - # link up SLURM - # TODO: this should be taken out of here... 
but depends too much on DDP - #self.trainer.slurm_connector.on_trainer_init(self.trainer.num_nodes) - ##self.trainer.is_slurm_managing_tasks = True - #self.trainer.node_rank = self.determine_ddp_node_rank() - #self.trainer.local_rank = self.determine_local_rank() - #self.trainer.global_rank = 0 - - # NVIDIA setup - self.set_nvidia_flags(self.trainer.data_parallel_device_ids) - self.trainer.on_colab_kaggle = os.getenv('COLAB_GPU') or os.getenv('KAGGLE_URL_BASE') self.trainer.replace_sampler_ddp = replace_sampler_ddp @@ -155,11 +144,6 @@ def _map_deprecated_dist_backend(self, accelerator, distributed_backend): def _select_environment(self): if self.trainer.plugin_connector.cluster_environment: env = self.trainer.plugin_connector.cluster_environment - print("RETURNING self.trainer.plugin_connector.cluster_environment", env) - #elif self.trainer.is_slurm_managing_tasks: - # env = SLURMEnvironment() - #elif self._is_using_torchelastic(): - # env = TorchElasticEnvironment() else: env = TorchElasticEnvironment() return env @@ -185,25 +169,6 @@ def select_accelerator(self): # ---------------------------------- # choose an accelerator for the user # ---------------------------------- - #use_slurm_ddp = self.trainer.use_ddp and self.trainer.is_slurm_managing_tasks - - # torchelastic or general non_slurm ddp - #te_flags_passed = 'WORLD_SIZE' in os.environ and ('GROUP_RANK' in os.environ or 'NODE_RANK' in os.environ) - #use_torchelastic_ddp = self.trainer.use_ddp and te_flags_passed - - #use_ddp_spawn = self.trainer.use_ddp and self.trainer.distributed_backend == "ddp_spawn" - #use_ddp_cpu_spawn = self.trainer.use_ddp and self.trainer.distributed_backend == "ddp_cpu" - - #use_ddp_hpc = self.trainer.use_ddp and self.trainer.distributed_backend == "ddp_hpc" - - #use_ddp_cpu_torch_elastic = use_ddp_cpu_spawn and self._is_using_torchelastic() - #use_ddp_cpu_slurm = use_ddp_cpu_spawn and self.trainer.is_slurm_managing_tasks - - # ddp script mode uses the same flags as TE - # TODO: decouple from TE - #if os.environ.get('PL_IN_DDP_SUBPROCESS', False): - # use_torchelastic_ddp = False - cluster_env = self._select_environment() if self.trainer.use_ddp2: @@ -227,9 +192,9 @@ def select_accelerator(self): elif self.trainer.distributed_backend is None: accelerator_backend = accelerators.CPUAccelerator(self.trainer, cluster_env) - elif self.trainer.use_ddp: #self.trainer.distributed_backend in ('ddp', 'ddp_spawn'): + elif self.trainer.use_ddp: spawn = self.trainer.distributed_backend == "ddp_spawn" - use_cpu = not self.trainer.gpus is None + use_cpu = self.trainer.gpus is None acc_args = [self.trainer] acc_kwargs = {'cluster_environment': cluster_env, 'ddp_plugin': self.trainer.plugin_connector.ddp_plugin} @@ -252,98 +217,6 @@ def select_accelerator(self): f'Trainer(accelerator={self.trainer.distributed_backend} is not a supported backend' ) - ## choose the appropriate accelerator backend - #if self.trainer.use_ddp2: # Done - # accelerator_backend = accelerators.DDP2Accelerator( - # self.trainer, - # cluster_env, - # self.trainer.plugin_connector.ddp_plugin - # ) - - #elif use_ddp_cpu_slurm: # Done - # accelerator_backend = accelerators.DDPCPUHPCAccelerator( - # self.trainer, - # cluster_env, - # self.trainer.plugin_connector.ddp_plugin - # ) - - #elif use_slurm_ddp: # Done - # accelerator_backend = accelerators.DDPHPCAccelerator( - # self.trainer, - # cluster_env, - # self.trainer.plugin_connector.ddp_plugin - # ) - - #elif use_ddp_cpu_torch_elastic: # Done - # accelerator_backend = 
accelerators.DDPCPUHPCAccelerator( - # self.trainer, - # cluster_env, - # self.trainer.plugin_connector.ddp_plugin - # ) - - #elif use_torchelastic_ddp: # Done - # accelerator_backend = accelerators.DDPHPCAccelerator( - # self.trainer, - # cluster_env, - # self.trainer.plugin_connector.ddp_plugin - # ) - - #elif use_ddp_spawn: # Done - # accelerator_backend = accelerators.DDPSpawnAccelerator( - # self.trainer, - # nprocs=self.trainer.num_processes, - # cluster_environment=cluster_env, - # ddp_plugin=self.trainer.plugin_connector.ddp_plugin - # ) - - #elif use_ddp_cpu_spawn: # Done - # accelerator_backend = accelerators.DDPCPUSpawnAccelerator( - # self.trainer, - # nprocs=self.trainer.num_processes, - # cluster_environment=cluster_env, - # ddp_plugin=self.trainer.plugin_connector.ddp_plugin - # ) - - #elif self.trainer.distributed_backend == "ddp": - # accelerator_backend = accelerators.DDPAccelerator( - # self.trainer, - # cluster_env, - # ddp_plugin=self.trainer.plugin_connector.ddp_plugin - # ) - - #elif self.trainer.distributed_backend == "ddp_hpc": # Done - # print("USING accelerators.DDPHPCAccelerator", file=sys.stderr) - # accelerator_backend = accelerators.DDPHPCAccelerator( - # self.trainer, - # cluster_env, - # ddp_plugin=self.trainer.plugin_connector.ddp_plugin - # ) - - #elif self.trainer.use_dp: - # accelerator_backend = accelerators.DataParallelAccelerator(self.trainer, cluster_env) - - #elif self.trainer.use_horovod: - # accelerator_backend = accelerators.HorovodAccelerator(self.trainer, cluster_env) - - #elif self.trainer.use_single_gpu: - # accelerator_backend = accelerators.GPUAccelerator(self.trainer, cluster_env) - - #elif self.trainer.use_tpu: - # accelerator_backend = accelerators.TPUAccelerator(self.trainer, cluster_env) - - #elif self.trainer.distributed_backend is None: - # accelerator_backend = accelerators.CPUAccelerator(self.trainer, cluster_env) - #else: - # raise MisconfigurationException( - # f'Trainer(accelerator={self.trainer.distributed_backend} is not a supported backend' - # ) - - rank = os.environ['JSM_NAMESPACE_RANK'] - size = os.environ['JSM_NAMESPACE_SIZE'] - - rank_id = '%s/%s' % (rank, size) - print('select_accelerator %s' % rank_id, "using acclerator type %s, cluster environment type %s" % (type(accelerator_backend), type(cluster_env)), file=sys.stderr) - return accelerator_backend def set_distributed_mode(self): @@ -377,7 +250,7 @@ def set_distributed_mode(self): elif self.trainer.num_gpus > 1: self.trainer.use_dp = True - elif self.trainer.distributed_backend in ("ddp", "ddp_spawn", "ddp_hpc"): + elif self.trainer.distributed_backend in ("ddp", "ddp_spawn"): if self.trainer.num_gpus == 0: if self.trainer.num_nodes > 1 or self.trainer.num_processes > 1: self.trainer.use_ddp = True # ddp_cpu @@ -455,31 +328,3 @@ def set_nvidia_flags(self, data_parallel_device_ids): os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" all_gpu_ids = ",".join([str(x) for x in range(torch.cuda.device_count())]) devices = os.environ.get("CUDA_VISIBLE_DEVICES", all_gpu_ids) - #log.info(f'LOCAL_RANK: {self.trainer.local_rank} - CUDA_VISIBLE_DEVICES: [{devices}]') - - def determine_local_rank(self): - rank_id = "RANK_ID-%s/%s" % (os.environ['JSM_NAMESPACE_RANK'], os.environ['JSM_NAMESPACE_SIZE']) - print("determine_local_rank", rank_id, "is_slurm_managing_tasks =", self.trainer.is_slurm_managing_tasks) - if self.trainer.is_slurm_managing_tasks: - return int(os.environ['SLURM_LOCALID']) - return int(os.environ.get('LOCAL_RANK', 0)) - - def determine_ddp_node_rank(self): - rank_id 
= "RANK_ID-%s/%s" % (os.environ['JSM_NAMESPACE_RANK'], os.environ['JSM_NAMESPACE_SIZE']) - print("determine_ddp_node_rank", rank_id, "is_slurm_managing_tasks =", self.trainer.is_slurm_managing_tasks) - - if self.trainer.is_slurm_managing_tasks: - return int(os.environ['SLURM_NODEID']) - - # torchelastic uses the envvar GROUP_RANK, whereas other systems(?) use NODE_RANK. - # otherwise use given node rank or default to node rank 0 - env_vars = ['NODE_RANK', 'GROUP_RANK'] - node_ids = [(k, os.environ.get(k, None)) for k in env_vars] - node_ids = [(k, v) for k, v in node_ids if v is not None] - if len(node_ids) == 0: - return 0 - if len(node_ids) > 1: - log.warning(f"Multiple environment variables ({node_ids}) defined for node rank. Using the first one.") - k, rank = node_ids.pop() - rank_zero_info(f"Using environment variable {k} for node rank ({rank}).") - return int(rank) diff --git a/pytorch_lightning/plugins/ddp_plugin.py b/pytorch_lightning/plugins/ddp_plugin.py index 22f637f6aa7d9..7e40d65b912f1 100644 --- a/pytorch_lightning/plugins/ddp_plugin.py +++ b/pytorch_lightning/plugins/ddp_plugin.py @@ -64,11 +64,6 @@ def configure_ddp(self, model, device_ids): self._ddp_kwargs["find_unused_parameters"] = self._ddp_kwargs.get( "find_unused_parameters", True ) - rank = os.environ['JSM_NAMESPACE_RANK'] - size = os.environ['JSM_NAMESPACE_SIZE'] - - rank_id = '%s/%s' % (rank, size) - print("LightningDistributedDataParallel", rank_id, device_ids, self._ddp_kwargs, file=sys.stderr) model = LightningDistributedDataParallel( model, device_ids=device_ids, @@ -89,32 +84,13 @@ def init_ddp_connection( torch_backend = "nccl" if trainer.on_gpu else "gloo" - rank = os.environ['JSM_NAMESPACE_RANK'] - size = os.environ['JSM_NAMESPACE_SIZE'] - - msg = dict( - NCCL_SOCKET_IFNAME = os.environ.get('NCCL_SOCKET_IFNAME', "NO_IFNAME"), - MASTER_ADDR = os.environ["MASTER_ADDR"], - MASTER_PORT = os.environ["MASTER_PORT"], - WORLD_SIZE = os.environ["WORLD_SIZE"] - ) - ipg = dict(backend=torch_backend, rank=global_rank, world_size=world_size) - - rank_id = '%s/%s' % (rank, size) - - os.environ['NCCL_SOCKET_IFNAME'] = 'ib0' - if not torch_distrib.is_initialized(): log.info( f"initializing ddp: GLOBAL_RANK: {global_rank}, MEMBER: {global_rank + 1}/{world_size}" ) - print("init_ddp_connection INITIALIZING %s %s %s" % (rank_id, str(ipg), str(msg)), file=sys.stderr) torch_distrib.init_process_group( torch_backend, rank=global_rank, world_size=world_size ) - print("init_ddp_connection FINISHED INITIALIZING %s %s %s" % (rank_id, str(ipg), str(msg)), file=sys.stderr) - else: - print("init_ddp_connection ALREADY INITIALIZED %s %s %s" % (rank_id, str(ipg), str(msg)), file=sys.stderr) def on_before_forward(self, model: LightningModule, *args): """