2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -41,7 +41,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).

### Deprecated

-
- Deprecated `ClusterEnvironment.master_{address,port}` in favor of `ClusterEnvironment.main_{address,port}` ([#10103](https://github.com/PyTorchLightning/pytorch-lightning/issues/10103))


-
8 changes: 4 additions & 4 deletions docs/source/clouds/cluster.rst
@@ -82,7 +82,7 @@ Once the script is setup like described in :ref:`training_script_setup`, you can

Like a custom cluster, you have to ensure that there is network connectivity between the nodes with firewall rules that allow traffic flow on a specified *MASTER_PORT*.

Finally, you'll need to decide which node you'd like to be the master node (*MASTER_ADDR*), and the ranks of each node (*NODE_RANK*).
Finally, you'll need to decide which node you'd like to be the main node (*MASTER_ADDR*), and the ranks of each node (*NODE_RANK*).

For example:

@@ -248,7 +248,7 @@ See also the multi-node examples
# NCCL is how the nodes talk to each other
cluster.add_command("export NCCL_DEBUG=INFO")

# setting a master port here is a good idea.
# setting a main port here is a good idea.
cluster.add_command("export MASTER_PORT=%r" % PORT)

# ************** DON'T FORGET THIS ***************
@@ -307,10 +307,10 @@ and node rank (node id). Here is an example of a custom
def node_rank(self) -> int:
return int(os.environ["NODE_RANK"])

def master_address(self) -> str:
def main_address(self) -> str:
return os.environ["MASTER_ADDRESS"]

def master_port(self) -> int:
def main_port(self) -> int:
return int(os.environ["MASTER_PORT"])


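For reference, a complete environment under the renamed API could look roughly like the sketch below (illustrative only: the class name is made up, the rest of the documented example is collapsed by the diff viewer, and the exact set of abstract members can differ between versions; the rank setters are included as no-ops on that assumption):

import os

from pytorch_lightning.plugins.environments import ClusterEnvironment


class MyClusterEnvironment(ClusterEnvironment):
    @property
    def creates_processes_externally(self) -> bool:
        # the cluster launcher already starts one process per device
        return True

    @property
    def main_address(self) -> str:
        return os.environ["MASTER_ADDRESS"]

    @property
    def main_port(self) -> int:
        return int(os.environ["MASTER_PORT"])

    def world_size(self) -> int:
        return int(os.environ["WORLD_SIZE"])

    def set_world_size(self, size: int) -> None:
        pass  # the cluster provides it; nothing to persist

    def global_rank(self) -> int:
        return int(os.environ["RANK"])

    def set_global_rank(self, rank: int) -> None:
        pass  # the cluster provides it; nothing to persist

    def local_rank(self) -> int:
        return int(os.environ["LOCAL_RANK"])

    def node_rank(self) -> int:
        return int(os.environ["NODE_RANK"])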
8 changes: 4 additions & 4 deletions docs/source/common/trainer.rst
@@ -1116,11 +1116,11 @@ To define your own behavior, subclass the relevant class and pass it in. Here's


class MyCluster(ClusterEnvironment):
def master_address(self):
return your_master_address
def main_address(self):
return your_main_address

def master_port(self):
return your_master_port
def main_port(self):
return your_main_port

def world_size(self):
return the_world_size
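As a usage sketch (not shown in this hunk; the argument values are illustrative), the custom environment is then handed to the Trainer through the `plugins` argument:

from pytorch_lightning import Trainer

# assumes MyCluster above implements the full ClusterEnvironment interface
trainer = Trainer(plugins=[MyCluster()], gpus=2, num_nodes=2)
trainer.fit(model)  # `model` is your LightningModule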
2 changes: 1 addition & 1 deletion docs/source/guides/speed.rst
@@ -71,7 +71,7 @@ Prefer DDP over DP

1. Copy model to device.
2. Copy data to device.
3. Copy outputs of each device back to master.
3. Copy outputs of each device back to main device.

Whereas :class:`~pytorch_lightning.plugins.training_type.DDPPlugin` only performs 1 transfer to sync gradients, making DDP MUCH faster than DP.

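As a concrete sketch (not part of this page), switching from DP to DDP is a single Trainer argument; the flag name depends on the release (`strategy` from 1.5 on, `accelerator="ddp"` in earlier versions):

from pytorch_lightning import Trainer

# DDP: one process per GPU, a single gradient all-reduce per step
trainer = Trainer(strategy="ddp", gpus=4)

# DP (usually slower): one process that scatters inputs and gathers outputs every batch
# trainer = Trainer(strategy="dp", gpus=4)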
2 changes: 1 addition & 1 deletion pytorch_lightning/accelerators/accelerator.py
@@ -479,7 +479,7 @@ def results(self) -> Any:
This property is deprecated in v1.5 and will be removed in v1.6.
Please call `training_type_plugin.results` directly.

In distributed training, we make sure to transfer the results to the appropriate master process.
In distributed training, we make sure to transfer the results to the appropriate main process.
"""
rank_zero_deprecation(
"`Accelerator.results` is deprecated in v1.5 and will be removed in v1.6. "
31 changes: 27 additions & 4 deletions pytorch_lightning/plugins/environments/cluster_environment.py
@@ -12,23 +12,33 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import ABC, abstractmethod
from typing import Any, Type

from pytorch_lightning.utilities import rank_zero_deprecation


class ClusterEnvironment(ABC):
"""Specification of a cluster environment."""

def __new__(cls, *args: Any, **kwargs: Any) -> "ClusterEnvironment":
# TODO: remove in 1.7
_check_for_deprecated_methods(cls)
return super().__new__(cls)

@property
@abstractmethod
def creates_processes_externally(self) -> bool:
"""Whether the environment creates the subprocesses or not."""

@property
@abstractmethod
def master_address(self) -> str:
"""The master address through which all processes connect and communicate."""
def main_address(self) -> str:
"""The main address through which all processes connect and communicate."""

@property
@abstractmethod
def master_port(self) -> int:
"""An open and configured port in the master node through which all processes communicate."""
def main_port(self) -> int:
"""An open and configured port in the main node through which all processes communicate."""

@abstractmethod
def world_size(self) -> int:
@@ -57,3 +67,16 @@ def node_rank(self) -> int:
def teardown(self) -> None:
"""Clean up any state set after execution finishes."""
pass


def _check_for_deprecated_methods(cls: Type[ClusterEnvironment]) -> None:
if hasattr(cls, "master_address") and callable(cls.master_address):
rank_zero_deprecation(
f"`{cls.__name__}.master_address` has been deprecated in v1.6 and will be removed in 1.7."
" Implement the property `main_address` instead (do not forget to add the `@property` decorator)."
)
if hasattr(cls, "master_port") and callable(cls.master_port):
rank_zero_deprecation(
f"`{cls.__name__}.master_port` has been deprecated in v1.6 and will be removed in 1.7."
" Implement the property `main_port` instead (do not forget to add the `@property` decorator)."
)
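To illustrate what the check above catches (a sketch, not part of the diff; the class is made up), any subclass that still spells the old method names triggers these warnings as soon as `ClusterEnvironment.__new__` runs. Calling the helper directly shows the detection logic without having to satisfy the rest of the abstract interface:

import os


class LegacyEnvironment:  # stands in for a user-defined ClusterEnvironment subclass
    def master_address(self) -> str:  # old method-style spelling
        return os.environ.get("MASTER_ADDR", "127.0.0.1")

    def master_port(self) -> int:  # old method-style spelling
        return int(os.environ.get("MASTER_PORT", 12910))


_check_for_deprecated_methods(LegacyEnvironment)  # emits both deprecation messages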
@@ -39,10 +39,12 @@ def is_using_kubeflow() -> bool:
def creates_processes_externally(self) -> bool:
return True

def master_address(self) -> str:
@property
def main_address(self) -> str:
return os.environ["MASTER_ADDR"]

def master_port(self) -> int:
@property
def main_port(self) -> int:
return int(os.environ["MASTER_PORT"])

def world_size(self) -> int:
18 changes: 10 additions & 8 deletions pytorch_lightning/plugins/environments/lightning_environment.py
@@ -29,14 +29,14 @@ class LightningEnvironment(ClusterEnvironment):
2. The user launches all processes manually or with utilities like :code:`torch.distributed.launch`.
The appropriate environment variables need to be set, and at minimum :code:`LOCAL_RANK`.

If the master address and port are not provided, the default environment will choose them
If the main address and port are not provided, the default environment will choose them
automatically. It is recommended to use this default environment for single-node distributed
training as it provides a convenient way to launch the training script.
"""

def __init__(self):
super().__init__()
self._master_port = None
self._main_port = None
self._global_rank: int = 0
self._world_size: int = 1

@@ -49,13 +49,15 @@ def creates_processes_externally(self) -> bool:
"""
return "LOCAL_RANK" in os.environ

def master_address(self) -> str:
@property
def main_address(self) -> str:
return os.environ.get("MASTER_ADDR", "127.0.0.1")

def master_port(self) -> int:
if self._master_port is None:
self._master_port = os.environ.get("MASTER_PORT", find_free_network_port())
return int(self._master_port)
@property
def main_port(self) -> int:
if self._main_port is None:
self._main_port = os.environ.get("MASTER_PORT", find_free_network_port())
return int(self._main_port)

def world_size(self) -> int:
return self._world_size
@@ -85,7 +87,7 @@ def teardown(self) -> None:
def find_free_network_port() -> int:
"""Finds a free port on localhost.

It is useful in single-node training when we don't want to connect to a real master node but have to set the
It is useful in single-node training when we don't want to connect to a real main node but have to set the
`MASTER_PORT` environment variable.
"""
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
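The remainder of the function is collapsed by the diff viewer; a typical implementation of this kind of helper (a sketch, not necessarily the exact body in the file) binds port 0 so the OS assigns a free ephemeral port and reads it back:

import socket


def _find_free_port_sketch() -> int:
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind(("", 0))  # port 0 asks the OS for any unused port
    port = s.getsockname()[1]
    s.close()
    return port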
36 changes: 19 additions & 17 deletions pytorch_lightning/plugins/environments/lsf_environment.py
@@ -41,10 +41,10 @@ class LSFEnvironment(ClusterEnvironment):
"""

def __init__(self):
self._master_address = self._get_master_address()
self._master_port = self._get_master_port()
log.debug(f"MASTER_ADDR: {self._master_address}")
log.debug(f"MASTER_PORT: {self._master_port}")
self._main_address = self._get_main_address()
self._main_port = self._get_main_port()
log.debug(f"MASTER_ADDR: {self._main_address}")
log.debug(f"MASTER_PORT: {self._main_port}")

@staticmethod
def is_using_lsf() -> bool:
@@ -56,13 +56,15 @@ def is_using_lsf() -> bool:
def creates_processes_externally(self) -> bool:
return True

def master_address(self):
"""The master address is read from a list of hosts contained in the environment variable `LSB_HOSTS`."""
return self._master_address
@property
def main_address(self) -> str:
"""The main address is read from a list of hosts contained in the environment variable `LSB_HOSTS`."""
return self._main_address

def master_port(self):
"""THe master port gets calculated from the LSF job ID."""
return self._master_port
@property
def main_port(self) -> int:
"""The main port gets calculated from the LSF job ID."""
return self._main_port

def world_size(self):
"""The world size is read from the environment variable `JSM_NAMESPACE_SIZE`."""
@@ -127,17 +129,17 @@ def _read_hosts():
)
return hosts

def _get_master_address(self):
def _get_main_address(self) -> str:
hosts = self._read_hosts()
return hosts[1]

@staticmethod
def _get_master_port():
"""A helper function for accessing the master port.
def _get_main_port() -> int:
"""A helper function for accessing the main port.

Uses the LSF job ID so all ranks can compute the master port.
Uses the LSF job ID so all ranks can compute the main port.
"""
# check for user-specified master port
# check for user-specified main port
port = os.environ.get("MASTER_PORT")
if not port:
jobid = os.environ.get("LSB_JOBID")
@@ -146,7 +148,7 @@ def _get_master_port():
port = int(jobid)
# all ports should be in the 10k+ range
port = int(port) % 1000 + 10000
log.debug(f"calculated LSF master port: {port}")
log.debug(f"calculated LSF main port: {port}")
else:
log.debug(f"using externally specified master port: {port}")
log.debug(f"using externally specified main port: {port}")
return int(port)
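A quick worked example of the port derivation above (the job ID is made up):

jobid = 2468013                  # pretend LSB_JOBID
port = jobid % 1000 + 10000      # -> 10013, inside the 10k+ range
# every rank sees the same LSB_JOBID, so every rank computes the same MASTER_PORT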
6 changes: 4 additions & 2 deletions pytorch_lightning/plugins/environments/slurm_environment.py
@@ -28,7 +28,8 @@ class SLURMEnvironment(ClusterEnvironment):
def creates_processes_externally(self) -> bool:
return True

def master_address(self) -> str:
@property
def main_address(self) -> str:
# figure out the root node addr
slurm_nodelist = os.environ.get("SLURM_NODELIST")
if slurm_nodelist:
@@ -41,7 +42,8 @@ def master_address(self) -> str:
log.debug(f"MASTER_ADDR: {os.environ['MASTER_ADDR']}")
return root_node

def master_port(self) -> int:
@property
def main_port(self) -> int:
# -----------------------
# SLURM JOB = PORT number
# -----------------------
@@ -35,15 +35,17 @@ def is_using_torchelastic() -> bool:
def creates_processes_externally(self) -> bool:
return True

def master_address(self) -> str:
@property
def main_address(self) -> str:
if "MASTER_ADDR" not in os.environ:
rank_zero_warn("MASTER_ADDR environment variable is not defined. Set as localhost")
os.environ["MASTER_ADDR"] = "127.0.0.1"
log.debug(f"MASTER_ADDR: {os.environ['MASTER_ADDR']}")
master_address = os.environ.get("MASTER_ADDR")
return master_address
main_address = os.environ.get("MASTER_ADDR")
return main_address

def master_port(self) -> int:
@property
def main_port(self) -> int:
if "MASTER_PORT" not in os.environ:
rank_zero_warn("MASTER_PORT environment variable is not defined. Set as 12910")
os.environ["MASTER_PORT"] = "12910"
6 changes: 3 additions & 3 deletions pytorch_lightning/plugins/training_type/ddp.py
@@ -85,7 +85,7 @@
class DDPPlugin(ParallelPlugin):
"""Plugin for multi-process single-device training on one or multiple nodes.

The master process in each node spawns N-1 child processes via :func:`subprocess.Popen`, where N is the number of
The main process in each node spawns N-1 child processes via :func:`subprocess.Popen`, where N is the number of
devices (e.g. GPU) per node. It is very similar to how :mod:`torch.distributed.launch` launches processes.
"""

@@ -188,8 +188,8 @@ def _call_children_scripts(self):
self._check_can_spawn_children()

# DDP Environment variables
os.environ["MASTER_ADDR"] = self.cluster_environment.master_address()
os.environ["MASTER_PORT"] = str(self.cluster_environment.master_port())
os.environ["MASTER_ADDR"] = self.cluster_environment.main_address
os.environ["MASTER_PORT"] = str(self.cluster_environment.main_port)

# allow the user to pass the node rank
os.environ["NODE_RANK"] = str(self.cluster_environment.node_rank())
4 changes: 2 additions & 2 deletions pytorch_lightning/plugins/training_type/ddp_spawn.py
@@ -132,7 +132,7 @@ def _is_single_process_single_device(self):
return True

def setup(self) -> None:
os.environ["MASTER_PORT"] = str(self.cluster_environment.master_port())
os.environ["MASTER_PORT"] = str(self.cluster_environment.main_port)
# pass in a state q
smp = mp.get_context("spawn")
self.mp_queue = smp.SimpleQueue()
@@ -178,7 +178,7 @@ def spawn(self, function: Callable, *args: Any, return_result: bool = True, **kw
Return:
The output of the function of process 0.
"""
os.environ["MASTER_PORT"] = str(self.cluster_environment.master_port())
os.environ["MASTER_PORT"] = str(self.cluster_environment.main_port)
context = mp.get_context("spawn")
return_queue = context.SimpleQueue() if return_result else None
mp.spawn(self._wrapped_function, args=(function, args, kwargs, return_queue), nprocs=self.num_processes)
8 changes: 3 additions & 5 deletions pytorch_lightning/plugins/training_type/deepspeed.py
@@ -376,13 +376,11 @@ def _init_deepspeed_distributed(self) -> None:
f"GLOBAL_RANK: {self.global_rank}, "
f"MEMBER: {self.global_rank + 1}/{self.world_size}"
)
deepspeed.init_distributed(
self.torch_distributed_backend, distributed_port=self.cluster_environment.master_port()
)
deepspeed.init_distributed(self.torch_distributed_backend, distributed_port=self.cluster_environment.main_port)

def _set_node_environment_variables(self) -> None:
os.environ["MASTER_ADDR"] = self.cluster_environment.master_address()
os.environ["MASTER_PORT"] = str(self.cluster_environment.master_port())
os.environ["MASTER_ADDR"] = self.cluster_environment.main_address
os.environ["MASTER_PORT"] = str(self.cluster_environment.main_port)
os.environ["RANK"] = str(self.global_rank)
os.environ["WORLD_SIZE"] = str(self.world_size)
os.environ["LOCAL_RANK"] = str(self.local_rank)
2 changes: 1 addition & 1 deletion pytorch_lightning/plugins/training_type/tpu_spawn.py
@@ -326,7 +326,7 @@ def test_step_end(self, output: STEP_OUTPUT) -> STEP_OUTPUT:

def _pod_progress_bar_force_stdout(self) -> None:
# Why is it required? The way `pytorch_xla.distributed` streams logs
# from different vms to the master worker doesn't work well with tqdm
# from different vms to the main worker doesn't work well with tqdm
# Ref: https://github.com/pytorch/xla/blob/master/torch_xla/distributed/xla_dist.py#L140
# The print statement seems to force tqdm to flush stdout.
if self.tpu_global_core_rank == 0 and int(os.getenv(xenv.TPUVM_MODE, 0)) == 1:
@@ -180,7 +180,7 @@ def results(self) -> Optional[Union[_EVALUATE_OUTPUT, _PREDICT_OUTPUT]]:
The result is
cached instead of returned directly, because some plugins require transmitting the results from one
multiprocessing context to another in a separate step. For example, the plugins that use the "spawn"
start-method send the result to the master process through a
start-method send the result to the main process through a
`multiprocessing queue (shared memory) <https://pytorch.org/docs/stable/multiprocessing.html>`_.
"""
return self._results
6 changes: 3 additions & 3 deletions pytorch_lightning/utilities/distributed.py
@@ -162,7 +162,7 @@ def sync_ddp_if_available(
def sync_ddp(
result: torch.Tensor, group: Optional[Any] = None, reduce_op: Optional[Union[ReduceOp, str]] = None
) -> torch.Tensor:
"""Function to reduce the tensors from several ddp processes to one master process.
"""Function to reduce the tensors from several ddp processes to one main process.

Args:
result: the value to sync and reduce (typically tensor or number)
@@ -379,8 +379,8 @@ def init_dist_connection(
"""
global_rank = global_rank if global_rank is not None else cluster_environment.global_rank()
world_size = world_size if world_size is not None else cluster_environment.world_size()
os.environ["MASTER_ADDR"] = cluster_environment.master_address()
os.environ["MASTER_PORT"] = str(cluster_environment.master_port())
os.environ["MASTER_ADDR"] = cluster_environment.main_address
os.environ["MASTER_PORT"] = str(cluster_environment.main_port)
if torch.distributed.is_available() and not torch.distributed.is_initialized():
log.info(f"initializing distributed: GLOBAL_RANK: {global_rank}, MEMBER: {global_rank + 1}/{world_size}")
torch.distributed.init_process_group(
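A minimal single-process sketch of driving this helper with an environment plugin (illustrative; the `gloo` backend and the defaults of `LightningEnvironment` keep it runnable on one machine):

from pytorch_lightning.plugins.environments import LightningEnvironment
from pytorch_lightning.utilities.distributed import init_dist_connection

env = LightningEnvironment()       # main_address=127.0.0.1, main_port=<a free port>
init_dist_connection(env, "gloo")  # exports MASTER_ADDR/MASTER_PORT and calls init_process_group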