24 changes: 24 additions & 0 deletions docs/source/accelerators.rst
@@ -180,3 +180,27 @@ TPU Accelerator

.. autoclass:: pytorch_lightning.accelerators.tpu_accelerator.TPUAccelerator
:noindex:

------------

*****************************
Available ClusterEnvironments
*****************************

SLURM Environment
=================

.. autoclass:: pytorch_lightning.cluster_environments.slurm_environment.SLURMEnvironment
:noindex:

LSF Environment
===============

.. autoclass:: pytorch_lightning.cluster_environments.lsf_environment.LSFEnvironment
:noindex:

TorchElastic Environment
========================

.. autoclass:: pytorch_lightning.cluster_environments.torchelastic_environment.TorchElasticEnvironment
:noindex:
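
Not shown in this diff is how one of these environments (or a custom one) reaches the trainer. A minimal sketch, assuming the plugin connector accepts a ``ClusterEnvironment`` instance via ``Trainer(plugins=...)`` (as the renamed ``plugin_connector.cluster_environment`` attribute below suggests); the method names follow the calls the accelerators make in this PR, everything else is an assumption:

```python
import os

from pytorch_lightning import Trainer
from pytorch_lightning.cluster_environments import ClusterEnvironment


class MyClusterEnvironment(ClusterEnvironment):
    """Sketch of a custom scheduler integration; the MY_* variables are made up."""

    def master_address(self):
        return os.environ.get("MY_MASTER_ADDR", "127.0.0.1")

    def master_port(self):
        return int(os.environ.get("MY_MASTER_PORT", 29500))

    def world_size(self):
        return int(os.environ.get("MY_WORLD_SIZE", 1))

    def local_rank(self):
        return int(os.environ.get("MY_LOCAL_RANK", 0))

    def node_rank(self):
        return int(os.environ.get("MY_NODE_RANK", 0))


# assumed wiring: the plugin connector picks the environment out of ``plugins``
trainer = Trainer(accelerator="ddp", gpus=2, num_nodes=2, plugins=[MyClusterEnvironment()])
```
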
5 changes: 2 additions & 3 deletions pytorch_lightning/accelerators/accelerator.py
@@ -148,14 +148,13 @@ def setup_optimizers(self, model):
self.trainer.optimizer_frequencies = optimizer_frequencies

def init_ddp_connection(
self, global_rank: int, world_size: int, is_slurm_managing_tasks: bool = True
self, global_rank: int, world_size: int
) -> None:
self.ddp_plugin.init_ddp_connection(
self.trainer,
self.cluster_environment,
global_rank,
world_size,
is_slurm_managing_tasks,
world_size
)

def sync_tensor(self,
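
Downstream, any custom DDP plugin has to match the trimmed call above. A sketch of an override with the new four-argument shape; the base-class import path, ``trainer.on_gpu`` and the environment's ``master_address``/``master_port`` hooks are assumptions:

```python
import os

import torch.distributed as torch_distrib

from pytorch_lightning.plugins.ddp_plugin import DDPPlugin


class VerboseDDPPlugin(DDPPlugin):
    def init_ddp_connection(self, trainer, cluster_environment, global_rank: int, world_size: int) -> None:
        # the SLURM flag is gone, so everything comes from the trainer and the cluster environment
        os.environ["MASTER_ADDR"] = str(cluster_environment.master_address())
        os.environ["MASTER_PORT"] = str(cluster_environment.master_port())
        backend = torch_distrib.Backend.NCCL if trainer.on_gpu else torch_distrib.Backend.GLOO
        if not torch_distrib.is_initialized():
            torch_distrib.init_process_group(backend, rank=global_rank, world_size=world_size)
```
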
135 changes: 25 additions & 110 deletions pytorch_lightning/accelerators/accelerator_connector.py
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys

import torch

@@ -121,16 +122,6 @@ def on_trainer_init(
self.trainer.world_size = 1
self.trainer.interactive_ddp_procs = []

# link up SLURM
# TODO: this should be taken out of here... but depends too much on DDP
self.trainer.slurm_connector.on_trainer_init(self.trainer.num_nodes)
self.trainer.node_rank = self.determine_ddp_node_rank()
self.trainer.local_rank = self.determine_local_rank()
self.trainer.global_rank = 0

# NVIDIA setup
self.set_nvidia_flags(self.trainer.is_slurm_managing_tasks, self.trainer.data_parallel_device_ids)

self.trainer.on_colab_kaggle = os.getenv('COLAB_GPU') or os.getenv('KAGGLE_URL_BASE')

self.trainer.replace_sampler_ddp = replace_sampler_ddp
@@ -151,12 +142,8 @@ def _map_deprecated_dist_backend(self, accelerator, distributed_backend):
return distributed_backend

def _select_environment(self):
if self.trainer.plugin_connector.cloud_environment:
env = self.trainer.plugin_connector.cloud_environment
elif self.trainer.is_slurm_managing_tasks:
env = SLURMEnvironment()
elif self._is_using_torchelastic():
env = TorchElasticEnvironment()
if self.trainer.plugin_connector.cluster_environment:
env = self.trainer.plugin_connector.cluster_environment
else:
env = TorchElasticEnvironment()
return env
@@ -182,84 +169,14 @@ def select_accelerator(self):
# ----------------------------------
# choose an accelerator for the user
# ----------------------------------
use_slurm_ddp = self.trainer.use_ddp and self.trainer.is_slurm_managing_tasks

# torchelastic or general non_slurm ddp
te_flags_passed = 'WORLD_SIZE' in os.environ and ('GROUP_RANK' in os.environ or 'NODE_RANK' in os.environ)
use_torchelastic_ddp = self.trainer.use_ddp and te_flags_passed

use_ddp_spawn = self.trainer.use_ddp and self.trainer.distributed_backend == "ddp_spawn"
use_ddp_cpu_spawn = self.trainer.use_ddp and self.trainer.distributed_backend == "ddp_cpu"

use_ddp_cpu_torch_elastic = use_ddp_cpu_spawn and self._is_using_torchelastic()
use_ddp_cpu_slurm = use_ddp_cpu_spawn and self.trainer.is_slurm_managing_tasks

# ddp script mode uses the same flags as TE
# TODO: decouple from TE
if os.environ.get('PL_IN_DDP_SUBPROCESS', False):
use_torchelastic_ddp = False

cluster_env = self._select_environment()

# choose the appropriate accelerator backend
if self.trainer.use_ddp2:
accelerator_backend = accelerators.DDP2Accelerator(
self.trainer,
cluster_env,
self.trainer.plugin_connector.ddp_plugin
)

elif use_ddp_cpu_slurm:
accelerator_backend = accelerators.DDPCPUHPCAccelerator(
self.trainer,
cluster_env,
self.trainer.plugin_connector.ddp_plugin
)

elif use_slurm_ddp:
accelerator_backend = accelerators.DDPHPCAccelerator(
self.trainer,
cluster_env,
self.trainer.plugin_connector.ddp_plugin
)

elif use_ddp_cpu_torch_elastic:
accelerator_backend = accelerators.DDPCPUHPCAccelerator(
self.trainer,
cluster_env,
self.trainer.plugin_connector.ddp_plugin
)

elif use_torchelastic_ddp:
accelerator_backend = accelerators.DDPHPCAccelerator(
self.trainer,
cluster_env,
self.trainer.plugin_connector.ddp_plugin
)

elif use_ddp_spawn:
accelerator_backend = accelerators.DDPSpawnAccelerator(
self.trainer,
nprocs=self.trainer.num_processes,
cluster_environment=cluster_env,
ddp_plugin=self.trainer.plugin_connector.ddp_plugin
)

elif use_ddp_cpu_spawn:
accelerator_backend = accelerators.DDPCPUSpawnAccelerator(
self.trainer,
nprocs=self.trainer.num_processes,
cluster_environment=cluster_env,
ddp_plugin=self.trainer.plugin_connector.ddp_plugin
)

elif self.trainer.distributed_backend == "ddp":
accelerator_backend = accelerators.DDPAccelerator(
self.trainer,
cluster_env,
ddp_plugin=self.trainer.plugin_connector.ddp_plugin
)

elif self.trainer.use_dp:
accelerator_backend = accelerators.DataParallelAccelerator(self.trainer, cluster_env)

@@ -274,6 +191,27 @@ def select_accelerator(self):

elif self.trainer.distributed_backend is None:
accelerator_backend = accelerators.CPUAccelerator(self.trainer, cluster_env)

elif self.trainer.use_ddp:
spawn = self.trainer.distributed_backend == "ddp_spawn"
use_cpu = self.trainer.gpus is None

acc_args = [self.trainer]
acc_kwargs = {'cluster_environment': cluster_env, 'ddp_plugin': self.trainer.plugin_connector.ddp_plugin}
ddp_cls = None
if use_cpu:
if spawn:
ddp_cls = accelerators.DDPCPUSpawnAccelerator
acc_kwargs['nprocs'] = self.trainer.num_processes
else:
ddp_cls = accelerators.DDPCPUHPCAccelerator
else:
if spawn:
ddp_cls = accelerators.DDPSpawnAccelerator
acc_kwargs['nprocs'] = self.trainer.num_processes
else:
ddp_cls = accelerators.DDPHPCAccelerator
accelerator_backend = ddp_cls(*acc_args, **acc_kwargs)
else:
raise MisconfigurationException(
f'Trainer(accelerator={self.trainer.distributed_backend} is not a supported backend'
@@ -382,34 +320,11 @@ def has_horovodrun():
"""Returns True if running with `horovodrun` using Gloo or OpenMPI."""
return 'OMPI_COMM_WORLD_RANK' in os.environ or 'HOROVOD_RANK' in os.environ

def set_nvidia_flags(self, is_slurm_managing_tasks, data_parallel_device_ids):
def set_nvidia_flags(self, data_parallel_device_ids):
if data_parallel_device_ids is None:
return

# set the correct cuda visible devices (using pci order)
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
all_gpu_ids = ",".join([str(x) for x in range(torch.cuda.device_count())])
devices = os.environ.get("CUDA_VISIBLE_DEVICES", all_gpu_ids)
log.info(f'LOCAL_RANK: {self.trainer.local_rank} - CUDA_VISIBLE_DEVICES: [{devices}]')

def determine_local_rank(self):
if self.trainer.is_slurm_managing_tasks:
return int(os.environ['SLURM_LOCALID'])
return int(os.environ.get('LOCAL_RANK', 0))

def determine_ddp_node_rank(self):
if self.trainer.is_slurm_managing_tasks:
return int(os.environ['SLURM_NODEID'])

# torchelastic uses the envvar GROUP_RANK, whereas other systems(?) use NODE_RANK.
# otherwise use given node rank or default to node rank 0
env_vars = ['NODE_RANK', 'GROUP_RANK']
node_ids = [(k, os.environ.get(k, None)) for k in env_vars]
node_ids = [(k, v) for k, v in node_ids if v is not None]
if len(node_ids) == 0:
return 0
if len(node_ids) > 1:
log.warning(f"Multiple environment variables ({node_ids}) defined for node rank. Using the first one.")
k, rank = node_ids.pop()
rank_zero_info(f"Using environment variable {k} for node rank ({rank}).")
return int(rank)
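
The deleted ``determine_local_rank`` / ``determine_ddp_node_rank`` helpers read ``SLURM_LOCALID``, ``SLURM_NODEID``, ``NODE_RANK`` and ``GROUP_RANK`` directly; with this PR the accelerators ask the cluster environment instead (``cluster_environment.local_rank()``, ``cluster_environment.node_rank()`` below). A sketch of the torchelastic-style fallback those helpers implemented, assuming the equivalent logic now lives in the environment classes:

```python
import os


class TorchElasticStyleEnvironment:
    """Illustrative only: resolves ranks from the same variables the deleted helpers used."""

    def local_rank(self) -> int:
        return int(os.environ.get("LOCAL_RANK", 0))

    def node_rank(self) -> int:
        # torchelastic exports GROUP_RANK, other launchers use NODE_RANK; fall back to 0
        for var in ("NODE_RANK", "GROUP_RANK"):
            if var in os.environ:
                return int(os.environ[var])
        return 0
```
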
3 changes: 1 addition & 2 deletions pytorch_lightning/accelerators/ddp2_accelerator.py
@@ -150,8 +150,7 @@ def ddp_train(self, process_idx, mp_queue, model):
model.trainer = self.trainer
self.init_ddp_connection(
self.trainer.global_rank,
self.trainer.world_size,
self.trainer.is_slurm_managing_tasks
self.trainer.world_size
)

if isinstance(self.ddp_plugin, RPCPlugin):
3 changes: 1 addition & 2 deletions pytorch_lightning/accelerators/ddp_accelerator.py
@@ -251,8 +251,7 @@ def ddp_train(self, process_idx, model):
model.trainer = self.trainer
self.init_ddp_connection(
self.trainer.global_rank,
self.trainer.world_size,
self.trainer.is_slurm_managing_tasks
self.trainer.world_size
)

if isinstance(self.ddp_plugin, RPCPlugin):
@@ -42,7 +42,7 @@ def __init__(self,
super().__init__(trainer, cluster_environment, ddp_plugin)
self.nickname = 'ddp_cpu'

def model_to_device(self, model, process_idx):
def model_to_device(self, model):
model.cpu()

def get_device_ids(self):
@@ -212,7 +212,7 @@ def set_world_ranks(self, process_idx):
self.trainer.global_rank = self.trainer.node_rank * self.trainer.num_processes + process_idx
self.trainer.world_size = self.trainer.num_nodes * self.trainer.num_processes

def model_to_device(self, model, process_idx):
def model_to_device(self, model):
model.cpu()

def get_device_ids(self):
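
The rank arithmetic in ``set_world_ranks`` above (and in the HPC accelerator below) lays global ranks out node-major. A quick worked example of the formula:

```python
# world_size = num_nodes * num_processes; ranks are contiguous per node
num_processes, num_nodes = 4, 2
for node_rank in range(num_nodes):
    for process_idx in range(num_processes):
        global_rank = node_rank * num_processes + process_idx
        print(f"node {node_rank}, local {process_idx} -> global rank {global_rank}")
# e.g. node 1, local process 2 ends up as global rank 6 out of world_size 8
```
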
17 changes: 13 additions & 4 deletions pytorch_lightning/accelerators/ddp_hpc_accelerator.py
@@ -21,6 +21,7 @@

from pytorch_lightning import _logger as log
from pytorch_lightning.accelerators.accelerator import Accelerator, ReduceOp
from pytorch_lightning.accelerators.nvidia_mixin import NVIDIAMixin
from pytorch_lightning.cluster_environments import ClusterEnvironment
from pytorch_lightning.core.lightning import LightningModule
from pytorch_lightning.distributed.dist import LightningDistributed
@@ -34,7 +35,8 @@
from hydra.utils import get_original_cwd, to_absolute_path


class DDPHPCAccelerator(Accelerator):
import sys
class DDPHPCAccelerator(Accelerator, NVIDIAMixin):

def __init__(self,
trainer,
@@ -57,15 +59,20 @@ def __init__(self,

def setup(self, model):
self.trainer.model = model
# ----------------------------
# NVIDIA FLAGS
# ----------------------------
self.set_nvidia_flags(self.trainer.data_parallel_device_ids)
self.task_idx = self.cluster_environment.local_rank()


def train(self):
model = self.trainer.model
self.ddp_train(process_idx=self.task_idx, model=model)

def set_world_ranks(self, process_idx):
self.trainer.local_rank = process_idx
self.trainer.global_rank = self.trainer.node_rank * self.trainer.num_processes + process_idx
self.trainer.global_rank = self.cluster_environment.node_rank() * self.trainer.num_processes + process_idx
self.trainer.world_size = self.trainer.num_nodes * self.trainer.num_processes

def init_device(self, process_idx):
@@ -134,14 +141,16 @@ def ddp_train(self, process_idx, model):
# set warning rank
rank_zero_only.rank = self.trainer.global_rank

# Initialize cuda device
self.init_device(process_idx)

# set up server using proc 0's ip address
# try to init for 20 times at max in case ports are taken
# where to store ip_table
model.trainer = self.trainer
self.init_ddp_connection(
self.trainer.global_rank,
self.trainer.world_size,
self.trainer.is_slurm_managing_tasks
self.trainer.world_size
)

if isinstance(self.ddp_plugin, RPCPlugin):
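
``NVIDIAMixin`` itself is not part of this hunk. Judging by the calls added in ``setup()``, it presumably just hosts the ``set_nvidia_flags`` helper that used to live on the accelerator connector; a sketch reusing the body shown in the connector diff above, with the class layout treated as an assumption:

```python
import os

import torch

from pytorch_lightning import _logger as log


class NVIDIAMixin:
    def set_nvidia_flags(self, data_parallel_device_ids):
        if data_parallel_device_ids is None:
            return

        # set the correct cuda visible devices (using pci order)
        os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
        all_gpu_ids = ",".join(str(x) for x in range(torch.cuda.device_count()))
        devices = os.environ.get("CUDA_VISIBLE_DEVICES", all_gpu_ids)
        log.info(f"LOCAL_RANK: {self.trainer.local_rank} - CUDA_VISIBLE_DEVICES: [{devices}]")
```
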
7 changes: 6 additions & 1 deletion pytorch_lightning/accelerators/ddp_spawn_accelerator.py
@@ -22,6 +22,7 @@

from pytorch_lightning import _logger as log
from pytorch_lightning.accelerators.accelerator import Accelerator, ReduceOp
from pytorch_lightning.accelerators.nvidia_mixin import NVIDIAMixin
from pytorch_lightning.cluster_environments import ClusterEnvironment
from pytorch_lightning.core.lightning import LightningModule
from pytorch_lightning.distributed import LightningDistributed
@@ -44,7 +45,7 @@
from hydra.utils import get_original_cwd, to_absolute_path


class DDPSpawnAccelerator(Accelerator):
class DDPSpawnAccelerator(Accelerator, NVIDIAMixin):

def __init__(self,
trainer,
@@ -68,6 +69,10 @@ def __init__(self,

def setup(self, model):
os.environ['MASTER_PORT'] = os.environ.get('MASTER_PORT', str(find_free_network_port()))
# ----------------------------
# NVIDIA FLAGS
# ----------------------------
self.set_nvidia_flags(self.trainer.data_parallel_device_ids)

# pass in a state q
smp = mp.get_context('spawn')
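
``setup()`` above falls back to ``find_free_network_port()`` when ``MASTER_PORT`` is not set; the helper lives in the Lightning utilities and is not shown here. A sketch of the usual bind-to-port-0 trick such a helper relies on (assumed, not copied from the library):

```python
import socket


def find_free_network_port() -> int:
    """Ask the OS for an ephemeral port by binding to port 0, then release it."""
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind(("", 0))
    port = s.getsockname()[1]
    s.close()
    return port
```
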
7 changes: 6 additions & 1 deletion pytorch_lightning/accelerators/gpu_accelerator.py
@@ -17,12 +17,13 @@

from pytorch_lightning import _logger as log
from pytorch_lightning.accelerators.accelerator import Accelerator, ReduceOp
from pytorch_lightning.accelerators.nvidia_mixin import NVIDIAMixin
from pytorch_lightning.cluster_environments import ClusterEnvironment
from pytorch_lightning.distributed.dist import LightningDistributed
from pytorch_lightning.utilities import AMPType


class GPUAccelerator(Accelerator):
class GPUAccelerator(Accelerator, NVIDIAMixin):
amp_backend: AMPType

def __init__(self, trainer, cluster_environment: Optional[ClusterEnvironment] = None):
@@ -40,6 +41,10 @@ def __init__(self, trainer, cluster_environment: Optional[ClusterEnvironment] =
self.nickname = None

def setup(self, model):
# ----------------------------
# NVIDIA FLAGS
# ----------------------------
self.set_nvidia_flags(self.trainer.data_parallel_device_ids)

# call setup
self.trainer.call_setup_hook(model)