From 9abe28e650440624e15cf0e0a68f008c1d21270b Mon Sep 17 00:00:00 2001
From: Andrew Tritt
Date: Thu, 3 Dec 2020 18:25:52 -0500
Subject: [PATCH 01/34] add ClusterEnvironment for LSF systems

---
 .../cluster_environments/lsf_environment.py   | 71 +++++++++++++++++++
 1 file changed, 71 insertions(+)
 create mode 100644 pytorch_lightning/cluster_environments/lsf_environment.py

diff --git a/pytorch_lightning/cluster_environments/lsf_environment.py b/pytorch_lightning/cluster_environments/lsf_environment.py
new file mode 100644
index 0000000000000..2b66e0b964cec
--- /dev/null
+++ b/pytorch_lightning/cluster_environments/lsf_environment.py
@@ -0,0 +1,71 @@
+# Copyright The PyTorch Lightning team.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import re
+from pytorch_lightning import _logger as log
+from pytorch_lightning.cluster_environments.cluster_environment import ClusterEnvironment
+
+
+class LSFEnvironment(ClusterEnvironment):
+
+    def __init__(self):
+        super().__init__()
+
+    def master_address(self):
+        # figure out the root node addr
+        try:
+            root_node = sorted(set(x for x in open(os.environ['LSB_DJOB_HOSTFILE'], 'r')
+                                   if 'batch' not in x and 'login' not in x))[0][:-1]
+        except Exception:
+            root_node = "127.0.0.1"
+
+        os.environ["MASTER_ADDR"] = root_node
+        log.debug(f"MASTER_ADDR: {os.environ['MASTER_ADDR']}")
+        return root_node
+
+    def master_port(self):
+        # -----------------------
+        # LSF JOB = PORT number
+        # -----------------------
+        # this way every process knows what port to use
+        try:
+            # use the last 4 numbers in the job id as the id
+            default_port = os.environ["LSB_JOB_ID"]
+            default_port = default_port[-4:]
+
+            # all ports should be in the 10k+ range
+            default_port = int(default_port) + 15000
+
+        except Exception:
+            default_port = 12910
+
+        # -----------------------
+        # PORT NUMBER = MASTER_PORT
+        # -----------------------
+        # in case the user passed it in
+        try:
+            default_port = os.environ["MASTER_PORT"]
+        except Exception:
+            os.environ["MASTER_PORT"] = str(default_port)
+
+        log.debug(f"MASTER_PORT: {os.environ['MASTER_PORT']}")
+
+        return default_port
+
+    def world_size(self):
+        return self._world_size
+
+    def local_rank(self):
+        return int(os.environ['JSM_NAMESPACE_LOCAL_RANK'])

From f2e44c1c4b89a2e71a50adeeb454d0e108f08f77 Mon Sep 17 00:00:00 2001
From: Andrew Tritt
Date: Thu, 3 Dec 2020 19:34:16 -0500
Subject: [PATCH 02/34] update init file

---
 pytorch_lightning/cluster_environments/__init__.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pytorch_lightning/cluster_environments/__init__.py b/pytorch_lightning/cluster_environments/__init__.py
index 5f03ec07fe311..72f3f46e7715a 100644
--- a/pytorch_lightning/cluster_environments/__init__.py
+++ b/pytorch_lightning/cluster_environments/__init__.py
@@ -13,4 +13,5 @@
 # limitations under the License.
 from pytorch_lightning.cluster_environments.cluster_environment import ClusterEnvironment
 from pytorch_lightning.cluster_environments.slurm_environment import SLURMEnvironment
+from pytorch_lightning.cluster_environments.lsf_environment import LSFEnvironment
 from pytorch_lightning.cluster_environments.torchelastic_environment import TorchElasticEnvironment

From 615f08dddf7e5f2be3b038a619630ca1e62ac025 Mon Sep 17 00:00:00 2001
From: Andrew Tritt
Date: Thu, 3 Dec 2020 19:43:56 -0500
Subject: [PATCH 03/34] add available cluster environments

---
 docs/source/accelerators.rst | 24 ++++++++++++++++++++++++
 1 file changed, 24 insertions(+)

diff --git a/docs/source/accelerators.rst b/docs/source/accelerators.rst
index ee801f2dee28b..629a43d014e8e 100644
--- a/docs/source/accelerators.rst
+++ b/docs/source/accelerators.rst
@@ -180,3 +180,27 @@ TPU Accelerator
 
 .. autoclass:: pytorch_lightning.accelerators.tpu_accelerator.TPUAccelerator
     :noindex:
+
+------------
+
+*****************************
+Available ClusterEnvironments
+*****************************
+
+SLURM Environment
+=================
+
+.. autoclass:: pytorch_lightning.cluster_environments.slurm_environment.SLURMEnvironment
+    :noindex
+
+LSF Environment
+===============
+
+.. autoclass:: pytorch_lightning.cluster_environments.lsf_environment.LSFEnvironment
+    :noindex
+
+TorchElastic Environment
+========================
+
+.. autoclass:: pytorch_lightning.cluster_environments.torchelastic_environment.TorchElasticEnvironment
+    :noindex

From 86f2fa18be694bb85bfbe8b5869427d50c880e4c Mon Sep 17 00:00:00 2001
From: Andrew Tritt
Date: Wed, 9 Dec 2020 17:21:28 -0500
Subject: [PATCH 04/34] clean up LSFEnvironment

---
 .../cluster_environments/lsf_environment.py   | 139 +++++++++++++-----
 1 file changed, 101 insertions(+), 38 deletions(-)

diff --git a/pytorch_lightning/cluster_environments/lsf_environment.py b/pytorch_lightning/cluster_environments/lsf_environment.py
index 2b66e0b964cec..e83177f213cb8 100644
--- a/pytorch_lightning/cluster_environments/lsf_environment.py
+++ b/pytorch_lightning/cluster_environments/lsf_environment.py
@@ -14,58 +14,121 @@
 
 import os
 import re
+import warnings
 from pytorch_lightning import _logger as log
 from pytorch_lightning.cluster_environments.cluster_environment import ClusterEnvironment
 
 
 class LSFEnvironment(ClusterEnvironment):
