diff --git a/docs/source/accelerators.rst b/docs/source/accelerators.rst
index bc9abebcd90d8..06e87fa7406da 100644
--- a/docs/source/accelerators.rst
+++ b/docs/source/accelerators.rst
@@ -180,3 +180,27 @@ TPU Accelerator
 
 .. autoclass:: pytorch_lightning.accelerators.tpu_accelerator.TPUAccelerator
     :noindex:
+
+------------
+
+*****************************
+Available ClusterEnvironments
+*****************************
+
+SLURM Environment
+=================
+
+.. autoclass:: pytorch_lightning.cluster_environments.slurm_environment.SLURMEnvironment
+    :noindex:
+
+LSF Environment
+===============
+
+.. autoclass:: pytorch_lightning.cluster_environments.lsf_environment.LSFEnvironment
+    :noindex:
+
+TorchElastic Environment
+========================
+
+.. autoclass:: pytorch_lightning.cluster_environments.torchelastic_environment.TorchElasticEnvironment
+    :noindex:
diff --git a/pytorch_lightning/accelerators/accelerator.py b/pytorch_lightning/accelerators/accelerator.py
index 77f30219ba8c0..176c1d27dddab 100644
--- a/pytorch_lightning/accelerators/accelerator.py
+++ b/pytorch_lightning/accelerators/accelerator.py
@@ -148,14 +148,13 @@ def setup_optimizers(self, model):
         self.trainer.optimizer_frequencies = optimizer_frequencies
 
     def init_ddp_connection(
-        self, global_rank: int, world_size: int, is_slurm_managing_tasks: bool = True
+        self, global_rank: int, world_size: int
     ) -> None:
         self.ddp_plugin.init_ddp_connection(
             self.trainer,
             self.cluster_environment,
             global_rank,
-            world_size,
-            is_slurm_managing_tasks,
+            world_size
         )
 
     def sync_tensor(self,
diff --git a/pytorch_lightning/accelerators/accelerator_connector.py b/pytorch_lightning/accelerators/accelerator_connector.py
index 4d899da2b0ec2..01e82203bd0ae 100644
--- a/pytorch_lightning/accelerators/accelerator_connector.py
+++ b/pytorch_lightning/accelerators/accelerator_connector.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import os
+import sys
 
 import torch
 
@@ -121,16 +122,6 @@ def on_trainer_init(
         self.trainer.world_size = 1
         self.trainer.interactive_ddp_procs = []
 
-        # link up SLURM
-        # TODO: this should be taken out of here...
but depends too much on DDP - self.trainer.slurm_connector.on_trainer_init(self.trainer.num_nodes) - self.trainer.node_rank = self.determine_ddp_node_rank() - self.trainer.local_rank = self.determine_local_rank() - self.trainer.global_rank = 0 - - # NVIDIA setup - self.set_nvidia_flags(self.trainer.is_slurm_managing_tasks, self.trainer.data_parallel_device_ids) - self.trainer.on_colab_kaggle = os.getenv('COLAB_GPU') or os.getenv('KAGGLE_URL_BASE') self.trainer.replace_sampler_ddp = replace_sampler_ddp @@ -151,12 +142,8 @@ def _map_deprecated_dist_backend(self, accelerator, distributed_backend): return distributed_backend def _select_environment(self): - if self.trainer.plugin_connector.cloud_environment: - env = self.trainer.plugin_connector.cloud_environment - elif self.trainer.is_slurm_managing_tasks: - env = SLURMEnvironment() - elif self._is_using_torchelastic(): - env = TorchElasticEnvironment() + if self.trainer.plugin_connector.cluster_environment: + env = self.trainer.plugin_connector.cluster_environment else: env = TorchElasticEnvironment() return env @@ -182,84 +169,14 @@ def select_accelerator(self): # ---------------------------------- # choose an accelerator for the user # ---------------------------------- - use_slurm_ddp = self.trainer.use_ddp and self.trainer.is_slurm_managing_tasks - - # torchelastic or general non_slurm ddp - te_flags_passed = 'WORLD_SIZE' in os.environ and ('GROUP_RANK' in os.environ or 'NODE_RANK' in os.environ) - use_torchelastic_ddp = self.trainer.use_ddp and te_flags_passed - - use_ddp_spawn = self.trainer.use_ddp and self.trainer.distributed_backend == "ddp_spawn" - use_ddp_cpu_spawn = self.trainer.use_ddp and self.trainer.distributed_backend == "ddp_cpu" - - use_ddp_cpu_torch_elastic = use_ddp_cpu_spawn and self._is_using_torchelastic() - use_ddp_cpu_slurm = use_ddp_cpu_spawn and self.trainer.is_slurm_managing_tasks - - # ddp script mode uses the same flags as TE - # TODO: decouple from TE - if os.environ.get('PL_IN_DDP_SUBPROCESS', False): - use_torchelastic_ddp = False - cluster_env = self._select_environment() - # choose the appropriate accelerator backend if self.trainer.use_ddp2: accelerator_backend = accelerators.DDP2Accelerator( self.trainer, cluster_env, self.trainer.plugin_connector.ddp_plugin ) - - elif use_ddp_cpu_slurm: - accelerator_backend = accelerators.DDPCPUHPCAccelerator( - self.trainer, - cluster_env, - self.trainer.plugin_connector.ddp_plugin - ) - - elif use_slurm_ddp: - accelerator_backend = accelerators.DDPHPCAccelerator( - self.trainer, - cluster_env, - self.trainer.plugin_connector.ddp_plugin - ) - - elif use_ddp_cpu_torch_elastic: - accelerator_backend = accelerators.DDPCPUHPCAccelerator( - self.trainer, - cluster_env, - self.trainer.plugin_connector.ddp_plugin - ) - - elif use_torchelastic_ddp: - accelerator_backend = accelerators.DDPHPCAccelerator( - self.trainer, - cluster_env, - self.trainer.plugin_connector.ddp_plugin - ) - - elif use_ddp_spawn: - accelerator_backend = accelerators.DDPSpawnAccelerator( - self.trainer, - nprocs=self.trainer.num_processes, - cluster_environment=cluster_env, - ddp_plugin=self.trainer.plugin_connector.ddp_plugin - ) - - elif use_ddp_cpu_spawn: - accelerator_backend = accelerators.DDPCPUSpawnAccelerator( - self.trainer, - nprocs=self.trainer.num_processes, - cluster_environment=cluster_env, - ddp_plugin=self.trainer.plugin_connector.ddp_plugin - ) - - elif self.trainer.distributed_backend == "ddp": - accelerator_backend = accelerators.DDPAccelerator( - self.trainer, - cluster_env, - 
ddp_plugin=self.trainer.plugin_connector.ddp_plugin
-        )
-
         elif self.trainer.use_dp:
             accelerator_backend = accelerators.DataParallelAccelerator(self.trainer, cluster_env)
 
@@ -274,6 +191,27 @@ def select_accelerator(self):
 
         elif self.trainer.distributed_backend is None:
             accelerator_backend = accelerators.CPUAccelerator(self.trainer, cluster_env)
+
+        elif self.trainer.use_ddp:
+            spawn = self.trainer.distributed_backend == "ddp_spawn"
+            use_cpu = self.trainer.gpus is None
+
+            acc_args = [self.trainer]
+            acc_kwargs = {'cluster_environment': cluster_env, 'ddp_plugin': self.trainer.plugin_connector.ddp_plugin}
+            ddp_cls = None
+            if use_cpu:
+                if spawn:
+                    ddp_cls = accelerators.DDPCPUSpawnAccelerator
+                    acc_kwargs['nprocs'] = self.trainer.num_processes
+                else:
+                    ddp_cls = accelerators.DDPCPUHPCAccelerator
+            else:
+                if spawn:
+                    ddp_cls = accelerators.DDPSpawnAccelerator
+                    acc_kwargs['nprocs'] = self.trainer.num_processes
+                else:
+                    ddp_cls = accelerators.DDPHPCAccelerator
+            accelerator_backend = ddp_cls(*acc_args, **acc_kwargs)
 
         else:
             raise MisconfigurationException(
                 f'Trainer(accelerator={self.trainer.distributed_backend} is not a supported backend'
@@ -382,7 +320,7 @@ def has_horovodrun():
         """Returns True if running with `horovodrun` using Gloo or OpenMPI."""
         return 'OMPI_COMM_WORLD_RANK' in os.environ or 'HOROVOD_RANK' in os.environ
 
-    def set_nvidia_flags(self, is_slurm_managing_tasks, data_parallel_device_ids):
+    def set_nvidia_flags(self, data_parallel_device_ids):
         if data_parallel_device_ids is None:
             return
 
@@ -390,26 +328,3 @@ def set_nvidia_flags(self, is_slurm_managing_tasks, data_parallel_device_ids):
         os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
         all_gpu_ids = ",".join([str(x) for x in range(torch.cuda.device_count())])
         devices = os.environ.get("CUDA_VISIBLE_DEVICES", all_gpu_ids)
-        log.info(f'LOCAL_RANK: {self.trainer.local_rank} - CUDA_VISIBLE_DEVICES: [{devices}]')
-
-    def determine_local_rank(self):
-        if self.trainer.is_slurm_managing_tasks:
-            return int(os.environ['SLURM_LOCALID'])
-        return int(os.environ.get('LOCAL_RANK', 0))
-
-    def determine_ddp_node_rank(self):
-        if self.trainer.is_slurm_managing_tasks:
-            return int(os.environ['SLURM_NODEID'])
-
-        # torchelastic uses the envvar GROUP_RANK, whereas other systems(?) use NODE_RANK.
-        # otherwise use given node rank or default to node rank 0
-        env_vars = ['NODE_RANK', 'GROUP_RANK']
-        node_ids = [(k, os.environ.get(k, None)) for k in env_vars]
-        node_ids = [(k, v) for k, v in node_ids if v is not None]
-        if len(node_ids) == 0:
-            return 0
-        if len(node_ids) > 1:
-            log.warning(f"Multiple environment variables ({node_ids}) defined for node rank.
Using the first one.") - k, rank = node_ids.pop() - rank_zero_info(f"Using environment variable {k} for node rank ({rank}).") - return int(rank) diff --git a/pytorch_lightning/accelerators/ddp2_accelerator.py b/pytorch_lightning/accelerators/ddp2_accelerator.py index 2e864029f8767..a5a752c5080e1 100644 --- a/pytorch_lightning/accelerators/ddp2_accelerator.py +++ b/pytorch_lightning/accelerators/ddp2_accelerator.py @@ -150,8 +150,7 @@ def ddp_train(self, process_idx, mp_queue, model): model.trainer = self.trainer self.init_ddp_connection( self.trainer.global_rank, - self.trainer.world_size, - self.trainer.is_slurm_managing_tasks + self.trainer.world_size ) if isinstance(self.ddp_plugin, RPCPlugin): diff --git a/pytorch_lightning/accelerators/ddp_accelerator.py b/pytorch_lightning/accelerators/ddp_accelerator.py index da9eb2d3ea937..6bc41c67cbfec 100644 --- a/pytorch_lightning/accelerators/ddp_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_accelerator.py @@ -251,8 +251,7 @@ def ddp_train(self, process_idx, model): model.trainer = self.trainer self.init_ddp_connection( self.trainer.global_rank, - self.trainer.world_size, - self.trainer.is_slurm_managing_tasks + self.trainer.world_size ) if isinstance(self.ddp_plugin, RPCPlugin): diff --git a/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py b/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py index a0545a4604aec..329c5d2064b16 100644 --- a/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py @@ -42,7 +42,7 @@ def __init__(self, super().__init__(trainer, cluster_environment, ddp_plugin) self.nickname = 'ddp_cpu' - def model_to_device(self, model, process_idx): + def model_to_device(self, model): model.cpu() def get_device_ids(self): diff --git a/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py b/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py index 91a6dee484f30..9fc9326b730a8 100644 --- a/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py @@ -212,7 +212,7 @@ def set_world_ranks(self, process_idx): self.trainer.global_rank = self.trainer.node_rank * self.trainer.num_processes + process_idx self.trainer.world_size = self.trainer.num_nodes * self.trainer.num_processes - def model_to_device(self, model, process_idx): + def model_to_device(self, model): model.cpu() def get_device_ids(self): diff --git a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py index ec4c087998614..188fb5a68b2c6 100644 --- a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py @@ -21,6 +21,7 @@ from pytorch_lightning import _logger as log from pytorch_lightning.accelerators.accelerator import Accelerator, ReduceOp +from pytorch_lightning.accelerators.nvidia_mixin import NVIDIAMixin from pytorch_lightning.cluster_environments import ClusterEnvironment from pytorch_lightning.core.lightning import LightningModule from pytorch_lightning.distributed.dist import LightningDistributed @@ -34,7 +35,8 @@ from hydra.utils import get_original_cwd, to_absolute_path -class DDPHPCAccelerator(Accelerator): +import sys +class DDPHPCAccelerator(Accelerator, NVIDIAMixin): def __init__(self, trainer, @@ -57,15 +59,20 @@ def __init__(self, def setup(self, model): self.trainer.model = model + # ---------------------------- + # NVIDIA FLAGS + # ---------------------------- + 
self.set_nvidia_flags(self.trainer.data_parallel_device_ids) self.task_idx = self.cluster_environment.local_rank() + def train(self): model = self.trainer.model self.ddp_train(process_idx=self.task_idx, model=model) def set_world_ranks(self, process_idx): self.trainer.local_rank = process_idx - self.trainer.global_rank = self.trainer.node_rank * self.trainer.num_processes + process_idx + self.trainer.global_rank = self.cluster_environment.node_rank() * self.trainer.num_processes + process_idx self.trainer.world_size = self.trainer.num_nodes * self.trainer.num_processes def init_device(self, process_idx): @@ -134,14 +141,16 @@ def ddp_train(self, process_idx, model): # set warning rank rank_zero_only.rank = self.trainer.global_rank + # Initialize cuda device + self.init_device(process_idx) + # set up server using proc 0's ip address # try to init for 20 times at max in case ports are taken # where to store ip_table model.trainer = self.trainer self.init_ddp_connection( self.trainer.global_rank, - self.trainer.world_size, - self.trainer.is_slurm_managing_tasks + self.trainer.world_size ) if isinstance(self.ddp_plugin, RPCPlugin): diff --git a/pytorch_lightning/accelerators/ddp_spawn_accelerator.py b/pytorch_lightning/accelerators/ddp_spawn_accelerator.py index a49e17fc0b31d..5c8e0d50d29df 100644 --- a/pytorch_lightning/accelerators/ddp_spawn_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_spawn_accelerator.py @@ -22,6 +22,7 @@ from pytorch_lightning import _logger as log from pytorch_lightning.accelerators.accelerator import Accelerator, ReduceOp +from pytorch_lightning.accelerators.nvidia_mixin import NVIDIAMixin from pytorch_lightning.cluster_environments import ClusterEnvironment from pytorch_lightning.core.lightning import LightningModule from pytorch_lightning.distributed import LightningDistributed @@ -44,7 +45,7 @@ from hydra.utils import get_original_cwd, to_absolute_path -class DDPSpawnAccelerator(Accelerator): +class DDPSpawnAccelerator(Accelerator, NVIDIAMixin): def __init__(self, trainer, @@ -68,6 +69,10 @@ def __init__(self, def setup(self, model): os.environ['MASTER_PORT'] = os.environ.get('MASTER_PORT', str(find_free_network_port())) + # ---------------------------- + # NVIDIA FLAGS + # ---------------------------- + self.set_nvidia_flags(self.trainer.data_parallel_device_ids) # pass in a state q smp = mp.get_context('spawn') diff --git a/pytorch_lightning/accelerators/gpu_accelerator.py b/pytorch_lightning/accelerators/gpu_accelerator.py index 1310777e0d890..e101b4367f976 100644 --- a/pytorch_lightning/accelerators/gpu_accelerator.py +++ b/pytorch_lightning/accelerators/gpu_accelerator.py @@ -17,12 +17,13 @@ from pytorch_lightning import _logger as log from pytorch_lightning.accelerators.accelerator import Accelerator, ReduceOp +from pytorch_lightning.accelerators.nvidia_mixin import NVIDIAMixin from pytorch_lightning.cluster_environments import ClusterEnvironment from pytorch_lightning.distributed.dist import LightningDistributed from pytorch_lightning.utilities import AMPType -class GPUAccelerator(Accelerator): +class GPUAccelerator(Accelerator, NVIDIAMixin): amp_backend: AMPType def __init__(self, trainer, cluster_environment: Optional[ClusterEnvironment] = None): @@ -40,6 +41,10 @@ def __init__(self, trainer, cluster_environment: Optional[ClusterEnvironment] = self.nickname = None def setup(self, model): + # ---------------------------- + # NVIDIA FLAGS + # ---------------------------- + self.set_nvidia_flags(self.trainer.data_parallel_device_ids) # call setup 
self.trainer.call_setup_hook(model) diff --git a/pytorch_lightning/accelerators/nvidia_mixin.py b/pytorch_lightning/accelerators/nvidia_mixin.py new file mode 100644 index 0000000000000..4530a182833a8 --- /dev/null +++ b/pytorch_lightning/accelerators/nvidia_mixin.py @@ -0,0 +1,30 @@ +# Copyright The PyTorch Lightning team. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License +import os + +import torch + +from pytorch_lightning import _logger as log + +class NVIDIAMixin: + + def set_nvidia_flags(self, data_parallel_device_ids): + if data_parallel_device_ids is None: + return + + # set the correct cuda visible devices (using pci order) + os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" + all_gpu_ids = ",".join([str(x) for x in range(torch.cuda.device_count())]) + devices = os.environ.get("CUDA_VISIBLE_DEVICES", all_gpu_ids) + log.info(f'LOCAL_RANK: {self.trainer.local_rank} - CUDA_VISIBLE_DEVICES: [{devices}]') diff --git a/pytorch_lightning/cluster_environments/__init__.py b/pytorch_lightning/cluster_environments/__init__.py index 5f03ec07fe311..72f3f46e7715a 100644 --- a/pytorch_lightning/cluster_environments/__init__.py +++ b/pytorch_lightning/cluster_environments/__init__.py @@ -13,4 +13,5 @@ # limitations under the License. from pytorch_lightning.cluster_environments.cluster_environment import ClusterEnvironment from pytorch_lightning.cluster_environments.slurm_environment import SLURMEnvironment +from pytorch_lightning.cluster_environments.lsf_environment import LSFEnvironment from pytorch_lightning.cluster_environments.torchelastic_environment import TorchElasticEnvironment diff --git a/pytorch_lightning/cluster_environments/cluster_environment.py b/pytorch_lightning/cluster_environments/cluster_environment.py index 5196e44411082..e82b1c310d2bd 100644 --- a/pytorch_lightning/cluster_environments/cluster_environment.py +++ b/pytorch_lightning/cluster_environments/cluster_environment.py @@ -31,3 +31,9 @@ def world_size(self): def local_rank(self): pass + + def global_rank(self): + pass + + def node_rank(self): + pass diff --git a/pytorch_lightning/cluster_environments/lsf_environment.py b/pytorch_lightning/cluster_environments/lsf_environment.py new file mode 100644 index 0000000000000..e95547fa72813 --- /dev/null +++ b/pytorch_lightning/cluster_environments/lsf_environment.py @@ -0,0 +1,176 @@ +# Copyright The PyTorch Lightning team. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import os +import re +import socket +import warnings +from pytorch_lightning import _logger as log +from pytorch_lightning.cluster_environments.cluster_environment import ClusterEnvironment + + +class LSFEnvironment(ClusterEnvironment): + """An environment for running on clusters managed by the LSF resource manager. + + It is expected that any execution using this ClusterEnvironment was executed + using the Job Step Manager i.e. jsrun. + + This plugin expects the following environment variables: + + LSB_JOBID + The LSF assigned job ID + + LSB_HOSTS + The hosts used in the job. This string is expected to have the format "batch ...." + + JSM_NAMESPACE_LOCAL_RANK + The node local rank for the task. This environment variable is set by jsrun + + JSM_NAMESPACE_SIZE + The world size for the task. This environment variable is set by jsrun + """ + + def __init__(self): + self._master_address = self._get_master_address() + self._master_port = self._get_master_port() + self._local_rank = self._get_local_rank() + self._global_rank = self._get_global_rank() + self._world_size = self._get_world_size() + self._node_rank = self._get_node_rank() + + # set environment variables needed for initializing torch distributed process group + os.environ["MASTER_ADDR"] = str(self._master_address) + log.debug(f"MASTER_ADDR: {os.environ['MASTER_ADDR']}") + os.environ["MASTER_PORT"] = str(self._master_port) + log.debug(f"MASTER_PORT: {os.environ['MASTER_PORT']}") + + def _read_hosts(self): + var = "LSB_HOSTS" + hosts = os.environ.get(var) + if not hosts: + raise ValueError("Could not find hosts -- expected in environment variable %s" % var) + hosts = hosts.split() + if len(hosts) < 2: + raise ValueError("Cannot parse hosts from LSB_HOSTS environment variable -- " + "expected format \"batch ...\"") + return hosts + + def _get_master_address(self): + """A helper for getting the master address""" + hosts = self._read_hosts() + return hosts[1] + + def _get_master_port(self): + """A helper for getting the master port + + Use the LSF job ID so all ranks can compute the master port + """ + # check for user-specified master port + port = os.environ.get("MASTER_PORT") + if not port: + var = "LSB_JOBID" + jobid = os.environ.get(var) + if not jobid: + raise ValueError("Could not find job id -- expected in environment variable %s" % var) + else: + port = int(jobid) + # all ports should be in the 10k+ range + port = int(port) % 1000 + 10000 + log.debug("calculated master port") + else: + log.debug("using externally specified master port") + return port + + def _get_global_rank(self): + """A helper function for getting the global rank + + Read this from the environment variable JSM_NAMESPACE_LOCAL_RANK + """ + var = "JSM_NAMESPACE_RANK" + global_rank = os.environ.get(var) + if global_rank is None: + raise ValueError("Cannot determine global rank -- expected in %s " + "-- make sure you run your executable with jsrun" % var) + return int(global_rank) + + def _get_local_rank(self): + """A helper function for getting the local rank + + Read this from the environment variable JSM_NAMESPACE_LOCAL_RANK + """ + var = "JSM_NAMESPACE_LOCAL_RANK" + local_rank = os.environ.get(var) + if local_rank is None: + raise ValueError("Cannot determine local rank -- expected in %s " + "-- make sure you run your executable with jsrun" % var) + return int(local_rank) + + def _get_world_size(self): + """A helper function for getting the world size + + Read this from the environment variable JSM_NAMESPACE_SIZE + """ + var = "JSM_NAMESPACE_SIZE" + 
        world_size = os.environ.get(var)
+        if world_size is None:
+            raise ValueError("Cannot determine world size -- expected in %s "
+                             "-- make sure you run your executable with jsrun" % var)
+        return int(world_size)
+
+    def _get_node_rank(self):
+        hosts = self._read_hosts()
+        count = dict()
+        for host in hosts:
+            if 'batch' in host or 'login' in host:
+                continue
+            if host not in count:
+                count[host] = len(count)
+        return count[socket.gethostname()]
+
+    def master_address(self):
+        """
+        Master address is read from a list of hosts contained in the environment variable *LSB_HOSTS*
+        """
+        return self._master_address
+
+    def master_port(self):
+        """
+        Master port is calculated from the LSF job ID
+        """
+        return self._master_port
+
+    def world_size(self):
+        """
+        World size is read from the environment variable JSM_NAMESPACE_SIZE
+        """
+        return self._world_size
+
+    def local_rank(self):
+        """
+        Local rank is read from the environment variable JSM_NAMESPACE_LOCAL_RANK
+        """
+        return self._local_rank
+
+    def node_rank(self):
+        """
+        Node rank is determined by the position of the current hostname in the list of hosts stored in LSB_HOSTS
+        """
+        return self._node_rank
+
+    def global_rank(self):
+        """
+        Global rank is read from the environment variable JSM_NAMESPACE_RANK
+        """
+        return self._global_rank
+
diff --git a/pytorch_lightning/cluster_environments/slurm_environment.py b/pytorch_lightning/cluster_environments/slurm_environment.py
index 6df1cf680c57f..729190965b46b 100644
--- a/pytorch_lightning/cluster_environments/slurm_environment.py
+++ b/pytorch_lightning/cluster_environments/slurm_environment.py
@@ -20,9 +20,6 @@
 
 class SLURMEnvironment(ClusterEnvironment):
 
-    def __init__(self):
-        super().__init__()
-
     def master_address(self):
         # figure out the root node addr
         try:
@@ -46,7 +43,7 @@ def master_port(self):
             default_port = default_port[-4:]
 
             # all ports should be in the 10k+ range
-            default_port = int(default_port) + 15000
+            default_port = int(default_port) + 10000
 
         except Exception:
             default_port = 12910
@@ -81,3 +78,9 @@ def _resolve_root_node_address(self, root_node):
             root_node = name + number
 
         return root_node
+
+    def node_rank(self):
+        return int(os.environ['SLURM_NODEID'])
+
+    def global_rank(self):
+        return int(os.environ['SLURM_PROCID'])
diff --git a/pytorch_lightning/cluster_environments/torchelastic_environment.py b/pytorch_lightning/cluster_environments/torchelastic_environment.py
index a4d769518d252..2fcd0b32f7aa1 100644
--- a/pytorch_lightning/cluster_environments/torchelastic_environment.py
+++ b/pytorch_lightning/cluster_environments/torchelastic_environment.py
@@ -45,7 +45,13 @@ def master_port(self):
         return port
 
     def world_size(self):
-        return os.environ.get('WORLD_SIZE')
+        return int(os.environ.get('WORLD_SIZE'))
 
     def local_rank(self):
-        return int(os.environ['LOCAL_RANK'])
+        return int(os.environ.get('LOCAL_RANK'))
+
+    def node_rank(self):
+        return int(os.environ.get('GROUP_RANK'))
+
+    def global_rank(self):
+        return int(os.environ['RANK'])
diff --git a/pytorch_lightning/plugins/ddp_plugin.py b/pytorch_lightning/plugins/ddp_plugin.py
index 281074cb37813..7e40d65b912f1 100644
--- a/pytorch_lightning/plugins/ddp_plugin.py
+++ b/pytorch_lightning/plugins/ddp_plugin.py
@@ -1,3 +1,4 @@
+import sys
 import os
 from contextlib import contextmanager
 from typing import Any, Dict, List, Optional, Union
@@ -75,12 +76,12 @@ def init_ddp_connection(
         trainer,
         cluster_environment,
         global_rank: int,
-        world_size: int,
-        is_slurm_managing_tasks: bool = True,
+        world_size: int
     ) -> None:
os.environ["MASTER_ADDR"] = str(cluster_environment.master_address()) os.environ["MASTER_PORT"] = str(cluster_environment.master_port()) os.environ["WORLD_SIZE"] = str(cluster_environment.world_size()) + torch_backend = "nccl" if trainer.on_gpu else "gloo" if not torch_distrib.is_initialized(): diff --git a/pytorch_lightning/plugins/ddp_sequential_plugin.py b/pytorch_lightning/plugins/ddp_sequential_plugin.py index 010f0ea1648a8..c8c0d020e940f 100644 --- a/pytorch_lightning/plugins/ddp_sequential_plugin.py +++ b/pytorch_lightning/plugins/ddp_sequential_plugin.py @@ -109,7 +109,6 @@ def init_ddp_connection( cluster_environment, global_rank: int, world_size: int, - is_slurm_managing_tasks: bool = True, ) -> None: trainer.prepared_for_backwards = False self._check_arguments(trainer) @@ -119,8 +118,7 @@ def init_ddp_connection( trainer=trainer, cluster_environment=cluster_environment, global_rank=global_rank, - world_size=world_size, - is_slurm_managing_tasks=is_slurm_managing_tasks + world_size=world_size ) super().init_rpc_connection( global_rank=global_rank, diff --git a/pytorch_lightning/plugins/plugin_connector.py b/pytorch_lightning/plugins/plugin_connector.py index d66c25173cc77..ba3ed222eaadf 100644 --- a/pytorch_lightning/plugins/plugin_connector.py +++ b/pytorch_lightning/plugins/plugin_connector.py @@ -30,7 +30,7 @@ def __init__(self, trainer): self.trainer = trainer self.plugins = [] self.ddp_plugin = DDPPlugin() - self.cloud_environment = None + self.cluster_environment = None self.amp_plugin = NativeAMPPlugin(trainer) self.apex_plugin = ApexPlugin(trainer) @@ -99,7 +99,7 @@ def __attach_cluster(self, limit=1): raise MisconfigurationException(m) # set the cluster - self.cloud_environment = plugin + self.cluster_environment = plugin def _convert_str_custom_plugins(self, plugins: Union[str, list]): """ diff --git a/pytorch_lightning/trainer/trainer.py b/pytorch_lightning/trainer/trainer.py index 35da90625adef..39a5129a1077b 100644 --- a/pytorch_lightning/trainer/trainer.py +++ b/pytorch_lightning/trainer/trainer.py @@ -434,6 +434,15 @@ def fit( # bookkeeping self._state = TrainerState.RUNNING + # ---------------------------- + # SET UP TRAINING + # ---------------------------- + self.accelerator_backend = self.accelerator_connector.select_accelerator() + self.local_rank = self.accelerator_backend.cluster_environment.local_rank() + self.node_rank = self.accelerator_backend.cluster_environment.node_rank() + self.global_rank = self.accelerator_backend.cluster_environment.global_rank() + self.accelerator_backend.setup(model) + # ---------------------------- # LINK DATA # ---------------------------- @@ -447,12 +456,6 @@ def fit( # we reuse fit in .test() but change its behavior using this flag self.testing = os.environ.get('PL_TESTING_MODE', self.testing) - # ---------------------------- - # SET UP TRAINING - # ---------------------------- - self.accelerator_backend = self.accelerator_connector.select_accelerator() - self.accelerator_backend.setup(model) - # ---------------------------- # INSPECT THESE FOR MAIN LOOPS # ----------------------------