Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/source/clouds/cluster.rst
Original file line number Diff line number Diff line change
Expand Up @@ -270,10 +270,10 @@ and node rank (node id). Here is an example of a custom
def node_rank(self) -> int:
return int(os.environ["NODE_RANK"])

def master_address(self) -> str:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a warning in the documentation that master_{} is deprecated in favor of main_{} and will be removed in 1.7.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done this

def main_address(self) -> str:
return os.environ["MASTER_ADDRESS"]

def master_port(self) -> int:
def main_port(self) -> int:
return int(os.environ["MASTER_PORT"])


Expand Down
4 changes: 2 additions & 2 deletions docs/source/common/trainer.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1115,10 +1115,10 @@ To define your own behavior, subclass the relevant class and pass it in. Here's


class MyCluster(ClusterEnvironment):
def master_address(self):
def main_address(self):
return your_master_address

def master_port(self):
def main_port(self):
return your_master_port

def world_size(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ def creates_children(self) -> bool:
"""Whether the environment creates the subprocesses or not."""

@abstractmethod
def master_address(self) -> str:
def main_address(self) -> str:
"""The master address through which all processes connect and communicate."""

@abstractmethod
def master_port(self) -> int:
def main_port(self) -> int:
"""An open and configured port in the master node through which all processes communicate."""

@abstractmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ def is_using_kubeflow() -> bool:
def creates_children(self) -> bool:
return True

def master_address(self) -> str:
def main_address(self) -> str:
return os.environ["MASTER_ADDR"]

def master_port(self) -> int:
def main_port(self) -> int:
return int(os.environ["MASTER_PORT"])

def world_size(self) -> int:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ def creates_children(self) -> bool:
"""
return "LOCAL_RANK" in os.environ

def master_address(self) -> str:
def main_address(self) -> str:
return os.environ.get("MASTER_ADDR", "127.0.0.1")

def master_port(self) -> int:
def main_port(self) -> int:
if self._master_port is None:
self._master_port = os.environ.get("MASTER_PORT", find_free_network_port())
return int(self._master_port)
Expand Down
4 changes: 2 additions & 2 deletions pytorch_lightning/plugins/environments/lsf_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ def is_using_lsf() -> bool:
def creates_children(self) -> bool:
return True

def master_address(self):
def main_address(self):
"""The master address is read from a list of hosts contained in the environment variable `LSB_HOSTS`."""
return self._master_address

def master_port(self):
def main_port(self):
"""THe master port gets calculated from the LSF job ID."""
return self._master_port

Expand Down
4 changes: 2 additions & 2 deletions pytorch_lightning/plugins/environments/slurm_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class SLURMEnvironment(ClusterEnvironment):
def creates_children(self) -> bool:
return True

def master_address(self) -> str:
def main_address(self) -> str:
# figure out the root node addr
slurm_nodelist = os.environ.get("SLURM_NODELIST")
if slurm_nodelist:
Expand All @@ -40,7 +40,7 @@ def master_address(self) -> str:
log.debug(f"MASTER_ADDR: {os.environ['MASTER_ADDR']}")
return root_node

def master_port(self) -> int:
def main_port(self) -> int:
# -----------------------
# SLURM JOB = PORT number
# -----------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ def is_using_torchelastic() -> bool:
def creates_children(self) -> bool:
return True

def master_address(self) -> str:
def main_address(self) -> str:
if "MASTER_ADDR" not in os.environ:
rank_zero_warn("MASTER_ADDR environment variable is not defined. Set as localhost")
os.environ["MASTER_ADDR"] = "127.0.0.1"
log.debug(f"MASTER_ADDR: {os.environ['MASTER_ADDR']}")
master_address = os.environ.get("MASTER_ADDR")
return master_address

def master_port(self) -> int:
def main_port(self) -> int:
if "MASTER_PORT" not in os.environ:
rank_zero_warn("MASTER_PORT environment variable is not defined. Set as 12910")
os.environ["MASTER_PORT"] = "12910"
Expand Down
4 changes: 2 additions & 2 deletions pytorch_lightning/plugins/training_type/ddp.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ def _call_children_scripts(self):
self._check_can_spawn_children()

# DDP Environment variables
os.environ["MASTER_ADDR"] = self.cluster_environment.master_address()
os.environ["MASTER_PORT"] = str(self.cluster_environment.master_port())
os.environ["MASTER_ADDR"] = self.cluster_environment.main_address()
os.environ["MASTER_PORT"] = str(self.cluster_environment.main_port())

# allow the user to pass the node rank
os.environ["NODE_RANK"] = str(self.cluster_environment.node_rank())
Expand Down
6 changes: 3 additions & 3 deletions pytorch_lightning/plugins/training_type/deepspeed.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,14 +359,14 @@ def init_ddp_connection(self, global_rank: Optional[int] = None, world_size: Opt
f"MEMBER: {global_rank + 1}/{world_size}"
)
deepspeed.init_distributed(
self.torch_distributed_backend, distributed_port=self.cluster_environment.master_port()
self.torch_distributed_backend, distributed_port=self.cluster_environment.main_port()
)

def _set_node_environment_variables(
self, global_rank: Optional[int] = None, world_size: Optional[int] = None
) -> None:
os.environ["MASTER_ADDR"] = self.cluster_environment.master_address()
os.environ["MASTER_PORT"] = str(self.cluster_environment.master_port())
os.environ["MASTER_ADDR"] = self.cluster_environment.main_address()
os.environ["MASTER_PORT"] = str(self.cluster_environment.main_port())
os.environ["RANK"] = str(global_rank)
os.environ["WORLD_SIZE"] = str(world_size)
os.environ["LOCAL_RANK"] = str(self.local_rank)
Expand Down
4 changes: 2 additions & 2 deletions pytorch_lightning/utilities/distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,8 @@ def init_ddp_connection(
"""
global_rank = global_rank if global_rank is not None else cluster_environment.global_rank()
world_size = world_size if world_size is not None else cluster_environment.world_size()
os.environ["MASTER_ADDR"] = cluster_environment.master_address()
os.environ["MASTER_PORT"] = str(cluster_environment.master_port())
os.environ["MASTER_ADDR"] = cluster_environment.main_address()
os.environ["MASTER_PORT"] = str(cluster_environment.main_port())
if torch.distributed.is_available() and not torch.distributed.is_initialized():
log.info(f"initializing ddp: GLOBAL_RANK: {global_rank}, MEMBER: {global_rank + 1}/{world_size}")
torch.distributed.init_process_group(
Expand Down
2 changes: 1 addition & 1 deletion tests/accelerators/test_accelerator_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ def test_accelerator_choice_ddp_cpu_custom_cluster(_, tmpdir):
"""Test that we choose the custom cluster even when SLURM or TE flags are around"""

class CustomCluster(LightningEnvironment):
def master_address(self):
def main_address(self):
return "asdf"

def creates_children(self) -> bool:
Expand Down
8 changes: 4 additions & 4 deletions tests/plugins/environments/test_kubeflow_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ def test_default_attributes():

with pytest.raises(KeyError):
# MASTER_ADDR is required
env.master_address()
env.main_address()
with pytest.raises(KeyError):
# MASTER_PORT is required
env.master_port()
env.main_port()
with pytest.raises(KeyError):
# WORLD_SIZE is required
env.world_size()
Expand All @@ -54,8 +54,8 @@ def test_default_attributes():
def test_attributes_from_environment_variables(caplog):
"""Test that the torchelastic cluster environment takes the attributes from the environment variables."""
env = KubeflowEnvironment()
assert env.master_address() == "1.2.3.4"
assert env.master_port() == 500
assert env.main_address() == "1.2.3.4"
assert env.main_port() == 500
assert env.world_size() == 20
assert env.global_rank() == 1
assert env.local_rank() == 0
Expand Down
12 changes: 6 additions & 6 deletions tests/plugins/environments/test_lightning_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ def test_default_attributes():
"""Test the default attributes when no environment variables are set."""
env = LightningEnvironment()
assert not env.creates_children()
assert env.master_address() == "127.0.0.1"
assert isinstance(env.master_port(), int)
assert env.main_address() == "127.0.0.1"
assert isinstance(env.main_port(), int)
assert env.world_size() == 1
assert env.local_rank() == 0
assert env.node_rank() == 0
Expand All @@ -35,8 +35,8 @@ def test_default_attributes():
def test_attributes_from_environment_variables():
"""Test that the default cluster environment takes the attributes from the environment variables."""
env = LightningEnvironment()
assert env.master_address() == "1.2.3.4"
assert env.master_port() == 500
assert env.main_address() == "1.2.3.4"
assert env.main_port() == 500
assert env.world_size() == 1
assert env.global_rank() == 0
assert env.local_rank() == 2
Expand Down Expand Up @@ -69,10 +69,10 @@ def test_node_rank_from_group_rank():
def test_random_master_port():
"""Test randomly chosen master port when no master port was given by user."""
env = LightningEnvironment()
port = env.master_port()
port = env.main_port()
assert isinstance(port, int)
# repeated calls do not generate a new port number
assert env.master_port() == port
assert env.main_port() == port


@mock.patch.dict(os.environ, {"WORLD_SIZE": "1"})
Expand Down
6 changes: 3 additions & 3 deletions tests/plugins/environments/test_lsf_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def test_missing_lsb_job_id():
def test_manual_master_port_and_address():
"""Test a user can set the port manually through the MASTER_PORT env variable."""
env = LSFEnvironment()
assert env.master_port() == 4321
assert env.main_port() == 4321


@mock.patch.dict(
Expand All @@ -56,8 +56,8 @@ def test_attributes_from_environment_variables():
"""Test that the LSF environment takes the attributes from the environment variables."""
env = LSFEnvironment()
assert env.creates_children()
assert env.master_address() == "10.10.10.0"
assert env.master_port() == 10234
assert env.main_address() == "10.10.10.0"
assert env.main_port() == 10234
assert env.world_size() == 4
assert env.global_rank() == 3
assert env.local_rank() == 1
Expand Down
10 changes: 5 additions & 5 deletions tests/plugins/environments/test_slurm_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ def test_default_attributes():
"""Test the default attributes when no environment variables are set."""
env = SLURMEnvironment()
assert env.creates_children()
assert env.master_address() == "127.0.0.1"
assert env.master_port() == 12910
assert env.main_address() == "127.0.0.1"
assert env.main_port() == 12910
with pytest.raises(KeyError):
# world size is required to be passed as env variable
env.world_size()
Expand All @@ -52,8 +52,8 @@ def test_default_attributes():
def test_attributes_from_environment_variables(caplog):
"""Test that the SLURM cluster environment takes the attributes from the environment variables."""
env = SLURMEnvironment()
assert env.master_address() == "1.1.1.1"
assert env.master_port() == 15000 + 1234
assert env.main_address() == "1.1.1.1"
assert env.main_port() == 15000 + 1234
assert env.world_size() == 20
assert env.global_rank() == 1
assert env.local_rank() == 2
Expand All @@ -80,4 +80,4 @@ def test_master_address_from_slurm_node_list(slurm_node_list, expected):
"""Test extracting the master node from different formats for the SLURM_NODELIST."""
with mock.patch.dict(os.environ, {"SLURM_NODELIST": slurm_node_list}):
env = SLURMEnvironment()
assert env.master_address() == expected
assert env.main_address() == expected
8 changes: 4 additions & 4 deletions tests/plugins/environments/test_torchelastic_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ def test_default_attributes():
"""Test the default attributes when no environment variables are set."""
env = TorchElasticEnvironment()
assert env.creates_children()
assert env.master_address() == "127.0.0.1"
assert env.master_port() == 12910
assert env.main_address() == "127.0.0.1"
assert env.main_port() == 12910
assert env.world_size() is None
with pytest.raises(KeyError):
# local rank is required to be passed as env variable
Expand All @@ -48,8 +48,8 @@ def test_default_attributes():
def test_attributes_from_environment_variables(caplog):
"""Test that the torchelastic cluster environment takes the attributes from the environment variables."""
env = TorchElasticEnvironment()
assert env.master_address() == "1.2.3.4"
assert env.master_port() == 500
assert env.main_address() == "1.2.3.4"
assert env.main_port() == 500
assert env.world_size() == 20
assert env.global_rank() == 1
assert env.local_rank() == 2
Expand Down
4 changes: 2 additions & 2 deletions tests/plugins/test_deepspeed_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -790,8 +790,8 @@ def test_deepspeed_plugin_env_variables(mock_deepspeed_distributed, tmpdir, plat
# assert no env variables have been set within the DeepSpeedPlugin
assert all(k not in os.environ for k in ("MASTER_PORT", "MASTER_ADDR", "RANK", "WORLD_SIZE", "LOCAL_RANK"))
else:
assert os.environ["MASTER_ADDR"] == str(trainer.training_type_plugin.cluster_environment.master_address())
assert os.environ["MASTER_PORT"] == str(trainer.training_type_plugin.cluster_environment.master_port())
assert os.environ["MASTER_ADDR"] == str(trainer.training_type_plugin.cluster_environment.main_address())
assert os.environ["MASTER_PORT"] == str(trainer.training_type_plugin.cluster_environment.main_port())
assert os.environ["RANK"] == str(trainer.training_type_plugin.global_rank)
assert os.environ["WORLD_SIZE"] == str(trainer.training_type_plugin.world_size)
assert os.environ["LOCAL_RANK"] == str(trainer.training_type_plugin.local_rank)
Expand Down