+    """An environment for running on clusters managed by the LSF resource manager.
+
+    It is expected that any execution using this ClusterEnvironment was executed
+    using the Job Step Manager i.e. jsrun.
+
+    This plugin expects the following environment variables:
+
+    LSB_JOBID
+        The LSF assigned job ID
+
+    LSB_HOSTS
+        The hosts used in the job. This string is expected to have the format "batch ...."
+
+    JSM_NAMESPACE_LOCAL_RANK
+        The node local rank for the task. This environment variable is set by jsrun
+
+    JSM_NAMESPACE_SIZE
+        The world size for the task.
This environment variable is set by jsrun + """ def __init__(self): - super().__init__() + self._master_address = self._get_master_address() + self._master_port = self._get_master_port() + self._local_rank = self._get_local_rank() + self._world_size = self._get_world_size() - def master_address(self): - # figure out the root node addr - try: - root_node = sorted(set(x for x in open(os.environ['LSB_DJOB_HOSTFILE'], 'r') - if 'batch' not in x and 'login' not in x))[0][:-1] - except Exception: - root_node = "127.0.0.1" - - os.environ["MASTER_ADDR"] = root_node + # set environment variables needed for initializing torch distributed process group + os.environ["MASTER_ADDR"] = str(self._master_address) log.debug(f"MASTER_ADDR: {os.environ['MASTER_ADDR']}") - return root_node + os.environ["MASTER_PORT"] = str(self._master_port) + log.debug(f"MASTER_PORT: {os.environ['MASTER_PORT']}") - def master_port(self): - # ----------------------- - # LSF JOB = PORT number - # ----------------------- - # this way every process knows what port to use - try: - # use the last 4 numbers in the job id as the id - default_port = os.environ["LSB_JOB_ID"] - default_port = default_port[-4:] - - # all ports should be in the 10k+ range - default_port = int(default_port) + 15000 - - except Exception: - default_port = 12910 - - # ----------------------- - # PORT NUMBER = MASTER_PORT - # ----------------------- - # in case the user passed it in - try: - default_port = os.environ["MASTER_PORT"] - except Exception: - os.environ["MASTER_PORT"] = str(default_port) + def _get_master_address(self): + """A helper for getting the master address""" + var = "LSB_HOSTS" + hosts = os.environ.get(var) + if not hosts: + raise ValueError("Could not find hosts -- expected in environment variable %s" % var) + hosts = hosts.split() + if len(hosts) < 2: + raise ValueError("Cannot parse hosts from LSB_HOSTS environment variable -- " + "expected format \"batch ...\"") + return hosts[1] - log.debug(f"MASTER_PORT: {os.environ['MASTER_PORT']}") + def _get_master_port(self): + """A helper for getting the master port + + Use the LSF job ID so all ranks can compute the master port + """ + # check for user-specified master port + port = os.environ.get("MASTER_PORT") + if not port: + var = "LSB_JOBID" + jobid = os.environ.get(var) + if not jobid: + raise ValueError("Could not find job id -- expected in environment variable %s" % var) + else: + port = int(jobid) + # all ports should be in the 10k+ range + port = int(port) % 1000 + 10000 + log.debug("calculated master port") + else: + log.debug("using externally specified master port") + return port + + def _get_local_rank(self): + """A helper function for getting the local rank + + Read this from the environment variable JSM_NAMESPACE_LOCAL_RANK + """ + var = "JSM_NAMESPACE_LOCAL_RANK" + local_rank = os.environ.get(var) + if local_rank is None: + raise ValueError("Cannot determine local rank -- expected in %s " + "-- make sure you run your executable with jsrun" % var) + return int(local_rank) + + def _get_world_size(self): + """A helper function for getting the world size - return default_port + Read this from the environment variable JSM_NAMESPACE_SIZE + """ + var = "JSM_NAMESPACE_SIZE" + world_size = os.environ.get(var) + if world_size is None: + raise ValueError("Cannot determine local rank -- expected in %s " + "-- make sure you run your executable with jsrun" % var) + return int(world_size) + + def master_address(self): + """ + Master address is read from a list of hosts contained in the 
environment variable *LSB_HOSTS* + """ + return self._master_address + + def master_port(self): + """ + Master port is calculated from the LSF job ID + """ + return self._master_port def world_size(self): + """ + World size is read from the environment variable JSM_NAMESPACE_SIZE + """ return self._world_size def local_rank(self): - return int(os.environ['JSM_NAMESPACE_LOCAL_RANK']) + """ + World size is read from the environment variable JSM_NAMESPACE_LOCAL_RANK + """ + return self._local_rank From b72b42d28f32ae532a55b46831e369b7b4e0f76d Mon Sep 17 00:00:00 2001 From: Andrew Tritt Date: Wed, 9 Dec 2020 17:23:37 -0500 Subject: [PATCH 05/34] add ddp_hpc as a distributed backend --- pytorch_lightning/accelerators/accelerator_connector.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pytorch_lightning/accelerators/accelerator_connector.py b/pytorch_lightning/accelerators/accelerator_connector.py index a22a8fb3702ee..0ccbaee7ba5e9 100644 --- a/pytorch_lightning/accelerators/accelerator_connector.py +++ b/pytorch_lightning/accelerators/accelerator_connector.py @@ -263,6 +263,13 @@ def select_accelerator(self): ddp_plugin=self.trainer.plugin_connector.ddp_plugin ) + elif self.trainer.distributed_backend == "ddp_hpc": + accelerator_backend = accelerators.DDPHPCAccelerator( + self.trainer, + cluster_env, + ddp_plugin=self.trainer.plugin_connector.ddp_plugin + ) + elif self.trainer.use_dp: accelerator_backend = accelerators.DataParallelAccelerator(self.trainer, cluster_env) @@ -315,7 +322,7 @@ def set_distributed_mode(self): elif self.trainer.num_gpus > 1: self.trainer.use_dp = True - elif self.trainer.distributed_backend in ("ddp", "ddp_spawn"): + elif self.trainer.distributed_backend in ("ddp", "ddp_spawn", "ddp_hpc"): if self.trainer.num_gpus == 0: if self.trainer.num_nodes > 1 or self.trainer.num_processes > 1: self.trainer.use_ddp = True # ddp_cpu From 6a9a4ca80bf5355e0a0370a8c39710b26465e99c Mon Sep 17 00:00:00 2001 From: Andrew Tritt Date: Wed, 9 Dec 2020 17:25:04 -0500 Subject: [PATCH 06/34] clean up SLURMEnvironment --- pytorch_lightning/cluster_environments/slurm_environment.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pytorch_lightning/cluster_environments/slurm_environment.py b/pytorch_lightning/cluster_environments/slurm_environment.py index 6df1cf680c57f..aa78709e12b4e 100644 --- a/pytorch_lightning/cluster_environments/slurm_environment.py +++ b/pytorch_lightning/cluster_environments/slurm_environment.py @@ -20,9 +20,6 @@ class SLURMEnvironment(ClusterEnvironment): - def __init__(self): - super().__init__() - def master_address(self): # figure out the root node addr try: @@ -46,7 +43,7 @@ def master_port(self): default_port = default_port[-4:] # all ports should be in the 10k+ range - default_port = int(default_port) + 15000 + default_port = int(default_port) + 10000 except Exception: default_port = 12910 From 94e4d4b885f968eda3a001b299b1ae2d60ccd1ca Mon Sep 17 00:00:00 2001 From: Andrew Tritt Date: Wed, 9 Dec 2020 17:41:28 -0500 Subject: [PATCH 07/34] remove extra blank line --- pytorch_lightning/accelerators/accelerator.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/pytorch_lightning/accelerators/accelerator.py b/pytorch_lightning/accelerators/accelerator.py index b5301dd686762..01b4d1abe5205 100644 --- a/pytorch_lightning/accelerators/accelerator.py +++ b/pytorch_lightning/accelerators/accelerator.py @@ -254,6 +254,3 @@ def block_ddp_plugin_sync_behaviour(self): """ cm = 
self.ddp_plugin.block_backward_sync(self.trainer.model) if self.ddp_plugin else None yield cm - - - From 113e787b3b003978d78fa694fbeb9e97f20eb9f1 Mon Sep 17 00:00:00 2001 From: Andrew Tritt Date: Wed, 9 Dec 2020 21:19:12 -0500 Subject: [PATCH 08/34] init device for DDPHPCAccelerator We need to do this so we don't send the model to the same device from multiple ranks --- pytorch_lightning/accelerators/ddp_hpc_accelerator.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py index 5f09189e8b42c..58bc5a375d400 100644 --- a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py @@ -129,6 +129,9 @@ def ddp_train(self, process_idx, model): # set warning rank rank_zero_only.rank = self.trainer.global_rank + # Initialize cuda device + self.init_device(process_idx) + # set up server using proc 0's ip address # try to init for 20 times at max in case ports are taken # where to store ip_table From d12d65231c260d9263aa95a4ef4458e0694e17fb Mon Sep 17 00:00:00 2001 From: Andrew Tritt Date: Fri, 11 Dec 2020 14:25:27 -0500 Subject: [PATCH 09/34] committing current state --- pytorch_lightning/accelerators/accelerator.py | 5 +- .../accelerators/accelerator_connector.py | 227 +++++++++++------- .../accelerators/ddp2_accelerator.py | 3 +- .../accelerators/ddp_accelerator.py | 3 +- .../accelerators/ddp_cpu_hpc_accelerator.py | 2 +- .../accelerators/ddp_cpu_spawn_accelerator.py | 2 +- .../accelerators/ddp_hpc_accelerator.py | 13 +- .../cluster_environments/lsf_environment.py | 46 +++- .../cluster_environments/slurm_environment.py | 3 + .../torchelastic_environment.py | 7 +- pytorch_lightning/plugins/ddp_plugin.py | 29 ++- .../plugins/ddp_sequential_plugin.py | 4 +- pytorch_lightning/plugins/plugin_connector.py | 4 +- pytorch_lightning/trainer/trainer.py | 16 +- 14 files changed, 253 insertions(+), 111 deletions(-) diff --git a/pytorch_lightning/accelerators/accelerator.py b/pytorch_lightning/accelerators/accelerator.py index 01b4d1abe5205..de6a27601374f 100644 --- a/pytorch_lightning/accelerators/accelerator.py +++ b/pytorch_lightning/accelerators/accelerator.py @@ -144,14 +144,13 @@ def setup_optimizers(self, model): self.trainer.optimizer_frequencies = optimizer_frequencies def init_ddp_connection( - self, global_rank: int, world_size: int, is_slurm_managing_tasks: bool = True + self, global_rank: int, world_size: int ) -> None: self.ddp_plugin.init_ddp_connection( self.trainer, self.cluster_environment, global_rank, - world_size, - is_slurm_managing_tasks, + world_size ) def sync_tensor(self, diff --git a/pytorch_lightning/accelerators/accelerator_connector.py b/pytorch_lightning/accelerators/accelerator_connector.py index 8412bee2ddff9..5f2399f8c52c6 100644 --- a/pytorch_lightning/accelerators/accelerator_connector.py +++ b/pytorch_lightning/accelerators/accelerator_connector.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import os +import sys import torch @@ -123,13 +124,14 @@ def on_trainer_init( # link up SLURM # TODO: this should be taken out of here... 
but depends too much on DDP - self.trainer.slurm_connector.on_trainer_init(self.trainer.num_nodes) - self.trainer.node_rank = self.determine_ddp_node_rank() - self.trainer.local_rank = self.determine_local_rank() - self.trainer.global_rank = 0 + #self.trainer.slurm_connector.on_trainer_init(self.trainer.num_nodes) + ##self.trainer.is_slurm_managing_tasks = True + #self.trainer.node_rank = self.determine_ddp_node_rank() + #self.trainer.local_rank = self.determine_local_rank() + #self.trainer.global_rank = 0 # NVIDIA setup - self.set_nvidia_flags(self.trainer.is_slurm_managing_tasks, self.trainer.data_parallel_device_ids) + self.set_nvidia_flags(self.trainer.data_parallel_device_ids) self.trainer.on_colab_kaggle = os.getenv('COLAB_GPU') or os.getenv('KAGGLE_URL_BASE') @@ -151,12 +153,13 @@ def _map_deprecated_dist_backend(self, accelerator, distributed_backend): return distributed_backend def _select_environment(self): - if self.trainer.plugin_connector.cloud_environment: - env = self.trainer.plugin_connector.cloud_environment - elif self.trainer.is_slurm_managing_tasks: - env = SLURMEnvironment() - elif self._is_using_torchelastic(): - env = TorchElasticEnvironment() + if self.trainer.plugin_connector.cluster_environment: + env = self.trainer.plugin_connector.cluster_environment + print("RETURNING self.trainer.plugin_connector.cluster_environment", env) + #elif self.trainer.is_slurm_managing_tasks: + # env = SLURMEnvironment() + #elif self._is_using_torchelastic(): + # env = TorchElasticEnvironment() else: env = TorchElasticEnvironment() return env @@ -182,91 +185,33 @@ def select_accelerator(self): # ---------------------------------- # choose an accelerator for the user # ---------------------------------- - use_slurm_ddp = self.trainer.use_ddp and self.trainer.is_slurm_managing_tasks + #use_slurm_ddp = self.trainer.use_ddp and self.trainer.is_slurm_managing_tasks # torchelastic or general non_slurm ddp - te_flags_passed = 'WORLD_SIZE' in os.environ and ('GROUP_RANK' in os.environ or 'NODE_RANK' in os.environ) - use_torchelastic_ddp = self.trainer.use_ddp and te_flags_passed + #te_flags_passed = 'WORLD_SIZE' in os.environ and ('GROUP_RANK' in os.environ or 'NODE_RANK' in os.environ) + #use_torchelastic_ddp = self.trainer.use_ddp and te_flags_passed + + #use_ddp_spawn = self.trainer.use_ddp and self.trainer.distributed_backend == "ddp_spawn" + #use_ddp_cpu_spawn = self.trainer.use_ddp and self.trainer.distributed_backend == "ddp_cpu" - use_ddp_spawn = self.trainer.use_ddp and self.trainer.distributed_backend == "ddp_spawn" - use_ddp_cpu_spawn = self.trainer.use_ddp and self.trainer.distributed_backend == "ddp_cpu" + #use_ddp_hpc = self.trainer.use_ddp and self.trainer.distributed_backend == "ddp_hpc" - use_ddp_cpu_torch_elastic = use_ddp_cpu_spawn and self._is_using_torchelastic() - use_ddp_cpu_slurm = use_ddp_cpu_spawn and self.trainer.is_slurm_managing_tasks + #use_ddp_cpu_torch_elastic = use_ddp_cpu_spawn and self._is_using_torchelastic() + #use_ddp_cpu_slurm = use_ddp_cpu_spawn and self.trainer.is_slurm_managing_tasks # ddp script mode uses the same flags as TE # TODO: decouple from TE - if os.environ.get('PL_IN_DDP_SUBPROCESS', False): - use_torchelastic_ddp = False + #if os.environ.get('PL_IN_DDP_SUBPROCESS', False): + # use_torchelastic_ddp = False cluster_env = self._select_environment() - # choose the appropriate accelerator backend if self.trainer.use_ddp2: accelerator_backend = accelerators.DDP2Accelerator( self.trainer, cluster_env, self.trainer.plugin_connector.ddp_plugin ) - 
- elif use_ddp_cpu_slurm: - accelerator_backend = accelerators.DDPCPUHPCAccelerator( - self.trainer, - cluster_env, - self.trainer.plugin_connector.ddp_plugin - ) - - elif use_slurm_ddp: - accelerator_backend = accelerators.DDPHPCAccelerator( - self.trainer, - cluster_env, - self.trainer.plugin_connector.ddp_plugin - ) - - elif use_ddp_cpu_torch_elastic: - accelerator_backend = accelerators.DDPCPUHPCAccelerator( - self.trainer, - cluster_env, - self.trainer.plugin_connector.ddp_plugin - ) - - elif use_torchelastic_ddp: - accelerator_backend = accelerators.DDPHPCAccelerator( - self.trainer, - cluster_env, - self.trainer.plugin_connector.ddp_plugin - ) - - elif use_ddp_spawn: - accelerator_backend = accelerators.DDPSpawnAccelerator( - self.trainer, - nprocs=self.trainer.num_processes, - cluster_environment=cluster_env, - ddp_plugin=self.trainer.plugin_connector.ddp_plugin - ) - - elif use_ddp_cpu_spawn: - accelerator_backend = accelerators.DDPCPUSpawnAccelerator( - self.trainer, - nprocs=self.trainer.num_processes, - cluster_environment=cluster_env, - ddp_plugin=self.trainer.plugin_connector.ddp_plugin - ) - - elif self.trainer.distributed_backend == "ddp": - accelerator_backend = accelerators.DDPAccelerator( - self.trainer, - cluster_env, - ddp_plugin=self.trainer.plugin_connector.ddp_plugin - ) - - elif self.trainer.distributed_backend == "ddp_hpc": - accelerator_backend = accelerators.DDPHPCAccelerator( - self.trainer, - cluster_env, - ddp_plugin=self.trainer.plugin_connector.ddp_plugin - ) - elif self.trainer.use_dp: accelerator_backend = accelerators.DataParallelAccelerator(self.trainer, cluster_env) @@ -281,11 +226,124 @@ def select_accelerator(self): elif self.trainer.distributed_backend is None: accelerator_backend = accelerators.CPUAccelerator(self.trainer, cluster_env) + + elif self.trainer.use_ddp: #self.trainer.distributed_backend in ('ddp', 'ddp_spawn'): + spawn = self.trainer.distributed_backend == "ddp_spawn" + use_cpu = not self.trainer.gpus is None + + acc_args = [self.trainer] + acc_kwargs = {'cluster_environment': cluster_env, 'ddp_plugin': self.trainer.plugin_connector.ddp_plugin} + ddp_cls = None + if use_cpu: + if spawn: + ddp_cls = accelerators.DDPCPUSpawnAccelerator + ddp_kwargs['nprocs'] = self.trainer.num_processes + else: + ddp_cls = accelerators.DDPCPUHPCAccelerator + else: + if spawn: + ddp_cls = accelerators.DDPSpawnAccelerator + ddp_kwargs['nprocs'] = self.trainer.num_processes + else: + ddp_cls = accelerators.DDPHPCAccelerator + accelerator_backend = ddp_cls(*acc_args, **acc_kwargs) else: raise MisconfigurationException( f'Trainer(accelerator={self.trainer.distributed_backend} is not a supported backend' ) + ## choose the appropriate accelerator backend + #if self.trainer.use_ddp2: # Done + # accelerator_backend = accelerators.DDP2Accelerator( + # self.trainer, + # cluster_env, + # self.trainer.plugin_connector.ddp_plugin + # ) + + #elif use_ddp_cpu_slurm: # Done + # accelerator_backend = accelerators.DDPCPUHPCAccelerator( + # self.trainer, + # cluster_env, + # self.trainer.plugin_connector.ddp_plugin + # ) + + #elif use_slurm_ddp: # Done + # accelerator_backend = accelerators.DDPHPCAccelerator( + # self.trainer, + # cluster_env, + # self.trainer.plugin_connector.ddp_plugin + # ) + + #elif use_ddp_cpu_torch_elastic: # Done + # accelerator_backend = accelerators.DDPCPUHPCAccelerator( + # self.trainer, + # cluster_env, + # self.trainer.plugin_connector.ddp_plugin + # ) + + #elif use_torchelastic_ddp: # Done + # accelerator_backend = 
accelerators.DDPHPCAccelerator( + # self.trainer, + # cluster_env, + # self.trainer.plugin_connector.ddp_plugin + # ) + + #elif use_ddp_spawn: # Done + # accelerator_backend = accelerators.DDPSpawnAccelerator( + # self.trainer, + # nprocs=self.trainer.num_processes, + # cluster_environment=cluster_env, + # ddp_plugin=self.trainer.plugin_connector.ddp_plugin + # ) + + #elif use_ddp_cpu_spawn: # Done + # accelerator_backend = accelerators.DDPCPUSpawnAccelerator( + # self.trainer, + # nprocs=self.trainer.num_processes, + # cluster_environment=cluster_env, + # ddp_plugin=self.trainer.plugin_connector.ddp_plugin + # ) + + #elif self.trainer.distributed_backend == "ddp": + # accelerator_backend = accelerators.DDPAccelerator( + # self.trainer, + # cluster_env, + # ddp_plugin=self.trainer.plugin_connector.ddp_plugin + # ) + + #elif self.trainer.distributed_backend == "ddp_hpc": # Done + # print("USING accelerators.DDPHPCAccelerator", file=sys.stderr) + # accelerator_backend = accelerators.DDPHPCAccelerator( + # self.trainer, + # cluster_env, + # ddp_plugin=self.trainer.plugin_connector.ddp_plugin + # ) + + #elif self.trainer.use_dp: + # accelerator_backend = accelerators.DataParallelAccelerator(self.trainer, cluster_env) + + #elif self.trainer.use_horovod: + # accelerator_backend = accelerators.HorovodAccelerator(self.trainer, cluster_env) + + #elif self.trainer.use_single_gpu: + # accelerator_backend = accelerators.GPUAccelerator(self.trainer, cluster_env) + + #elif self.trainer.use_tpu: + # accelerator_backend = accelerators.TPUAccelerator(self.trainer, cluster_env) + + #elif self.trainer.distributed_backend is None: + # accelerator_backend = accelerators.CPUAccelerator(self.trainer, cluster_env) + #else: + # raise MisconfigurationException( + # f'Trainer(accelerator={self.trainer.distributed_backend} is not a supported backend' + # ) + + rank = os.environ['JSM_NAMESPACE_RANK'] + size = os.environ['JSM_NAMESPACE_SIZE'] + + rank_id = '%s/%s' % (rank, size) + print('select_accelerator %s' % rank_id, "using acclerator type %s, cluster environment type %s" % (type(accelerator_backend), type(cluster_env)), file=sys.stderr) + return accelerator_backend def set_distributed_mode(self): @@ -389,7 +447,7 @@ def has_horovodrun(): """Returns True if running with `horovodrun` using Gloo or OpenMPI.""" return 'OMPI_COMM_WORLD_RANK' in os.environ or 'HOROVOD_RANK' in os.environ - def set_nvidia_flags(self, is_slurm_managing_tasks, data_parallel_device_ids): + def set_nvidia_flags(self, data_parallel_device_ids): if data_parallel_device_ids is None: return @@ -397,14 +455,19 @@ def set_nvidia_flags(self, is_slurm_managing_tasks, data_parallel_device_ids): os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" all_gpu_ids = ",".join([str(x) for x in range(torch.cuda.device_count())]) devices = os.environ.get("CUDA_VISIBLE_DEVICES", all_gpu_ids) - log.info(f'LOCAL_RANK: {self.trainer.local_rank} - CUDA_VISIBLE_DEVICES: [{devices}]') + #log.info(f'LOCAL_RANK: {self.trainer.local_rank} - CUDA_VISIBLE_DEVICES: [{devices}]') def determine_local_rank(self): + rank_id = "RANK_ID-%s/%s" % (os.environ['JSM_NAMESPACE_RANK'], os.environ['JSM_NAMESPACE_SIZE']) + print("determine_local_rank", rank_id, "is_slurm_managing_tasks =", self.trainer.is_slurm_managing_tasks) if self.trainer.is_slurm_managing_tasks: return int(os.environ['SLURM_LOCALID']) return int(os.environ.get('LOCAL_RANK', 0)) def determine_ddp_node_rank(self): + rank_id = "RANK_ID-%s/%s" % (os.environ['JSM_NAMESPACE_RANK'], os.environ['JSM_NAMESPACE_SIZE']) + 
print("determine_ddp_node_rank", rank_id, "is_slurm_managing_tasks =", self.trainer.is_slurm_managing_tasks) + if self.trainer.is_slurm_managing_tasks: return int(os.environ['SLURM_NODEID']) diff --git a/pytorch_lightning/accelerators/ddp2_accelerator.py b/pytorch_lightning/accelerators/ddp2_accelerator.py index 2e3e39a5cd4d4..d1e5e38c1eaeb 100644 --- a/pytorch_lightning/accelerators/ddp2_accelerator.py +++ b/pytorch_lightning/accelerators/ddp2_accelerator.py @@ -145,8 +145,7 @@ def ddp_train(self, process_idx, mp_queue, model): model.trainer = self.trainer self.init_ddp_connection( self.trainer.global_rank, - self.trainer.world_size, - self.trainer.is_slurm_managing_tasks + self.trainer.world_size ) if isinstance(self.ddp_plugin, RPCPlugin): diff --git a/pytorch_lightning/accelerators/ddp_accelerator.py b/pytorch_lightning/accelerators/ddp_accelerator.py index 9789247ac24ce..02f7eb2b0eb9e 100644 --- a/pytorch_lightning/accelerators/ddp_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_accelerator.py @@ -242,8 +242,7 @@ def ddp_train(self, process_idx, model): model.trainer = self.trainer self.init_ddp_connection( self.trainer.global_rank, - self.trainer.world_size, - self.trainer.is_slurm_managing_tasks + self.trainer.world_size ) if isinstance(self.ddp_plugin, RPCPlugin): diff --git a/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py b/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py index ed94a8fdae220..8e4807ff1088f 100644 --- a/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py @@ -34,7 +34,7 @@ def __init__(self, trainer, cluster_environment=None, ddp_plugin=None): super().__init__(trainer, cluster_environment, ddp_plugin) self.nickname = 'ddp_cpu' - def model_to_device(self, model, process_idx): + def model_to_device(self, model): model.cpu() def get_device_ids(self): diff --git a/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py b/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py index f109f555f575e..295a034ca5471 100644 --- a/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py @@ -207,7 +207,7 @@ def set_world_ranks(self, process_idx): self.trainer.global_rank = self.trainer.node_rank * self.trainer.num_processes + process_idx self.trainer.world_size = self.trainer.num_nodes * self.trainer.num_processes - def model_to_device(self, model, process_idx): + def model_to_device(self, model): model.cpu() def get_device_ids(self): diff --git a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py index 58bc5a375d400..bdc66fb50fa89 100644 --- a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py @@ -32,6 +32,7 @@ from hydra.utils import get_original_cwd, to_absolute_path +import sys class DDPHPCAccelerator(Accelerator): def __init__(self, trainer, cluster_environment=None, ddp_plugin=None): @@ -59,9 +60,16 @@ def train(self): self.ddp_train(process_idx=self.task_idx, model=model) def set_world_ranks(self, process_idx): + rank = os.environ['JSM_NAMESPACE_RANK'] + size = os.environ['JSM_NAMESPACE_SIZE'] + + rank_id = '%s/%s' % (rank, size) + print("set_world_ranks", rank_id, "process_idx=%s, trainer.node_rank=%s, trainer.num_processes=%s, trainer.num_nodes=%s" % (process_idx, self.trainer.node_rank, self.trainer.num_processes, self.trainer.num_nodes), file=sys.stderr) 
self.trainer.local_rank = process_idx - self.trainer.global_rank = self.trainer.node_rank * self.trainer.num_processes + process_idx + #self.trainer.global_rank = self.trainer.node_rank * self.trainer.num_processes + process_idx + self.trainer.global_rank = self.cluster_environment.node_rank() * self.trainer.num_processes + process_idx self.trainer.world_size = self.trainer.num_nodes * self.trainer.num_processes + print("set_world_ranks", rank_id, "trainer.global_rank=%s, trainer.world_size=%s" % (self.trainer.global_rank, self.trainer.world_size), file=sys.stderr) def init_device(self, process_idx): self.trainer.root_gpu = process_idx @@ -138,8 +146,7 @@ def ddp_train(self, process_idx, model): model.trainer = self.trainer self.init_ddp_connection( self.trainer.global_rank, - self.trainer.world_size, - self.trainer.is_slurm_managing_tasks + self.trainer.world_size ) if isinstance(self.ddp_plugin, RPCPlugin): diff --git a/pytorch_lightning/cluster_environments/lsf_environment.py b/pytorch_lightning/cluster_environments/lsf_environment.py index e83177f213cb8..e95547fa72813 100644 --- a/pytorch_lightning/cluster_environments/lsf_environment.py +++ b/pytorch_lightning/cluster_environments/lsf_environment.py @@ -14,6 +14,7 @@ import os import re +import socket import warnings from pytorch_lightning import _logger as log from pytorch_lightning.cluster_environments.cluster_environment import ClusterEnvironment @@ -44,7 +45,9 @@ def __init__(self): self._master_address = self._get_master_address() self._master_port = self._get_master_port() self._local_rank = self._get_local_rank() + self._global_rank = self._get_global_rank() self._world_size = self._get_world_size() + self._node_rank = self._get_node_rank() # set environment variables needed for initializing torch distributed process group os.environ["MASTER_ADDR"] = str(self._master_address) @@ -52,8 +55,7 @@ def __init__(self): os.environ["MASTER_PORT"] = str(self._master_port) log.debug(f"MASTER_PORT: {os.environ['MASTER_PORT']}") - def _get_master_address(self): - """A helper for getting the master address""" + def _read_hosts(self): var = "LSB_HOSTS" hosts = os.environ.get(var) if not hosts: @@ -62,6 +64,11 @@ def _get_master_address(self): if len(hosts) < 2: raise ValueError("Cannot parse hosts from LSB_HOSTS environment variable -- " "expected format \"batch ...\"") + return hosts + + def _get_master_address(self): + """A helper for getting the master address""" + hosts = self._read_hosts() return hosts[1] def _get_master_port(self): @@ -85,6 +92,18 @@ def _get_master_port(self): log.debug("using externally specified master port") return port + def _get_global_rank(self): + """A helper function for getting the global rank + + Read this from the environment variable JSM_NAMESPACE_LOCAL_RANK + """ + var = "JSM_NAMESPACE_RANK" + global_rank = os.environ.get(var) + if global_rank is None: + raise ValueError("Cannot determine global rank -- expected in %s " + "-- make sure you run your executable with jsrun" % var) + return int(global_rank) + def _get_local_rank(self): """A helper function for getting the local rank @@ -109,6 +128,16 @@ def _get_world_size(self): "-- make sure you run your executable with jsrun" % var) return int(world_size) + def _get_node_rank(self): + hosts = self._read_hosts() + count = dict() + for host in hosts: + if 'batch' in host or 'login' in host: + continue + if host not in count: + count[host] = len(count) + return count[socket.gethostname()] + def master_address(self): """ Master address is read from a list of 
hosts contained in the environment variable *LSB_HOSTS* @@ -132,3 +161,16 @@ def local_rank(self): World size is read from the environment variable JSM_NAMESPACE_LOCAL_RANK """ return self._local_rank + + def node_rank(self): + """ + Node rank is determined by the position of the current hostname in the list of hosts stored in LSB_HOSTS + """ + return self._node_rank + + def global_rank(self): + """ + World size is read from the environment variable JSM_NAMESPACE_LOCAL_RANK + """ + return self._global_rank + diff --git a/pytorch_lightning/cluster_environments/slurm_environment.py b/pytorch_lightning/cluster_environments/slurm_environment.py index aa78709e12b4e..daec768717185 100644 --- a/pytorch_lightning/cluster_environments/slurm_environment.py +++ b/pytorch_lightning/cluster_environments/slurm_environment.py @@ -78,3 +78,6 @@ def _resolve_root_node_address(self, root_node): root_node = name + number return root_node + + def node_rank(self): + return int(os.environ['SLURM_NODEID']) diff --git a/pytorch_lightning/cluster_environments/torchelastic_environment.py b/pytorch_lightning/cluster_environments/torchelastic_environment.py index a4d769518d252..868653f1ad304 100644 --- a/pytorch_lightning/cluster_environments/torchelastic_environment.py +++ b/pytorch_lightning/cluster_environments/torchelastic_environment.py @@ -45,7 +45,10 @@ def master_port(self): return port def world_size(self): - return os.environ.get('WORLD_SIZE') + return int(os.environ.get('WORLD_SIZE')) def local_rank(self): - return int(os.environ['LOCAL_RANK']) + return int(os.environ.get('LOCAL_RANK')) + + def node_rank(self): + return int(os.environ.get('GROUP_RANK')) diff --git a/pytorch_lightning/plugins/ddp_plugin.py b/pytorch_lightning/plugins/ddp_plugin.py index 281074cb37813..22f637f6aa7d9 100644 --- a/pytorch_lightning/plugins/ddp_plugin.py +++ b/pytorch_lightning/plugins/ddp_plugin.py @@ -1,3 +1,4 @@ +import sys import os from contextlib import contextmanager from typing import Any, Dict, List, Optional, Union @@ -63,6 +64,11 @@ def configure_ddp(self, model, device_ids): self._ddp_kwargs["find_unused_parameters"] = self._ddp_kwargs.get( "find_unused_parameters", True ) + rank = os.environ['JSM_NAMESPACE_RANK'] + size = os.environ['JSM_NAMESPACE_SIZE'] + + rank_id = '%s/%s' % (rank, size) + print("LightningDistributedDataParallel", rank_id, device_ids, self._ddp_kwargs, file=sys.stderr) model = LightningDistributedDataParallel( model, device_ids=device_ids, @@ -75,21 +81,40 @@ def init_ddp_connection( trainer, cluster_environment, global_rank: int, - world_size: int, - is_slurm_managing_tasks: bool = True, + world_size: int ) -> None: os.environ["MASTER_ADDR"] = str(cluster_environment.master_address()) os.environ["MASTER_PORT"] = str(cluster_environment.master_port()) os.environ["WORLD_SIZE"] = str(cluster_environment.world_size()) + torch_backend = "nccl" if trainer.on_gpu else "gloo" + rank = os.environ['JSM_NAMESPACE_RANK'] + size = os.environ['JSM_NAMESPACE_SIZE'] + + msg = dict( + NCCL_SOCKET_IFNAME = os.environ.get('NCCL_SOCKET_IFNAME', "NO_IFNAME"), + MASTER_ADDR = os.environ["MASTER_ADDR"], + MASTER_PORT = os.environ["MASTER_PORT"], + WORLD_SIZE = os.environ["WORLD_SIZE"] + ) + ipg = dict(backend=torch_backend, rank=global_rank, world_size=world_size) + + rank_id = '%s/%s' % (rank, size) + + os.environ['NCCL_SOCKET_IFNAME'] = 'ib0' + if not torch_distrib.is_initialized(): log.info( f"initializing ddp: GLOBAL_RANK: {global_rank}, MEMBER: {global_rank + 1}/{world_size}" ) + print("init_ddp_connection 
INITIALIZING %s %s %s" % (rank_id, str(ipg), str(msg)), file=sys.stderr) torch_distrib.init_process_group( torch_backend, rank=global_rank, world_size=world_size ) + print("init_ddp_connection FINISHED INITIALIZING %s %s %s" % (rank_id, str(ipg), str(msg)), file=sys.stderr) + else: + print("init_ddp_connection ALREADY INITIALIZED %s %s %s" % (rank_id, str(ipg), str(msg)), file=sys.stderr) def on_before_forward(self, model: LightningModule, *args): """ diff --git a/pytorch_lightning/plugins/ddp_sequential_plugin.py b/pytorch_lightning/plugins/ddp_sequential_plugin.py index 010f0ea1648a8..c8c0d020e940f 100644 --- a/pytorch_lightning/plugins/ddp_sequential_plugin.py +++ b/pytorch_lightning/plugins/ddp_sequential_plugin.py @@ -109,7 +109,6 @@ def init_ddp_connection( cluster_environment, global_rank: int, world_size: int, - is_slurm_managing_tasks: bool = True, ) -> None: trainer.prepared_for_backwards = False self._check_arguments(trainer) @@ -119,8 +118,7 @@ def init_ddp_connection( trainer=trainer, cluster_environment=cluster_environment, global_rank=global_rank, - world_size=world_size, - is_slurm_managing_tasks=is_slurm_managing_tasks + world_size=world_size ) super().init_rpc_connection( global_rank=global_rank, diff --git a/pytorch_lightning/plugins/plugin_connector.py b/pytorch_lightning/plugins/plugin_connector.py index d66c25173cc77..ba3ed222eaadf 100644 --- a/pytorch_lightning/plugins/plugin_connector.py +++ b/pytorch_lightning/plugins/plugin_connector.py @@ -30,7 +30,7 @@ def __init__(self, trainer): self.trainer = trainer self.plugins = [] self.ddp_plugin = DDPPlugin() - self.cloud_environment = None + self.cluster_environment = None self.amp_plugin = NativeAMPPlugin(trainer) self.apex_plugin = ApexPlugin(trainer) @@ -99,7 +99,7 @@ def __attach_cluster(self, limit=1): raise MisconfigurationException(m) # set the cluster - self.cloud_environment = plugin + self.cluster_environment = plugin def _convert_str_custom_plugins(self, plugins: Union[str, list]): """ diff --git a/pytorch_lightning/trainer/trainer.py b/pytorch_lightning/trainer/trainer.py index 31a64d00ccb60..2d7e18da1cf18 100644 --- a/pytorch_lightning/trainer/trainer.py +++ b/pytorch_lightning/trainer/trainer.py @@ -405,6 +405,7 @@ def __init__( # last thing are the plugins which override whatever the trainer used by default self.plugin_connector.on_trainer_init(plugins) + print('ClusterEnvironment', self.plugin_connector.cluster_environment) # Callback system self.on_init_end() @@ -434,6 +435,15 @@ def fit( # bookkeeping self._state = TrainerState.RUNNING + # ---------------------------- + # SET UP TRAINING + # ---------------------------- + self.accelerator_backend = self.accelerator_connector.select_accelerator() + self.accelerator_backend.setup(model) + self.local_rank = self.accelerator_backend.cluster_environment.local_rank() + self.node_rank = self.accelerator_backend.cluster_environment.node_rank() + self.global_rank = self.accelerator_backend.cluster_environment.global_rank() + # ---------------------------- # LINK DATA # ---------------------------- @@ -447,12 +457,6 @@ def fit( # we reuse fit in .test() but change its behavior using this flag self.testing = os.environ.get('PL_TESTING_MODE', self.testing) - # ---------------------------- - # SET UP TRAINING - # ---------------------------- - self.accelerator_backend = self.accelerator_connector.select_accelerator() - self.accelerator_backend.setup(model) - # ---------------------------- # INSPECT THESE FOR MAIN LOOPS # ---------------------------- From 
b53d153200053ebfeb33b7f2e9fa074c870cc3bc Mon Sep 17 00:00:00 2001 From: Andrew Tritt Date: Fri, 11 Dec 2020 17:38:28 -0500 Subject: [PATCH 10/34] add additional methods to ClusterEnvironments --- .../cluster_environments/cluster_environment.py | 6 ++++++ pytorch_lightning/cluster_environments/slurm_environment.py | 3 +++ .../cluster_environments/torchelastic_environment.py | 3 +++ 3 files changed, 12 insertions(+) diff --git a/pytorch_lightning/cluster_environments/cluster_environment.py b/pytorch_lightning/cluster_environments/cluster_environment.py index 5196e44411082..e82b1c310d2bd 100644 --- a/pytorch_lightning/cluster_environments/cluster_environment.py +++ b/pytorch_lightning/cluster_environments/cluster_environment.py @@ -31,3 +31,9 @@ def world_size(self): def local_rank(self): pass + + def global_rank(self): + pass + + def node_rank(self): + pass diff --git a/pytorch_lightning/cluster_environments/slurm_environment.py b/pytorch_lightning/cluster_environments/slurm_environment.py index daec768717185..729190965b46b 100644 --- a/pytorch_lightning/cluster_environments/slurm_environment.py +++ b/pytorch_lightning/cluster_environments/slurm_environment.py @@ -81,3 +81,6 @@ def _resolve_root_node_address(self, root_node): def node_rank(self): return int(os.environ['SLURM_NODEID']) + + def global_rank(self): + return int(os.environ['SLURM_PROCID']) diff --git a/pytorch_lightning/cluster_environments/torchelastic_environment.py b/pytorch_lightning/cluster_environments/torchelastic_environment.py index 868653f1ad304..2fcd0b32f7aa1 100644 --- a/pytorch_lightning/cluster_environments/torchelastic_environment.py +++ b/pytorch_lightning/cluster_environments/torchelastic_environment.py @@ -52,3 +52,6 @@ def local_rank(self): def node_rank(self): return int(os.environ.get('GROUP_RANK')) + + def global_rank(self): + return int(os.environ['RANK']) From 0b6edfe1da002ee943a7e52c9545f3a639e2b24a Mon Sep 17 00:00:00 2001 From: Andrew Tritt Date: Fri, 11 Dec 2020 18:04:21 -0500 Subject: [PATCH 11/34] add NVIDIA mixin for setting up CUDA envars --- .../accelerators/ddp_hpc_accelerator.py | 15 +++++----- .../accelerators/ddp_spawn_accelerator.py | 7 ++++- .../accelerators/gpu_accelerator.py | 7 ++++- .../accelerators/nvidia_mixin.py | 30 +++++++++++++++++++ pytorch_lightning/trainer/trainer.py | 3 +- 5 files changed, 50 insertions(+), 12 deletions(-) create mode 100644 pytorch_lightning/accelerators/nvidia_mixin.py diff --git a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py index eba548d278c05..188fb5a68b2c6 100644 --- a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py @@ -21,6 +21,7 @@ from pytorch_lightning import _logger as log from pytorch_lightning.accelerators.accelerator import Accelerator, ReduceOp +from pytorch_lightning.accelerators.nvidia_mixin import NVIDIAMixin from pytorch_lightning.cluster_environments import ClusterEnvironment from pytorch_lightning.core.lightning import LightningModule from pytorch_lightning.distributed.dist import LightningDistributed @@ -35,7 +36,7 @@ import sys -class DDPHPCAccelerator(Accelerator): +class DDPHPCAccelerator(Accelerator, NVIDIAMixin): def __init__(self, trainer, @@ -58,23 +59,21 @@ def __init__(self, def setup(self, model): self.trainer.model = model + # ---------------------------- + # NVIDIA FLAGS + # ---------------------------- + self.set_nvidia_flags(self.trainer.data_parallel_device_ids) self.task_idx = 
self.cluster_environment.local_rank() + def train(self): model = self.trainer.model self.ddp_train(process_idx=self.task_idx, model=model) def set_world_ranks(self, process_idx): - rank = os.environ['JSM_NAMESPACE_RANK'] - size = os.environ['JSM_NAMESPACE_SIZE'] - - rank_id = '%s/%s' % (rank, size) - print("set_world_ranks", rank_id, "process_idx=%s, trainer.node_rank=%s, trainer.num_processes=%s, trainer.num_nodes=%s" % (process_idx, self.trainer.node_rank, self.trainer.num_processes, self.trainer.num_nodes), file=sys.stderr) self.trainer.local_rank = process_idx - #self.trainer.global_rank = self.trainer.node_rank * self.trainer.num_processes + process_idx self.trainer.global_rank = self.cluster_environment.node_rank() * self.trainer.num_processes + process_idx self.trainer.world_size = self.trainer.num_nodes * self.trainer.num_processes - print("set_world_ranks", rank_id, "trainer.global_rank=%s, trainer.world_size=%s" % (self.trainer.global_rank, self.trainer.world_size), file=sys.stderr) def init_device(self, process_idx): self.trainer.root_gpu = process_idx diff --git a/pytorch_lightning/accelerators/ddp_spawn_accelerator.py b/pytorch_lightning/accelerators/ddp_spawn_accelerator.py index a49e17fc0b31d..5c8e0d50d29df 100644 --- a/pytorch_lightning/accelerators/ddp_spawn_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_spawn_accelerator.py @@ -22,6 +22,7 @@ from pytorch_lightning import _logger as log from pytorch_lightning.accelerators.accelerator import Accelerator, ReduceOp +from pytorch_lightning.accelerators.nvidia_mixin import NVIDIAMixin from pytorch_lightning.cluster_environments import ClusterEnvironment from pytorch_lightning.core.lightning import LightningModule from pytorch_lightning.distributed import LightningDistributed @@ -44,7 +45,7 @@ from hydra.utils import get_original_cwd, to_absolute_path -class DDPSpawnAccelerator(Accelerator): +class DDPSpawnAccelerator(Accelerator, NVIDIAMixin): def __init__(self, trainer, @@ -68,6 +69,10 @@ def __init__(self, def setup(self, model): os.environ['MASTER_PORT'] = os.environ.get('MASTER_PORT', str(find_free_network_port())) + # ---------------------------- + # NVIDIA FLAGS + # ---------------------------- + self.set_nvidia_flags(self.trainer.data_parallel_device_ids) # pass in a state q smp = mp.get_context('spawn') diff --git a/pytorch_lightning/accelerators/gpu_accelerator.py b/pytorch_lightning/accelerators/gpu_accelerator.py index 1310777e0d890..e101b4367f976 100644 --- a/pytorch_lightning/accelerators/gpu_accelerator.py +++ b/pytorch_lightning/accelerators/gpu_accelerator.py @@ -17,12 +17,13 @@ from pytorch_lightning import _logger as log from pytorch_lightning.accelerators.accelerator import Accelerator, ReduceOp +from pytorch_lightning.accelerators.nvidia_mixin import NVIDIAMixin from pytorch_lightning.cluster_environments import ClusterEnvironment from pytorch_lightning.distributed.dist import LightningDistributed from pytorch_lightning.utilities import AMPType -class GPUAccelerator(Accelerator): +class GPUAccelerator(Accelerator, NVIDIAMixin): amp_backend: AMPType def __init__(self, trainer, cluster_environment: Optional[ClusterEnvironment] = None): @@ -40,6 +41,10 @@ def __init__(self, trainer, cluster_environment: Optional[ClusterEnvironment] = self.nickname = None def setup(self, model): + # ---------------------------- + # NVIDIA FLAGS + # ---------------------------- + self.set_nvidia_flags(self.trainer.data_parallel_device_ids) # call setup self.trainer.call_setup_hook(model) diff --git 
a/pytorch_lightning/accelerators/nvidia_mixin.py b/pytorch_lightning/accelerators/nvidia_mixin.py new file mode 100644 index 0000000000000..4530a182833a8 --- /dev/null +++ b/pytorch_lightning/accelerators/nvidia_mixin.py @@ -0,0 +1,30 @@ +# Copyright The PyTorch Lightning team. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License +import os + +import torch + +from pytorch_lightning import _logger as log + +class NVIDIAMixin: + + def set_nvidia_flags(self, data_parallel_device_ids): + if data_parallel_device_ids is None: + return + + # set the correct cuda visible devices (using pci order) + os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" + all_gpu_ids = ",".join([str(x) for x in range(torch.cuda.device_count())]) + devices = os.environ.get("CUDA_VISIBLE_DEVICES", all_gpu_ids) + log.info(f'LOCAL_RANK: {self.trainer.local_rank} - CUDA_VISIBLE_DEVICES: [{devices}]') diff --git a/pytorch_lightning/trainer/trainer.py b/pytorch_lightning/trainer/trainer.py index 6247945efa232..39a5129a1077b 100644 --- a/pytorch_lightning/trainer/trainer.py +++ b/pytorch_lightning/trainer/trainer.py @@ -405,7 +405,6 @@ def __init__( # last thing are the plugins which override whatever the trainer used by default self.plugin_connector.on_trainer_init(plugins) - print('ClusterEnvironment', self.plugin_connector.cluster_environment) # Callback system self.on_init_end() @@ -439,10 +438,10 @@ def fit( # SET UP TRAINING # ---------------------------- self.accelerator_backend = self.accelerator_connector.select_accelerator() - self.accelerator_backend.setup(model) self.local_rank = self.accelerator_backend.cluster_environment.local_rank() self.node_rank = self.accelerator_backend.cluster_environment.node_rank() self.global_rank = self.accelerator_backend.cluster_environment.global_rank() + self.accelerator_backend.setup(model) # ---------------------------- # LINK DATA From f7d87f6da311cda1656217fd71f2941649cc85d4 Mon Sep 17 00:00:00 2001 From: Andrew Tritt Date: Fri, 11 Dec 2020 19:49:21 -0500 Subject: [PATCH 12/34] remove troubleshooting prints --- .../accelerators/accelerator_connector.py | 161 +----------------- pytorch_lightning/plugins/ddp_plugin.py | 24 --- 2 files changed, 3 insertions(+), 182 deletions(-) diff --git a/pytorch_lightning/accelerators/accelerator_connector.py b/pytorch_lightning/accelerators/accelerator_connector.py index 5f2399f8c52c6..01e82203bd0ae 100644 --- a/pytorch_lightning/accelerators/accelerator_connector.py +++ b/pytorch_lightning/accelerators/accelerator_connector.py @@ -122,17 +122,6 @@ def on_trainer_init( self.trainer.world_size = 1 self.trainer.interactive_ddp_procs = [] - # link up SLURM - # TODO: this should be taken out of here... 
but depends too much on DDP - #self.trainer.slurm_connector.on_trainer_init(self.trainer.num_nodes) - ##self.trainer.is_slurm_managing_tasks = True - #self.trainer.node_rank = self.determine_ddp_node_rank() - #self.trainer.local_rank = self.determine_local_rank() - #self.trainer.global_rank = 0 - - # NVIDIA setup - self.set_nvidia_flags(self.trainer.data_parallel_device_ids) - self.trainer.on_colab_kaggle = os.getenv('COLAB_GPU') or os.getenv('KAGGLE_URL_BASE') self.trainer.replace_sampler_ddp = replace_sampler_ddp @@ -155,11 +144,6 @@ def _map_deprecated_dist_backend(self, accelerator, distributed_backend): def _select_environment(self): if self.trainer.plugin_connector.cluster_environment: env = self.trainer.plugin_connector.cluster_environment - print("RETURNING self.trainer.plugin_connector.cluster_environment", env) - #elif self.trainer.is_slurm_managing_tasks: - # env = SLURMEnvironment() - #elif self._is_using_torchelastic(): - # env = TorchElasticEnvironment() else: env = TorchElasticEnvironment() return env @@ -185,25 +169,6 @@ def select_accelerator(self): # ---------------------------------- # choose an accelerator for the user # ---------------------------------- - #use_slurm_ddp = self.trainer.use_ddp and self.trainer.is_slurm_managing_tasks - - # torchelastic or general non_slurm ddp - #te_flags_passed = 'WORLD_SIZE' in os.environ and ('GROUP_RANK' in os.environ or 'NODE_RANK' in os.environ) - #use_torchelastic_ddp = self.trainer.use_ddp and te_flags_passed - - #use_ddp_spawn = self.trainer.use_ddp and self.trainer.distributed_backend == "ddp_spawn" - #use_ddp_cpu_spawn = self.trainer.use_ddp and self.trainer.distributed_backend == "ddp_cpu" - - #use_ddp_hpc = self.trainer.use_ddp and self.trainer.distributed_backend == "ddp_hpc" - - #use_ddp_cpu_torch_elastic = use_ddp_cpu_spawn and self._is_using_torchelastic() - #use_ddp_cpu_slurm = use_ddp_cpu_spawn and self.trainer.is_slurm_managing_tasks - - # ddp script mode uses the same flags as TE - # TODO: decouple from TE - #if os.environ.get('PL_IN_DDP_SUBPROCESS', False): - # use_torchelastic_ddp = False - cluster_env = self._select_environment() if self.trainer.use_ddp2: @@ -227,9 +192,9 @@ def select_accelerator(self): elif self.trainer.distributed_backend is None: accelerator_backend = accelerators.CPUAccelerator(self.trainer, cluster_env) - elif self.trainer.use_ddp: #self.trainer.distributed_backend in ('ddp', 'ddp_spawn'): + elif self.trainer.use_ddp: spawn = self.trainer.distributed_backend == "ddp_spawn" - use_cpu = not self.trainer.gpus is None + use_cpu = self.trainer.gpus is None acc_args = [self.trainer] acc_kwargs = {'cluster_environment': cluster_env, 'ddp_plugin': self.trainer.plugin_connector.ddp_plugin} @@ -252,98 +217,6 @@ def select_accelerator(self): f'Trainer(accelerator={self.trainer.distributed_backend} is not a supported backend' ) - ## choose the appropriate accelerator backend - #if self.trainer.use_ddp2: # Done - # accelerator_backend = accelerators.DDP2Accelerator( - # self.trainer, - # cluster_env, - # self.trainer.plugin_connector.ddp_plugin - # ) - - #elif use_ddp_cpu_slurm: # Done - # accelerator_backend = accelerators.DDPCPUHPCAccelerator( - # self.trainer, - # cluster_env, - # self.trainer.plugin_connector.ddp_plugin - # ) - - #elif use_slurm_ddp: # Done - # accelerator_backend = accelerators.DDPHPCAccelerator( - # self.trainer, - # cluster_env, - # self.trainer.plugin_connector.ddp_plugin - # ) - - #elif use_ddp_cpu_torch_elastic: # Done - # accelerator_backend = 
accelerators.DDPCPUHPCAccelerator( - # self.trainer, - # cluster_env, - # self.trainer.plugin_connector.ddp_plugin - # ) - - #elif use_torchelastic_ddp: # Done - # accelerator_backend = accelerators.DDPHPCAccelerator( - # self.trainer, - # cluster_env, - # self.trainer.plugin_connector.ddp_plugin - # ) - - #elif use_ddp_spawn: # Done - # accelerator_backend = accelerators.DDPSpawnAccelerator( - # self.trainer, - # nprocs=self.trainer.num_processes, - # cluster_environment=cluster_env, - # ddp_plugin=self.trainer.plugin_connector.ddp_plugin - # ) - - #elif use_ddp_cpu_spawn: # Done - # accelerator_backend = accelerators.DDPCPUSpawnAccelerator( - # self.trainer, - # nprocs=self.trainer.num_processes, - # cluster_environment=cluster_env, - # ddp_plugin=self.trainer.plugin_connector.ddp_plugin - # ) - - #elif self.trainer.distributed_backend == "ddp": - # accelerator_backend = accelerators.DDPAccelerator( - # self.trainer, - # cluster_env, - # ddp_plugin=self.trainer.plugin_connector.ddp_plugin - # ) - - #elif self.trainer.distributed_backend == "ddp_hpc": # Done - # print("USING accelerators.DDPHPCAccelerator", file=sys.stderr) - # accelerator_backend = accelerators.DDPHPCAccelerator( - # self.trainer, - # cluster_env, - # ddp_plugin=self.trainer.plugin_connector.ddp_plugin - # ) - - #elif self.trainer.use_dp: - # accelerator_backend = accelerators.DataParallelAccelerator(self.trainer, cluster_env) - - #elif self.trainer.use_horovod: - # accelerator_backend = accelerators.HorovodAccelerator(self.trainer, cluster_env) - - #elif self.trainer.use_single_gpu: - # accelerator_backend = accelerators.GPUAccelerator(self.trainer, cluster_env) - - #elif self.trainer.use_tpu: - # accelerator_backend = accelerators.TPUAccelerator(self.trainer, cluster_env) - - #elif self.trainer.distributed_backend is None: - # accelerator_backend = accelerators.CPUAccelerator(self.trainer, cluster_env) - #else: - # raise MisconfigurationException( - # f'Trainer(accelerator={self.trainer.distributed_backend} is not a supported backend' - # ) - - rank = os.environ['JSM_NAMESPACE_RANK'] - size = os.environ['JSM_NAMESPACE_SIZE'] - - rank_id = '%s/%s' % (rank, size) - print('select_accelerator %s' % rank_id, "using acclerator type %s, cluster environment type %s" % (type(accelerator_backend), type(cluster_env)), file=sys.stderr) - return accelerator_backend def set_distributed_mode(self): @@ -377,7 +250,7 @@ def set_distributed_mode(self): elif self.trainer.num_gpus > 1: self.trainer.use_dp = True - elif self.trainer.distributed_backend in ("ddp", "ddp_spawn", "ddp_hpc"): + elif self.trainer.distributed_backend in ("ddp", "ddp_spawn"): if self.trainer.num_gpus == 0: if self.trainer.num_nodes > 1 or self.trainer.num_processes > 1: self.trainer.use_ddp = True # ddp_cpu @@ -455,31 +328,3 @@ def set_nvidia_flags(self, data_parallel_device_ids): os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" all_gpu_ids = ",".join([str(x) for x in range(torch.cuda.device_count())]) devices = os.environ.get("CUDA_VISIBLE_DEVICES", all_gpu_ids) - #log.info(f'LOCAL_RANK: {self.trainer.local_rank} - CUDA_VISIBLE_DEVICES: [{devices}]') - - def determine_local_rank(self): - rank_id = "RANK_ID-%s/%s" % (os.environ['JSM_NAMESPACE_RANK'], os.environ['JSM_NAMESPACE_SIZE']) - print("determine_local_rank", rank_id, "is_slurm_managing_tasks =", self.trainer.is_slurm_managing_tasks) - if self.trainer.is_slurm_managing_tasks: - return int(os.environ['SLURM_LOCALID']) - return int(os.environ.get('LOCAL_RANK', 0)) - - def determine_ddp_node_rank(self): - rank_id 
= "RANK_ID-%s/%s" % (os.environ['JSM_NAMESPACE_RANK'], os.environ['JSM_NAMESPACE_SIZE']) - print("determine_ddp_node_rank", rank_id, "is_slurm_managing_tasks =", self.trainer.is_slurm_managing_tasks) - - if self.trainer.is_slurm_managing_tasks: - return int(os.environ['SLURM_NODEID']) - - # torchelastic uses the envvar GROUP_RANK, whereas other systems(?) use NODE_RANK. - # otherwise use given node rank or default to node rank 0 - env_vars = ['NODE_RANK', 'GROUP_RANK'] - node_ids = [(k, os.environ.get(k, None)) for k in env_vars] - node_ids = [(k, v) for k, v in node_ids if v is not None] - if len(node_ids) == 0: - return 0 - if len(node_ids) > 1: - log.warning(f"Multiple environment variables ({node_ids}) defined for node rank. Using the first one.") - k, rank = node_ids.pop() - rank_zero_info(f"Using environment variable {k} for node rank ({rank}).") - return int(rank) diff --git a/pytorch_lightning/plugins/ddp_plugin.py b/pytorch_lightning/plugins/ddp_plugin.py index 22f637f6aa7d9..7e40d65b912f1 100644 --- a/pytorch_lightning/plugins/ddp_plugin.py +++ b/pytorch_lightning/plugins/ddp_plugin.py @@ -64,11 +64,6 @@ def configure_ddp(self, model, device_ids): self._ddp_kwargs["find_unused_parameters"] = self._ddp_kwargs.get( "find_unused_parameters", True ) - rank = os.environ['JSM_NAMESPACE_RANK'] - size = os.environ['JSM_NAMESPACE_SIZE'] - - rank_id = '%s/%s' % (rank, size) - print("LightningDistributedDataParallel", rank_id, device_ids, self._ddp_kwargs, file=sys.stderr) model = LightningDistributedDataParallel( model, device_ids=device_ids, @@ -89,32 +84,13 @@ def init_ddp_connection( torch_backend = "nccl" if trainer.on_gpu else "gloo" - rank = os.environ['JSM_NAMESPACE_RANK'] - size = os.environ['JSM_NAMESPACE_SIZE'] - - msg = dict( - NCCL_SOCKET_IFNAME = os.environ.get('NCCL_SOCKET_IFNAME', "NO_IFNAME"), - MASTER_ADDR = os.environ["MASTER_ADDR"], - MASTER_PORT = os.environ["MASTER_PORT"], - WORLD_SIZE = os.environ["WORLD_SIZE"] - ) - ipg = dict(backend=torch_backend, rank=global_rank, world_size=world_size) - - rank_id = '%s/%s' % (rank, size) - - os.environ['NCCL_SOCKET_IFNAME'] = 'ib0' - if not torch_distrib.is_initialized(): log.info( f"initializing ddp: GLOBAL_RANK: {global_rank}, MEMBER: {global_rank + 1}/{world_size}" ) - print("init_ddp_connection INITIALIZING %s %s %s" % (rank_id, str(ipg), str(msg)), file=sys.stderr) torch_distrib.init_process_group( torch_backend, rank=global_rank, world_size=world_size ) - print("init_ddp_connection FINISHED INITIALIZING %s %s %s" % (rank_id, str(ipg), str(msg)), file=sys.stderr) - else: - print("init_ddp_connection ALREADY INITIALIZED %s %s %s" % (rank_id, str(ipg), str(msg)), file=sys.stderr) def on_before_forward(self, model: LightningModule, *args): """ From 3c9edf963bd82341e51298de7f4c620dfbbd35eb Mon Sep 17 00:00:00 2001 From: Andrew Tritt Date: Fri, 11 Dec 2020 20:24:58 -0500 Subject: [PATCH 13/34] cleanup SLURMEnvironment --- .../cluster_environments/slurm_environment.py | 191 +++++++++++++----- 1 file changed, 140 insertions(+), 51 deletions(-) diff --git a/pytorch_lightning/cluster_environments/slurm_environment.py b/pytorch_lightning/cluster_environments/slurm_environment.py index 729190965b46b..4af04b0142185 100644 --- a/pytorch_lightning/cluster_environments/slurm_environment.py +++ b/pytorch_lightning/cluster_environments/slurm_environment.py @@ -14,73 +14,162 @@ import os import re +import socket +import warnings from pytorch_lightning import _logger as log from pytorch_lightning.cluster_environments.cluster_environment 
import ClusterEnvironment class SLURMEnvironment(ClusterEnvironment): + """An environment for running on clusters managed by the SLURM resource manager. - def master_address(self): - # figure out the root node addr - try: - root_node = os.environ["SLURM_NODELIST"].split(" ")[0] - except Exception: - root_node = "127.0.0.1" - - root_node = self._resolve_root_node_address(root_node) - os.environ["MASTER_ADDR"] = root_node - log.debug(f"MASTER_ADDR: {os.environ['MASTER_ADDR']}") - return root_node + It is expected that any execution using this ClusterEnvironment was executed + using the srun command. - def master_port(self): - # ----------------------- - # SLURM JOB = PORT number - # ----------------------- - # this way every process knows what port to use - try: - # use the last 4 numbers in the job id as the id - default_port = os.environ["SLURM_JOB_ID"] - default_port = default_port[-4:] - - # all ports should be in the 10k+ range - default_port = int(default_port) + 10000 - - except Exception: - default_port = 12910 - - # ----------------------- - # PORT NUMBER = MASTER_PORT - # ----------------------- - # in case the user passed it in - try: - default_port = os.environ["MASTER_PORT"] - except Exception: - os.environ["MASTER_PORT"] = str(default_port) + This plugin expects the following environment variables: + + SLURM_JOB_ID + The Slurm assigned job ID + + SLURM_NODELIST + The hosts used in the job. This string is expected to have the format " ...." + + SLURM_LOCALID + The node local rank for the task. + + SLURM_PROCID + The MPI rank or relative process ID + + SLURM_STEP_NUM_TASKS + The world size for the job. This environment variable is set by srun + """ + + def __init__(self): + self._master_address = self._get_master_address() + self._master_port = self._get_master_port() + self._local_rank = self._get_local_rank() + self._global_rank = self._get_global_rank() + self._world_size = self._get_world_size() + self._node_rank = self._get_node_rank() + # set environment variables needed for initializing torch distributed process group + os.environ["MASTER_ADDR"] = str(self._master_address) + log.debug(f"MASTER_ADDR: {os.environ['MASTER_ADDR']}") + os.environ["MASTER_PORT"] = str(self._master_port) log.debug(f"MASTER_PORT: {os.environ['MASTER_PORT']}") - return default_port + def _read_hosts(self): + var = "SLURM_NODELIST" + hosts = os.environ.get(var) + if not hosts: + raise ValueError("Could not find hosts -- expected in environment variable %s" % var) + hosts = hosts.split() + return hosts + + def _get_master_address(self): + """A helper for getting the master address""" + hosts = self._read_hosts() + return hosts[0] + + def _get_master_port(self): + """A helper for getting the master port + + Use the Slurm job ID so all ranks can compute the master port + """ + # check for user-specified master port + port = os.environ.get("MASTER_PORT") + if not port: + var = "SLURM_JOB_ID" + jobid = os.environ.get(var) + if not jobid: + raise ValueError("Could not find job id -- expected in environment variable %s" % var) + else: + port = int(jobid) + # all ports should be in the 10k+ range + port = int(port) % 1000 + 10000 + log.debug("calculated master port") + else: + log.debug("using externally specified master port") + return port + + def _get_global_rank(self): + """A helper function for getting the global rank + + Read this from the environment variable SLURM_PROCID + """ + var = "SLURM_PROCID" + global_rank = os.environ.get(var) + if global_rank is None: + raise ValueError("Cannot determine global rank 
-- expected in %s " % var) + return int(global_rank) + + def _get_local_rank(self): + """A helper function for getting the local rank + + Read this from the environment variable SLURM_LOCALID + """ + var = "SLURM_LOCALID" + local_rank = os.environ.get(var) + if local_rank is None: + raise ValueError("Cannot determine local rank -- expected in %s " % var) + return int(local_rank) + + def _get_world_size(self): + """A helper function for getting the world size + + Read this from the environment variable SLURM_STEP_NUM_TASKS + """ + var = "SLURM_STEP_NUM_TASKS" + world_size = os.environ.get(var) + if world_size is None: + raise ValueError("Cannot determine world size -- expected in %s " + "-- make sure you run your executable with srun" % var) + return int(world_size) + + def _get_node_rank(self): + """A helper function for getting the node rank + + Read this from the environment variable SLURM_NODEID + """ + var = "SLURM_NODEID" + local_rank = os.environ.get(var) + if local_rank is None: + raise ValueError("Cannot determine node rank -- expected in %s " % var) + return int(local_rank) + + def master_address(self): + """ + Master address is read from a list of hosts contained in the environment variable *SLURM_NODELIST* + """ + return self._master_address + + def master_port(self): + """ + Master port is calculated from the Slurm job ID + """ + return self._master_port def world_size(self): + """ + World size is read from the environment variable SLURM_STEP_NUM_TASKS + """ return self._world_size def local_rank(self): - return int(os.environ['SLURM_LOCALID']) - - def _resolve_root_node_address(self, root_node): - if '[' in root_node: - name, numbers = root_node.split('[', maxsplit=1) - number = numbers.split(',', maxsplit=1)[0] - if '-' in number: - number = number.split('-')[0] - - number = re.sub('[^0-9]', '', number) - root_node = name + number - - return root_node + """ + World size is read from the environment variable SLURM_LOCALID + """ + return self._local_rank def node_rank(self): - return int(os.environ['SLURM_NODEID']) + """ + Node rank is read from the environment variable SLURM_NODEID + """ + return self._node_rank def global_rank(self): - return int(os.environ['SLURM_PROCID']) + """ + World size is read from the environment variable SLURM_PROCID + """ + return self._global_rank + From 77f3b71295e7e217ee95597c301dc69fd52ad61c Mon Sep 17 00:00:00 2001 From: Andrew Tritt Date: Fri, 11 Dec 2020 20:25:24 -0500 Subject: [PATCH 14/34] fix docstring --- pytorch_lightning/cluster_environments/lsf_environment.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pytorch_lightning/cluster_environments/lsf_environment.py b/pytorch_lightning/cluster_environments/lsf_environment.py index e95547fa72813..378de73d1334d 100644 --- a/pytorch_lightning/cluster_environments/lsf_environment.py +++ b/pytorch_lightning/cluster_environments/lsf_environment.py @@ -129,6 +129,7 @@ def _get_world_size(self): return int(world_size) def _get_node_rank(self): + """A helper function for getting the node rank""" hosts = self._read_hosts() count = dict() for host in hosts: @@ -170,7 +171,7 @@ def node_rank(self): def global_rank(self): """ - World size is read from the environment variable JSM_NAMESPACE_LOCAL_RANK + World size is read from the environment variable JSM_NAMESPACE_RANK """ return self._global_rank From eb7d07ca61ff339fc1fa2df4ab34607b6e32166b Mon Sep 17 00:00:00 2001 From: Andrew Tritt Date: Fri, 11 Dec 2020 20:50:38 -0500 Subject: [PATCH 15/34] cleanup TorchElasticEnvironment and 
add documentation --- .../torchelastic_environment.py | 74 +++++++++++++++++-- 1 file changed, 67 insertions(+), 7 deletions(-) diff --git a/pytorch_lightning/cluster_environments/torchelastic_environment.py b/pytorch_lightning/cluster_environments/torchelastic_environment.py index 2fcd0b32f7aa1..45fbd41a57c55 100644 --- a/pytorch_lightning/cluster_environments/torchelastic_environment.py +++ b/pytorch_lightning/cluster_environments/torchelastic_environment.py @@ -19,11 +19,55 @@ class TorchElasticEnvironment(ClusterEnvironment): + """An environment for running in an environment managed by Torch Elastic + + This ClusterEnvironment expects that it was invoked from within a job + started with the Elastic Launcher. + + This plugin expects the following environment variables: + + MASTER_ADDR + fqdn of the host that is running worker with rank 0 + + MASTER_PORT + port on the MASTER_ADDR that can be used to host the tcp c10d store + + WORLD_SIZE + total number of workers in the job + + GROUP_RANK + rank of the worker group + + RANK + rank of the worker within a worker group + + LOCAL_RANK + rank of the worker within a local worker group + + See `Elastic Launch ` for more details. + """ + + + def _read_required(self, envar, target): + """A helper for reading required environment variables""" + ret = os.environ.get(envar) + if ret is None: + raise ValueError("Could not find %s -- expected in environment variable %s" % (target, envar)) + return ret def __init__(self): - super().__init__() + self._world_size = self._read_required('WORLD_SIZE', 'world size') + self._local_rank = self._read_required('LOCAL_RANK', 'local rank') + self._node_rank = self._read_required('GROUP_RANK', 'node rank') + self._global_rank = self._read_required('RANK', 'global rank') + self._master_address = self._get_master_address() + self._master_port = self._get_master_port() - def master_address(self): + def _get_master_address(self): + """A helper for reading MASTER_ADDR environment variable + + If not MASTER_POR is not found, returns 127.0.0.1 + """ if "MASTER_ADDR" not in os.environ: rank_zero_warn( "MASTER_ADDR environment variable is not defined. Set as localhost" @@ -33,7 +77,11 @@ def master_address(self): master_address = os.environ.get('MASTER_ADDR') return master_address - def master_port(self): + def _get_master_port(self): + """A helper for reading MASTER_PORT environment variable + + If not MASTER_POR is not found, returns 12910 + """ if "MASTER_PORT" not in os.environ: rank_zero_warn( "MASTER_PORT environment variable is not defined. 
Set as 12910" @@ -44,14 +92,26 @@ def master_port(self): port = os.environ.get('MASTER_PORT') return port + def master_address(self): + """Read from environment variable MASTER_ADDR""" + return self._master_address + + def master_port(self): + """Read from environment variable MASTER_PORT""" + return self._master_port + def world_size(self): - return int(os.environ.get('WORLD_SIZE')) + """Read from environment variable WORLD_SIZE""" + return self._world_size def local_rank(self): - return int(os.environ.get('LOCAL_RANK')) + """Read from environment variable LOCAL_RANK""" + return self._local_rank def node_rank(self): - return int(os.environ.get('GROUP_RANK')) + """Read from environment variable GROUP_RANK""" + return self._node_rank def global_rank(self): - return int(os.environ['RANK']) + """Read from environment variable RANK""" + return self._global_rank From 09064e14492db41a466549c7760a2730e51c41d2 Mon Sep 17 00:00:00 2001 From: Andrew Tritt Date: Fri, 11 Dec 2020 21:01:58 -0500 Subject: [PATCH 16/34] PEP8 puts a cork in it --- pytorch_lightning/accelerators/accelerator_connector.py | 4 ++-- pytorch_lightning/accelerators/ddp_hpc_accelerator.py | 2 -- pytorch_lightning/accelerators/nvidia_mixin.py | 1 + pytorch_lightning/cluster_environments/lsf_environment.py | 1 - pytorch_lightning/cluster_environments/slurm_environment.py | 1 - .../cluster_environments/torchelastic_environment.py | 1 - 6 files changed, 3 insertions(+), 7 deletions(-) diff --git a/pytorch_lightning/accelerators/accelerator_connector.py b/pytorch_lightning/accelerators/accelerator_connector.py index 01e82203bd0ae..5b0842ae46e0b 100644 --- a/pytorch_lightning/accelerators/accelerator_connector.py +++ b/pytorch_lightning/accelerators/accelerator_connector.py @@ -202,13 +202,13 @@ def select_accelerator(self): if use_cpu: if spawn: ddp_cls = accelerators.DDPCPUSpawnAccelerator - ddp_kwargs['nprocs'] = self.trainer.num_processes + acc_kwargs['nprocs'] = self.trainer.num_processes else: ddp_cls = accelerators.DDPCPUHPCAccelerator else: if spawn: ddp_cls = accelerators.DDPSpawnAccelerator - ddp_kwargs['nprocs'] = self.trainer.num_processes + acc_kwargs['nprocs'] = self.trainer.num_processes else: ddp_cls = accelerators.DDPHPCAccelerator accelerator_backend = ddp_cls(*acc_args, **acc_kwargs) diff --git a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py index 188fb5a68b2c6..9ee22422cbe7a 100644 --- a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py @@ -35,7 +35,6 @@ from hydra.utils import get_original_cwd, to_absolute_path -import sys class DDPHPCAccelerator(Accelerator, NVIDIAMixin): def __init__(self, @@ -65,7 +64,6 @@ def setup(self, model): self.set_nvidia_flags(self.trainer.data_parallel_device_ids) self.task_idx = self.cluster_environment.local_rank() - def train(self): model = self.trainer.model self.ddp_train(process_idx=self.task_idx, model=model) diff --git a/pytorch_lightning/accelerators/nvidia_mixin.py b/pytorch_lightning/accelerators/nvidia_mixin.py index 4530a182833a8..3d345bffde1d2 100644 --- a/pytorch_lightning/accelerators/nvidia_mixin.py +++ b/pytorch_lightning/accelerators/nvidia_mixin.py @@ -17,6 +17,7 @@ from pytorch_lightning import _logger as log + class NVIDIAMixin: def set_nvidia_flags(self, data_parallel_device_ids): diff --git a/pytorch_lightning/cluster_environments/lsf_environment.py b/pytorch_lightning/cluster_environments/lsf_environment.py index 
378de73d1334d..a949e6ffd3562 100644 --- a/pytorch_lightning/cluster_environments/lsf_environment.py +++ b/pytorch_lightning/cluster_environments/lsf_environment.py @@ -174,4 +174,3 @@ def global_rank(self): World size is read from the environment variable JSM_NAMESPACE_RANK """ return self._global_rank - diff --git a/pytorch_lightning/cluster_environments/slurm_environment.py b/pytorch_lightning/cluster_environments/slurm_environment.py index 4af04b0142185..bf9bb1be12acc 100644 --- a/pytorch_lightning/cluster_environments/slurm_environment.py +++ b/pytorch_lightning/cluster_environments/slurm_environment.py @@ -172,4 +172,3 @@ def global_rank(self): World size is read from the environment variable SLURM_PROCID """ return self._global_rank - diff --git a/pytorch_lightning/cluster_environments/torchelastic_environment.py b/pytorch_lightning/cluster_environments/torchelastic_environment.py index 45fbd41a57c55..220bed9495c50 100644 --- a/pytorch_lightning/cluster_environments/torchelastic_environment.py +++ b/pytorch_lightning/cluster_environments/torchelastic_environment.py @@ -47,7 +47,6 @@ class TorchElasticEnvironment(ClusterEnvironment): See `Elastic Launch ` for more details. """ - def _read_required(self, envar, target): """A helper for reading required environment variables""" ret = os.environ.get(envar) From 7be8f1d3de5b1bc83e2010167189e1421ca36383 Mon Sep 17 00:00:00 2001 From: Andrew Tritt Date: Thu, 11 Feb 2021 17:31:43 -0500 Subject: [PATCH 17/34] add set_ranks_to_trainer --- .../cluster_environments/cluster_environment.py | 5 +++++ pytorch_lightning/trainer/trainer.py | 4 +--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/pytorch_lightning/cluster_environments/cluster_environment.py b/pytorch_lightning/cluster_environments/cluster_environment.py index e82b1c310d2bd..3068558933731 100644 --- a/pytorch_lightning/cluster_environments/cluster_environment.py +++ b/pytorch_lightning/cluster_environments/cluster_environment.py @@ -37,3 +37,8 @@ def global_rank(self): def node_rank(self): pass + + def set_ranks_to_trainer(self): + trainer.local_rank = self.accelerator_backend.cluster_environment.local_rank() + trainer.node_rank = self.accelerator_backend.cluster_environment.node_rank() + trainer.global_rank = self.accelerator_backend.cluster_environment.global_rank() diff --git a/pytorch_lightning/trainer/trainer.py b/pytorch_lightning/trainer/trainer.py index 39a5129a1077b..2c60bdecae00f 100644 --- a/pytorch_lightning/trainer/trainer.py +++ b/pytorch_lightning/trainer/trainer.py @@ -438,9 +438,7 @@ def fit( # SET UP TRAINING # ---------------------------- self.accelerator_backend = self.accelerator_connector.select_accelerator() - self.local_rank = self.accelerator_backend.cluster_environment.local_rank() - self.node_rank = self.accelerator_backend.cluster_environment.node_rank() - self.global_rank = self.accelerator_backend.cluster_environment.global_rank() + self.accelerator_backend.cluster_environment.set_ranks_to_trainer(self) self.accelerator_backend.setup(model) # ---------------------------- From a1132103c2636ddb66f1a93fc71ec2d9294b22e4 Mon Sep 17 00:00:00 2001 From: Andrew Tritt Date: Thu, 11 Feb 2021 21:06:21 -0500 Subject: [PATCH 18/34] remove unused import --- pytorch_lightning/accelerators/accelerator_connector.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pytorch_lightning/accelerators/accelerator_connector.py b/pytorch_lightning/accelerators/accelerator_connector.py index 62c816445c9f4..9ac82621ba482 100644 --- 
a/pytorch_lightning/accelerators/accelerator_connector.py +++ b/pytorch_lightning/accelerators/accelerator_connector.py @@ -13,7 +13,6 @@ # limitations under the License. import os -import sys import torch @@ -431,7 +430,7 @@ def check_horovod(self): @staticmethod def has_horovodrun(): """Returns True if running with `horovodrun` using Gloo or OpenMPI.""" - return 'OMPI_COMM_WORLD_RANK' in os.environ or 'HOROVOD_RANK' in os.environ + return "OMPI_COMM_WORLD_RANK" in os.environ or "HOROVOD_RANK" in os.environ def configure_slurm_ddp(self): # extract SLURM flag vars From d17281cdaf31f9a26d147a0ccdf8ab5f3dde2515 Mon Sep 17 00:00:00 2001 From: Andrew Tritt Date: Thu, 11 Feb 2021 21:08:22 -0500 Subject: [PATCH 19/34] move to new location --- .../environments}/lsf_environment.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename pytorch_lightning/{cluster_environments => plugins/environments}/lsf_environment.py (100%) diff --git a/pytorch_lightning/cluster_environments/lsf_environment.py b/pytorch_lightning/plugins/environments/lsf_environment.py similarity index 100% rename from pytorch_lightning/cluster_environments/lsf_environment.py rename to pytorch_lightning/plugins/environments/lsf_environment.py From 7a233764d3811dcdded76a39c4999ef3733daef2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Fri, 9 Jul 2021 11:09:54 +0200 Subject: [PATCH 20/34] update LSF environment --- .../environments/cluster_environment.py | 11 - .../plugins/environments/lsf_environment.py | 204 ++++++++---------- 2 files changed, 89 insertions(+), 126 deletions(-) diff --git a/pytorch_lightning/plugins/environments/cluster_environment.py b/pytorch_lightning/plugins/environments/cluster_environment.py index 960ea5f64c14b..ed6172ae663ce 100644 --- a/pytorch_lightning/plugins/environments/cluster_environment.py +++ b/pytorch_lightning/plugins/environments/cluster_environment.py @@ -56,14 +56,3 @@ def node_rank(self) -> int: def teardown(self) -> None: """ Clean up any state set after execution finishes. """ pass - - def global_rank(self): - pass - - def node_rank(self): - pass - - def set_ranks_to_trainer(self): - trainer.local_rank = self.accelerator_backend.cluster_environment.local_rank() - trainer.node_rank = self.accelerator_backend.cluster_environment.node_rank() - trainer.global_rank = self.accelerator_backend.cluster_environment.global_rank() diff --git a/pytorch_lightning/plugins/environments/lsf_environment.py b/pytorch_lightning/plugins/environments/lsf_environment.py index a949e6ffd3562..71afc3fc42c7c 100644 --- a/pytorch_lightning/plugins/environments/lsf_environment.py +++ b/pytorch_lightning/plugins/environments/lsf_environment.py @@ -13,164 +13,138 @@ # limitations under the License. import os -import re import socket -import warnings from pytorch_lightning import _logger as log -from pytorch_lightning.cluster_environments.cluster_environment import ClusterEnvironment +from pytorch_lightning.plugins.environments import ClusterEnvironment class LSFEnvironment(ClusterEnvironment): - """An environment for running on clusters managed by the LSF resource manager. + """ + An environment for running on clusters managed by the LSF resource manager. It is expected that any execution using this ClusterEnvironment was executed - using the Job Step Manager i.e. jsrun. + using the Job Step Manager i.e. ``jsrun``. - This plugin expects the following environment variables: + This plugin expects the following environment variables. 
- LSB_JOBID - The LSF assigned job ID + LSB_JOBID: + The LSF assigned job ID - LSB_HOSTS - The hosts used in the job. This string is expected to have the format "batch ...." + LSB_HOSTS: + The hosts used in the job. This string is expected to have the format "batch ...." - JSM_NAMESPACE_LOCAL_RANK - The node local rank for the task. This environment variable is set by jsrun + JSM_NAMESPACE_LOCAL_RANK: + The node local rank for the task. This environment variable is set by jsrun - JSM_NAMESPACE_SIZE - The world size for the task. This environment variable is set by jsrun + JSM_NAMESPACE_SIZE: + The world size for the task. This environment variable is set by jsrun """ def __init__(self): self._master_address = self._get_master_address() self._master_port = self._get_master_port() - self._local_rank = self._get_local_rank() - self._global_rank = self._get_global_rank() - self._world_size = self._get_world_size() - self._node_rank = self._get_node_rank() - - # set environment variables needed for initializing torch distributed process group - os.environ["MASTER_ADDR"] = str(self._master_address) - log.debug(f"MASTER_ADDR: {os.environ['MASTER_ADDR']}") - os.environ["MASTER_PORT"] = str(self._master_port) - log.debug(f"MASTER_PORT: {os.environ['MASTER_PORT']}") - - def _read_hosts(self): - var = "LSB_HOSTS" - hosts = os.environ.get(var) - if not hosts: - raise ValueError("Could not find hosts -- expected in environment variable %s" % var) - hosts = hosts.split() - if len(hosts) < 2: - raise ValueError("Cannot parse hosts from LSB_HOSTS environment variable -- " - "expected format \"batch ...\"") - return hosts + log.debug(f"MASTER_ADDR: {self._master_address}") + log.debug(f"MASTER_PORT: {self._master_port}") - def _get_master_address(self): - """A helper for getting the master address""" - hosts = self._read_hosts() - return hosts[1] + def creates_children(self) -> bool: + return True - def _get_master_port(self): - """A helper for getting the master port + def master_address(self): + """ The master address is read from a list of hosts contained in the environment variable `LSB_HOSTS`. """ + return self._master_address - Use the LSF job ID so all ranks can compute the master port - """ - # check for user-specified master port - port = os.environ.get("MASTER_PORT") - if not port: - var = "LSB_JOBID" - jobid = os.environ.get(var) - if not jobid: - raise ValueError("Could not find job id -- expected in environment variable %s" % var) - else: - port = int(jobid) - # all ports should be in the 10k+ range - port = int(port) % 1000 + 10000 - log.debug("calculated master port") - else: - log.debug("using externally specified master port") - return port + def master_port(self): + """ THe master port gets calculated from the LSF job ID. """ + return self._master_port - def _get_global_rank(self): - """A helper function for getting the global rank + def world_size(self): + """ The world size is read from the environment variable `JSM_NAMESPACE_SIZE`. """ + var = "JSM_NAMESPACE_SIZE" + world_size = os.environ.get(var) + if world_size is None: + raise ValueError( + f"Cannot determine world size from environment variable {var}." + " Make sure you run your executable with `jsrun`" + ) + return int(world_size) - Read this from the environment variable JSM_NAMESPACE_LOCAL_RANK - """ + def set_world_size(self, size: int) -> None: + log.debug("LSFEnvironment.set_world_size was called, but setting world size is not allowed. 
Ignored.") + + def global_rank(self): + """ The world size is read from the environment variable `JSM_NAMESPACE_RANK`. """ var = "JSM_NAMESPACE_RANK" global_rank = os.environ.get(var) if global_rank is None: - raise ValueError("Cannot determine global rank -- expected in %s " - "-- make sure you run your executable with jsrun" % var) + raise ValueError( + f"Cannot determine global rank from environment variable {var}." + " Make sure you run your executable with `jsrun`" + ) return int(global_rank) - def _get_local_rank(self): - """A helper function for getting the local rank + def set_global_rank(self, rank: int) -> None: + log.debug( + "LSFEnvironment.set_global_rank was called, but setting global rank is not allowed. Ignored." + ) - Read this from the environment variable JSM_NAMESPACE_LOCAL_RANK - """ + def local_rank(self): + """ The local rank is read from the environment variable `JSM_NAMESPACE_LOCAL_RANK`. """ var = "JSM_NAMESPACE_LOCAL_RANK" local_rank = os.environ.get(var) if local_rank is None: - raise ValueError("Cannot determine local rank -- expected in %s " - "-- make sure you run your executable with jsrun" % var) + raise ValueError( + f"Cannot determine local rank from environment variable {var}." + " Make sure you run your executable with `jsrun`" + ) return int(local_rank) - def _get_world_size(self): - """A helper function for getting the world size - - Read this from the environment variable JSM_NAMESPACE_SIZE + def node_rank(self): + """ + The node rank is determined by the position of the current hostname in the list of hosts stored in + the environment variable `LSB_HOSTS`. """ - var = "JSM_NAMESPACE_SIZE" - world_size = os.environ.get(var) - if world_size is None: - raise ValueError("Cannot determine local rank -- expected in %s " - "-- make sure you run your executable with jsrun" % var) - return int(world_size) - - def _get_node_rank(self): - """A helper function for getting the node rank""" hosts = self._read_hosts() count = dict() for host in hosts: - if 'batch' in host or 'login' in host: + if "batch" in host or "login" in host: continue if host not in count: count[host] = len(count) return count[socket.gethostname()] - def master_address(self): - """ - Master address is read from a list of hosts contained in the environment variable *LSB_HOSTS* - """ - return self._master_address - - def master_port(self): - """ - Master port is calculated from the LSF job ID - """ - return self._master_port - - def world_size(self): - """ - World size is read from the environment variable JSM_NAMESPACE_SIZE - """ - return self._world_size - - def local_rank(self): - """ - World size is read from the environment variable JSM_NAMESPACE_LOCAL_RANK - """ - return self._local_rank + @staticmethod + def _read_hosts(): + hosts = os.environ.get("LSB_HOSTS") + if not hosts: + raise ValueError("Could not find hosts in environment variable LSB_HOSTS") + hosts = hosts.split() + if len(hosts) < 2: + raise ValueError( + "Cannot parse hosts from LSB_HOSTS environment variable." + " Expected format: \"batch ...\"" + ) + return hosts - def node_rank(self): - """ - Node rank is determined by the position of the current hostname in the list of hosts stored in LSB_HOSTS - """ - return self._node_rank + def _get_master_address(self): + hosts = self._read_hosts() + return hosts[1] - def global_rank(self): + @staticmethod + def _get_master_port(): """ - World size is read from the environment variable JSM_NAMESPACE_RANK + A helper function for accessing the master port. 
+ Uses the LSF job ID so all ranks can compute the master port. """ - return self._global_rank + # check for user-specified master port + port = os.environ.get("MASTER_PORT") + if not port: + jobid = os.environ.get("LSB_JOBID") + if not jobid: + raise ValueError("Could not find job id in environment variable LSB_JOBID") + port = int(jobid) + # all ports should be in the 10k+ range + port = int(port) % 1000 + 10000 + log.debug(f"calculated LSF master port: {port}") + else: + log.debug(f"using externally specified master port: {port}") + return port From 02410ff794b28e8cfe3cd330571a24a21474d50f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Fri, 9 Jul 2021 11:10:42 +0200 Subject: [PATCH 21/34] remove mixin --- .../accelerators/nvidia_mixin.py | 31 ------------------- 1 file changed, 31 deletions(-) delete mode 100644 pytorch_lightning/accelerators/nvidia_mixin.py diff --git a/pytorch_lightning/accelerators/nvidia_mixin.py b/pytorch_lightning/accelerators/nvidia_mixin.py deleted file mode 100644 index 3d345bffde1d2..0000000000000 --- a/pytorch_lightning/accelerators/nvidia_mixin.py +++ /dev/null @@ -1,31 +0,0 @@ -# Copyright The PyTorch Lightning team. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License -import os - -import torch - -from pytorch_lightning import _logger as log - - -class NVIDIAMixin: - - def set_nvidia_flags(self, data_parallel_device_ids): - if data_parallel_device_ids is None: - return - - # set the correct cuda visible devices (using pci order) - os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" - all_gpu_ids = ",".join([str(x) for x in range(torch.cuda.device_count())]) - devices = os.environ.get("CUDA_VISIBLE_DEVICES", all_gpu_ids) - log.info(f'LOCAL_RANK: {self.trainer.local_rank} - CUDA_VISIBLE_DEVICES: [{devices}]') From 7f9174041c5abdd42595516120314e7eecaa37e2 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 9 Jul 2021 09:11:04 +0000 Subject: [PATCH 22/34] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- pytorch_lightning/plugins/environments/lsf_environment.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pytorch_lightning/plugins/environments/lsf_environment.py b/pytorch_lightning/plugins/environments/lsf_environment.py index 71afc3fc42c7c..01e11f7ffa34e 100644 --- a/pytorch_lightning/plugins/environments/lsf_environment.py +++ b/pytorch_lightning/plugins/environments/lsf_environment.py @@ -14,6 +14,7 @@ import os import socket + from pytorch_lightning import _logger as log from pytorch_lightning.plugins.environments import ClusterEnvironment @@ -83,9 +84,7 @@ def global_rank(self): return int(global_rank) def set_global_rank(self, rank: int) -> None: - log.debug( - "LSFEnvironment.set_global_rank was called, but setting global rank is not allowed. Ignored." - ) + log.debug("LSFEnvironment.set_global_rank was called, but setting global rank is not allowed. 
Ignored.") def local_rank(self): """ The local rank is read from the environment variable `JSM_NAMESPACE_LOCAL_RANK`. """ From 1b3bc7abe9dd1c4d7b00c9ec4912ccb2ab30ff77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Fri, 9 Jul 2021 11:16:26 +0200 Subject: [PATCH 23/34] changelog --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1d8dc8647bc45..168ce93c103c3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -146,6 +146,9 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). - Added `FastForwardSampler` and `CaptureIterableDataset` ([#8307](https://github.com/PyTorchLightning/pytorch-lightning/pull/8307)) +- Added `LSFEnvironment` for distributed training with the LSF resource manager `jsrun` ([#5102](https://github.com/PyTorchLightning/pytorch-lightning/pull/5102)) + + ### Changed From 92215abd837a8cfc2012b5145069f3dd9cefcf27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Fri, 9 Jul 2021 11:20:45 +0200 Subject: [PATCH 24/34] reset slurm env --- .../plugins/environments/slurm_environment.py | 46 ++++++------------- 1 file changed, 13 insertions(+), 33 deletions(-) diff --git a/pytorch_lightning/plugins/environments/slurm_environment.py b/pytorch_lightning/plugins/environments/slurm_environment.py index 5ab981b90c91d..359d0f0d408c1 100644 --- a/pytorch_lightning/plugins/environments/slurm_environment.py +++ b/pytorch_lightning/plugins/environments/slurm_environment.py @@ -38,8 +38,7 @@ def master_address(self) -> str: root_node = self.resolve_root_node_address(root_node) os.environ["MASTER_ADDR"] = root_node log.debug(f"MASTER_ADDR: {os.environ['MASTER_ADDR']}") - os.environ["MASTER_PORT"] = str(self._master_port) - log.debug(f"MASTER_PORT: {os.environ['MASTER_PORT']}") + return root_node def master_port(self) -> int: # ----------------------- @@ -55,34 +54,16 @@ def master_port(self) -> int: else: default_port = 12910 - def _get_master_address(self): - """A helper for getting the master address""" - hosts = self._read_hosts() - return hosts[0] - - def _get_master_port(self): - """A helper for getting the master port - - Use the Slurm job ID so all ranks can compute the master port - """ - # check for user-specified master port - port = os.environ.get("MASTER_PORT") - if not port: - var = "SLURM_JOB_ID" - jobid = os.environ.get(var) - if not jobid: - raise ValueError("Could not find job id -- expected in environment variable %s" % var) - else: - port = int(jobid) - # all ports should be in the 10k+ range - port = int(port) % 1000 + 10000 - log.debug("calculated master port") + # ----------------------- + # PORT NUMBER = MASTER_PORT + # ----------------------- + # in case the user passed it in + if "MASTER_PORT" in os.environ: + default_port = os.environ["MASTER_PORT"] else: - log.debug("using externally specified master port") - return port + os.environ["MASTER_PORT"] = str(default_port) - def _get_global_rank(self): - """A helper function for getting the global rank + log.debug(f"MASTER_PORT: {os.environ['MASTER_PORT']}") return int(default_port) @@ -111,8 +92,7 @@ def resolve_root_node_address(self, root_node: str) -> str: if '-' in number: number = number.split('-')[0] - def global_rank(self): - """ - World size is read from the environment variable SLURM_PROCID - """ - return self._global_rank + number = re.sub('[^0-9]', '', number) + root_node = name + number + + return root_node From a6137591f9ac45e0b8068f2ede7ee2c528b4edd4 Mon Sep 17 00:00:00 2001 From: 
=?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Fri, 9 Jul 2021 13:20:23 +0200 Subject: [PATCH 25/34] add tests --- .../plugins/environments/__init__.py | 1 + .../plugins/environments/lsf_environment.py | 2 +- .../environments/test_lsf_environment.py | 61 +++++++++++++++++++ 3 files changed, 63 insertions(+), 1 deletion(-) create mode 100644 tests/plugins/environments/test_lsf_environment.py diff --git a/pytorch_lightning/plugins/environments/__init__.py b/pytorch_lightning/plugins/environments/__init__.py index c7199ece84e31..1878a725071ad 100644 --- a/pytorch_lightning/plugins/environments/__init__.py +++ b/pytorch_lightning/plugins/environments/__init__.py @@ -14,5 +14,6 @@ from pytorch_lightning.plugins.environments.cluster_environment import ClusterEnvironment # noqa: F401 from pytorch_lightning.plugins.environments.kubeflow_environment import KubeflowEnvironment # noqa: F401 from pytorch_lightning.plugins.environments.lightning_environment import LightningEnvironment # noqa: F401 +from pytorch_lightning.plugins.environments.lsf_environment import LSFEnvironment # noqa: F401 from pytorch_lightning.plugins.environments.slurm_environment import SLURMEnvironment # noqa: F401 from pytorch_lightning.plugins.environments.torchelastic_environment import TorchElasticEnvironment # noqa: F401 diff --git a/pytorch_lightning/plugins/environments/lsf_environment.py b/pytorch_lightning/plugins/environments/lsf_environment.py index 01e11f7ffa34e..c3a4072c3b421 100644 --- a/pytorch_lightning/plugins/environments/lsf_environment.py +++ b/pytorch_lightning/plugins/environments/lsf_environment.py @@ -146,4 +146,4 @@ def _get_master_port(): log.debug(f"calculated LSF master port: {port}") else: log.debug(f"using externally specified master port: {port}") - return port + return int(port) diff --git a/tests/plugins/environments/test_lsf_environment.py b/tests/plugins/environments/test_lsf_environment.py new file mode 100644 index 0000000000000..18e7998fa5ce5 --- /dev/null +++ b/tests/plugins/environments/test_lsf_environment.py @@ -0,0 +1,61 @@ +import os +from unittest import mock + +import pytest + +from pytorch_lightning.plugins.environments import LSFEnvironment + + +@mock.patch.dict(os.environ, { + "LSB_HOSTS": "batch 10.10.10.0 10.10.10.1", + "LSB_JOBID": "1234", +}) +def test_missing_lsb_hosts(): + """ Test an error when the lsb hosts list cannot be found. """ + del os.environ["LSB_HOSTS"] + with pytest.raises(ValueError, match="Could not find hosts in environment variable LSB_HOSTS"): + LSFEnvironment() + + +@mock.patch.dict(os.environ, { + "LSB_HOSTS": "batch 10.10.10.0 10.10.10.1", + "LSB_JOBID": "1234", +}) +def test_missing_lsb_job_id(): + """ Test an error when the job id cannot be found. """ + del os.environ["LSB_JOBID"] + with pytest.raises(ValueError, match="Could not find job id in environment variable LSB_JOBID"): + LSFEnvironment() + + +@mock.patch.dict(os.environ, { + "MASTER_PORT": "4321", + "LSB_JOBID": "1234", + "LSB_HOSTS": "batch 10.10.10.0 10.10.10.1", +}) +def test_manual_master_port_and_address(): + """ Test a user can set the port manually through the MASTER_PORT env variable. 
""" + env = LSFEnvironment() + assert env.master_port() == 4321 + + +@mock.patch.dict(os.environ, { + "LSB_HOSTS": "batch 10.10.10.0 10.10.10.1 10.10.10.2 10.10.10.3", + "LSB_JOBID": "1234", + "JSM_NAMESPACE_SIZE": "4", + "JSM_NAMESPACE_RANK": "3", + "JSM_NAMESPACE_LOCAL_RANK": "1" +}) +def test_attributes_from_environment_variables(): + """ Test that the LSF environment takes the attributes from the environment variables. """ + env = LSFEnvironment() + assert env.creates_children() + assert env.master_address() == "10.10.10.0" + assert env.master_port() == 10234 + assert env.world_size() == 4 + assert env.global_rank() == 3 + assert env.local_rank() == 1 + env.set_global_rank(100) + assert env.global_rank() == 3 + env.set_world_size(100) + assert env.world_size() == 4 From f7c5e0e0d11666a56518c8cf582b85cd5efb5d74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Fri, 9 Jul 2021 13:21:14 +0200 Subject: [PATCH 26/34] add licence --- .../environments/test_kubeflow_environment.py | 13 +++++++++++++ .../environments/test_lightning_environment.py | 13 +++++++++++++ tests/plugins/environments/test_lsf_environment.py | 13 +++++++++++++ .../plugins/environments/test_slurm_environment.py | 13 +++++++++++++ .../environments/test_torchelastic_environment.py | 13 +++++++++++++ 5 files changed, 65 insertions(+) diff --git a/tests/plugins/environments/test_kubeflow_environment.py b/tests/plugins/environments/test_kubeflow_environment.py index b552b8b4c4c28..767e920920103 100644 --- a/tests/plugins/environments/test_kubeflow_environment.py +++ b/tests/plugins/environments/test_kubeflow_environment.py @@ -1,3 +1,16 @@ +# Copyright The PyTorch Lightning team. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. import logging import os from unittest import mock diff --git a/tests/plugins/environments/test_lightning_environment.py b/tests/plugins/environments/test_lightning_environment.py index 8ebcec953fcc8..29917877b2cf5 100644 --- a/tests/plugins/environments/test_lightning_environment.py +++ b/tests/plugins/environments/test_lightning_environment.py @@ -1,3 +1,16 @@ +# Copyright The PyTorch Lightning team. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import os from unittest import mock diff --git a/tests/plugins/environments/test_lsf_environment.py b/tests/plugins/environments/test_lsf_environment.py index 18e7998fa5ce5..1c4b897713d72 100644 --- a/tests/plugins/environments/test_lsf_environment.py +++ b/tests/plugins/environments/test_lsf_environment.py @@ -1,3 +1,16 @@ +# Copyright The PyTorch Lightning team. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. import os from unittest import mock diff --git a/tests/plugins/environments/test_slurm_environment.py b/tests/plugins/environments/test_slurm_environment.py index 0be88dbeb91c6..da5fef19e49b5 100644 --- a/tests/plugins/environments/test_slurm_environment.py +++ b/tests/plugins/environments/test_slurm_environment.py @@ -1,3 +1,16 @@ +# Copyright The PyTorch Lightning team. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. import logging import os from unittest import mock diff --git a/tests/plugins/environments/test_torchelastic_environment.py b/tests/plugins/environments/test_torchelastic_environment.py index 2b9efafbbcc67..6fee9eb17a6ff 100644 --- a/tests/plugins/environments/test_torchelastic_environment.py +++ b/tests/plugins/environments/test_torchelastic_environment.py @@ -1,3 +1,16 @@ +# Copyright The PyTorch Lightning team. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import logging import os from unittest import mock From 00de88ef61b44fbb21f3e1926771b12d38c9f8e6 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 9 Jul 2021 11:22:31 +0000 Subject: [PATCH 27/34] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../environments/test_lsf_environment.py | 28 +++++++++++-------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/tests/plugins/environments/test_lsf_environment.py b/tests/plugins/environments/test_lsf_environment.py index 1c4b897713d72..0e2a8db648d28 100644 --- a/tests/plugins/environments/test_lsf_environment.py +++ b/tests/plugins/environments/test_lsf_environment.py @@ -41,24 +41,28 @@ def test_missing_lsb_job_id(): LSFEnvironment() -@mock.patch.dict(os.environ, { - "MASTER_PORT": "4321", - "LSB_JOBID": "1234", - "LSB_HOSTS": "batch 10.10.10.0 10.10.10.1", -}) +@mock.patch.dict( + os.environ, { + "MASTER_PORT": "4321", + "LSB_JOBID": "1234", + "LSB_HOSTS": "batch 10.10.10.0 10.10.10.1", + } +) def test_manual_master_port_and_address(): """ Test a user can set the port manually through the MASTER_PORT env variable. """ env = LSFEnvironment() assert env.master_port() == 4321 -@mock.patch.dict(os.environ, { - "LSB_HOSTS": "batch 10.10.10.0 10.10.10.1 10.10.10.2 10.10.10.3", - "LSB_JOBID": "1234", - "JSM_NAMESPACE_SIZE": "4", - "JSM_NAMESPACE_RANK": "3", - "JSM_NAMESPACE_LOCAL_RANK": "1" -}) +@mock.patch.dict( + os.environ, { + "LSB_HOSTS": "batch 10.10.10.0 10.10.10.1 10.10.10.2 10.10.10.3", + "LSB_JOBID": "1234", + "JSM_NAMESPACE_SIZE": "4", + "JSM_NAMESPACE_RANK": "3", + "JSM_NAMESPACE_LOCAL_RANK": "1" + } +) def test_attributes_from_environment_variables(): """ Test that the LSF environment takes the attributes from the environment variables. 
""" env = LSFEnvironment() From cfd59b8540622e718bb09221be666b245ff76731 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Fri, 9 Jul 2021 13:28:49 +0200 Subject: [PATCH 28/34] test node_rank --- tests/plugins/environments/test_lsf_environment.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tests/plugins/environments/test_lsf_environment.py b/tests/plugins/environments/test_lsf_environment.py index 0e2a8db648d28..c4c8953b79fd8 100644 --- a/tests/plugins/environments/test_lsf_environment.py +++ b/tests/plugins/environments/test_lsf_environment.py @@ -76,3 +76,15 @@ def test_attributes_from_environment_variables(): assert env.global_rank() == 3 env.set_world_size(100) assert env.world_size() == 4 + + +@mock.patch("socket.gethostname", return_value="host2") +@mock.patch.dict( + os.environ, { + "LSB_HOSTS": "batch host0 host1 host2 host3", + "LSB_JOBID": "1234", + } +) +def test_node_rank(_): + env = LSFEnvironment() + assert env.node_rank() == 2 From 5ec99e946874ef0cfb02187021836e8c1266aefb Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 9 Jul 2021 11:30:04 +0000 Subject: [PATCH 29/34] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- tests/plugins/environments/test_lsf_environment.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/tests/plugins/environments/test_lsf_environment.py b/tests/plugins/environments/test_lsf_environment.py index c4c8953b79fd8..fcb39e7d354ab 100644 --- a/tests/plugins/environments/test_lsf_environment.py +++ b/tests/plugins/environments/test_lsf_environment.py @@ -79,12 +79,10 @@ def test_attributes_from_environment_variables(): @mock.patch("socket.gethostname", return_value="host2") -@mock.patch.dict( - os.environ, { - "LSB_HOSTS": "batch host0 host1 host2 host3", - "LSB_JOBID": "1234", - } -) +@mock.patch.dict(os.environ, { + "LSB_HOSTS": "batch host0 host1 host2 host3", + "LSB_JOBID": "1234", +}) def test_node_rank(_): env = LSFEnvironment() assert env.node_rank() == 2 From cfe544f58b9d9a99f21e6ad3b06c3ac2242e6858 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Fri, 9 Jul 2021 13:33:07 +0200 Subject: [PATCH 30/34] add lsf env to docs --- docs/source/extensions/plugins.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/source/extensions/plugins.rst b/docs/source/extensions/plugins.rst index 436d40f660e7a..173e533fb0882 100644 --- a/docs/source/extensions/plugins.rst +++ b/docs/source/extensions/plugins.rst @@ -148,6 +148,7 @@ Cluster Environments ClusterEnvironment LightningEnvironment + LSFEnvironment TorchElasticEnvironment KubeflowEnvironment SLURMEnvironment From 71569decd97b6fe0e045346eefaa3b21c3f86fbd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Fri, 9 Jul 2021 13:38:28 +0200 Subject: [PATCH 31/34] add auto detection for lsf environment --- pytorch_lightning/plugins/environments/lsf_environment.py | 8 ++++++++ .../trainer/connectors/accelerator_connector.py | 4 +++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/pytorch_lightning/plugins/environments/lsf_environment.py b/pytorch_lightning/plugins/environments/lsf_environment.py index c3a4072c3b421..07f7efc878ea6 100644 --- a/pytorch_lightning/plugins/environments/lsf_environment.py +++ b/pytorch_lightning/plugins/environments/lsf_environment.py @@ -47,6 +47,14 @@ def __init__(self): log.debug(f"MASTER_ADDR: {self._master_address}") 
log.debug(f"MASTER_PORT: {self._master_port}") + @staticmethod + def is_using_lsf() -> bool: + """ Returns ``True`` if the current process was launched using the jsrun command. """ + required_env_vars = ( + "LSB_JOBID", "LSB_HOSTS", "JSM_NAMESPACE_LOCAL_RANK", "LOCAL_WORLD_SIZE", "JSM_NAMESPACE_SIZE" + ) + return all(v in os.environ for v in required_env_vars) + def creates_children(self) -> bool: return True diff --git a/pytorch_lightning/trainer/connectors/accelerator_connector.py b/pytorch_lightning/trainer/connectors/accelerator_connector.py index 752ad6eeb747e..ab3cb23ddac7e 100644 --- a/pytorch_lightning/trainer/connectors/accelerator_connector.py +++ b/pytorch_lightning/trainer/connectors/accelerator_connector.py @@ -55,7 +55,7 @@ KubeflowEnvironment, LightningEnvironment, SLURMEnvironment, - TorchElasticEnvironment, + TorchElasticEnvironment, LSFEnvironment, ) from pytorch_lightning.utilities import ( _APEX_AVAILABLE, @@ -554,6 +554,8 @@ def select_cluster_environment(self) -> ClusterEnvironment: env = TorchElasticEnvironment() elif KubeflowEnvironment.is_using_kubeflow(): env = KubeflowEnvironment() + elif LSFEnvironment.is_using_lsf(): + env = LSFEnvironment() else: env = LightningEnvironment() return env From 7c26b41d1884d8dd40db845f6a6ccff89cee5d1f Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 9 Jul 2021 11:39:45 +0000 Subject: [PATCH 32/34] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- pytorch_lightning/trainer/connectors/accelerator_connector.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pytorch_lightning/trainer/connectors/accelerator_connector.py b/pytorch_lightning/trainer/connectors/accelerator_connector.py index ab3cb23ddac7e..2a643c64f5e64 100644 --- a/pytorch_lightning/trainer/connectors/accelerator_connector.py +++ b/pytorch_lightning/trainer/connectors/accelerator_connector.py @@ -54,8 +54,9 @@ ClusterEnvironment, KubeflowEnvironment, LightningEnvironment, + LSFEnvironment, SLURMEnvironment, - TorchElasticEnvironment, LSFEnvironment, + TorchElasticEnvironment, ) from pytorch_lightning.utilities import ( _APEX_AVAILABLE, From 077964d0991614b268133f8a2e58c922c200bdc5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Fri, 9 Jul 2021 13:41:38 +0200 Subject: [PATCH 33/34] fix is_using_lsf() and test --- pytorch_lightning/plugins/environments/lsf_environment.py | 2 +- tests/plugins/environments/test_lsf_environment.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/pytorch_lightning/plugins/environments/lsf_environment.py b/pytorch_lightning/plugins/environments/lsf_environment.py index 07f7efc878ea6..781e130e42dbc 100644 --- a/pytorch_lightning/plugins/environments/lsf_environment.py +++ b/pytorch_lightning/plugins/environments/lsf_environment.py @@ -51,7 +51,7 @@ def __init__(self): def is_using_lsf() -> bool: """ Returns ``True`` if the current process was launched using the jsrun command. 
""" required_env_vars = ( - "LSB_JOBID", "LSB_HOSTS", "JSM_NAMESPACE_LOCAL_RANK", "LOCAL_WORLD_SIZE", "JSM_NAMESPACE_SIZE" + "LSB_JOBID", "LSB_HOSTS", "JSM_NAMESPACE_LOCAL_RANK", "JSM_NAMESPACE_SIZE", ) return all(v in os.environ for v in required_env_vars) diff --git a/tests/plugins/environments/test_lsf_environment.py b/tests/plugins/environments/test_lsf_environment.py index fcb39e7d354ab..fd8beec7bb4ac 100644 --- a/tests/plugins/environments/test_lsf_environment.py +++ b/tests/plugins/environments/test_lsf_environment.py @@ -76,6 +76,7 @@ def test_attributes_from_environment_variables(): assert env.global_rank() == 3 env.set_world_size(100) assert env.world_size() == 4 + assert LSFEnvironment.is_using_lsf() @mock.patch("socket.gethostname", return_value="host2") From 7f127c8484d067a4c6ea93ecab9db76ce13a3216 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 9 Jul 2021 11:43:06 +0000 Subject: [PATCH 34/34] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- pytorch_lightning/plugins/environments/lsf_environment.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pytorch_lightning/plugins/environments/lsf_environment.py b/pytorch_lightning/plugins/environments/lsf_environment.py index 781e130e42dbc..3b32a7b4aeb50 100644 --- a/pytorch_lightning/plugins/environments/lsf_environment.py +++ b/pytorch_lightning/plugins/environments/lsf_environment.py @@ -51,7 +51,10 @@ def __init__(self): def is_using_lsf() -> bool: """ Returns ``True`` if the current process was launched using the jsrun command. """ required_env_vars = ( - "LSB_JOBID", "LSB_HOSTS", "JSM_NAMESPACE_LOCAL_RANK", "JSM_NAMESPACE_SIZE", + "LSB_JOBID", + "LSB_HOSTS", + "JSM_NAMESPACE_LOCAL_RANK", + "JSM_NAMESPACE_SIZE", ) return all(v in os.environ for v in required_env_vars)