
Conversation

@alli1999

What does this PR do?

Fixes #8774

Does your PR introduce any breaking changes? If yes, please list them.

None

Before submitting

  • Was this discussed/approved via a GitHub issue? (not for typos and docs)
  • Did you read the contributor guideline, Pull Request section?
  • Did you make sure your PR does only one thing, instead of bundling different changes together?
  • Did you make sure to update the documentation with your changes? (if necessary)
  • Did you write any new necessary tests? (not for typos and docs)
  • Did you verify new and existing tests pass locally with your changes?
  • Did you list all the breaking changes introduced by this pull request?
  • Did you update the CHANGELOG? (not for typos, docs, test updates, or internal minor changes/refactorings)

PR review

Anyone in the community is welcome to review the PR.
Before you start reviewing make sure you have read Review guidelines. In short, see the following bullet-list:

  • Is this pull request ready for review? (if not, please submit in draft mode)
  • Check that all items from Before submitting are resolved
  • Make sure the title is self-explanatory and the description concisely explains the PR
  • Add labels and milestones (and optionally projects) to the PR so it can be classified

Did you have fun?

Yes
Make sure you had fun coding 🙃

@awaelchli
Contributor

@alli1999 could you check my notes here in this comment?

We want to keep the old methods, but deprecate them.
Here is an example of a deprecated method.
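
Roughly, the pattern is to keep the old name as a thin alias that warns and delegates to the new one. A minimal sketch using the standard library `warnings` module (Lightning has its own deprecation helpers, but the mechanism is the same; class and values here are hypothetical):

import warnings


class ClusterEnvironmentSketch:
    """Hypothetical sketch of keeping an old method as a deprecated alias."""

    def main_address(self) -> str:
        # new, preferred name
        return "127.0.0.1"

    def master_address(self) -> str:
        # old name kept for backward compatibility: warn, then delegate
        warnings.warn(
            "`master_address` is deprecated in v1.5 and will be removed in v1.7."
            " Use `main_address` instead.",
            DeprecationWarning,
        )
        return self.main_address()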

@awaelchli awaelchli marked this pull request as draft August 24, 2021 18:10
@alli1999
Author

Okay, I think I get it now. I'll create new methods with the new names and add notes in the old methods stating that they are deprecated and that the new method names should be used instead. One question: will the new methods be created in the same files as the old ones? Another question: the code in all these methods won't change, right?

@tchaton
Contributor

tchaton commented Aug 25, 2021

Okay, I think I get it now. I'll create new methods with the new names and add notes in the old methods stating that they are deprecated and that the new method names should be used instead. One question: will the new methods be created in the same files as the old ones? Another question: the code in all these methods won't change, right?

Dear @alli1999,

Here are some indications to get you started:

from abc import ABC, abstractmethod
from pytorch_lightning.utilities.model_helpers import is_overridden


class ClusterEnvironment(ABC):
    """Specification of a cluster environment."""

    @abstractmethod
    def creates_children(self) -> bool:
        """Whether the environment creates the subprocesses or not."""

    @abstractmethod
    def master_address(self) -> str:
        """The master address through which all processes connect and communicate."""

    @abstractmethod
    def master_port(self) -> int:
        """An open and configured port in the master node through which all processes communicate."""

    #### ADDED NEW HOOKS

    @abstractmethod
    def main_address(self) -> str:
        """The main address through which all processes connect and communicate."""

    @abstractmethod
    def main_port(self) -> int:
        """An open and configured port in the main node through which all processes communicate."""

    #### END OF NEW HOOKS

    @abstractmethod
    def world_size(self) -> int:
        """The number of processes across all devices and nodes."""

    @abstractmethod
    def set_world_size(self, size: int) -> None:
        pass

    @abstractmethod
    def global_rank(self) -> int:
        """The rank (index) of the currently running process across all nodes and devices."""

    @abstractmethod
    def set_global_rank(self, rank: int) -> None:
        pass

    @abstractmethod
    def local_rank(self) -> int:
        """The rank (index) of the currently running process inside of the current node."""

    @abstractmethod
    def node_rank(self) -> int:
        """The rank (index) of the node on which the current process runs."""

    def teardown(self) -> None:
        """Clean up any state set after execution finishes."""
        pass

For all ClusterEnvironment subclasses within Lightning, we can already make the change, as they are internal to the library.

class LightningEnvironment(ClusterEnvironment):

    def __init__(self):
        super().__init__()
        self._master_port = None
        self._global_rank: int = 0
        self._world_size: int = 1

    def creates_children(self) -> bool:
        """
        Returns whether the cluster creates the processes or not.
        If at least :code:`LOCAL_RANK` is available as environment variable, Lightning assumes the user acts as the
        process launcher/job scheduler and Lightning will not launch new processes.
        """
        return "LOCAL_RANK" in os.environ

    def main_address(self) -> str:
        return os.environ.get("MASTER_ADDR", "127.0.0.1")

    def main_port(self) -> int:
        if self._master_port is None:
            self._master_port = os.environ.get("MASTER_PORT", find_free_network_port())
        return int(self._master_port)

For external users with their own ClusterEnvironment that still uses master instead of main, the user code would look like this:

import os

from pytorch_lightning import Trainer
from pytorch_lightning.plugins.environments import ClusterEnvironment


class ExternalClusterEnvironment(ClusterEnvironment):

    def __init__(self):
        super().__init__()
        self._master_port = None

    def master_address(self) -> str:
        return os.environ.get("MASTER_ADDR", "127.0.0.1")

    def master_port(self) -> int:
        if self._master_port is None:
            # find_free_network_port() as defined alongside LightningEnvironment
            self._master_port = os.environ.get("MASTER_PORT", find_free_network_port())
        return int(self._master_port)

cluster_environment = ExternalClusterEnvironment()
Trainer(plugins=cluster_environment)

Here, https://github.com/PyTorchLightning/pytorch-lightning/blob/master/pytorch_lightning/trainer/connectors/accelerator_connector.py#L724

if not is_overridden("main_port", cluster_environment, ClusterEnvironment):
    warning_cache.deprecation(
        "The `ClusterEnvironment.master_port` method is deprecated in v1.5 and will be removed in v1.7. "
        "Use `ClusterEnvironment.main_port` instead.",
        stacklevel=6,
    )
    cluster_environment.main_port = cluster_environment.master_port

if not is_overridden("main_address", cluster_environment, ClusterEnvironment):
    warning_cache.deprecation(
        "The `ClusterEnvironment.master_port` method is deprecated in v1.5 and will be removed in v1.7. "
        "Use `ClusterEnvironment.main_port` instead.",
        stacklevel=6,
    )
    cluster_environment.main_address = cluster_environment.master_address

@awaelchli Do you confirm the approach?

@alli1999
Author

Thanks a lot @tchaton, it really helps!

@mergify
Contributor

mergify bot commented Aug 25, 2021

Sorry but I didn't understand the command. Please consult the commands documentation 📚.

Hey, I reacted but my real name is @Mergifyio

@alli1999
Author

alli1999 commented Aug 25, 2021

Is this how it's supposed to be done?

import os
import socket

from pytorch_lightning.plugins.environments.cluster_environment import ClusterEnvironment
from pytorch_lightning.utilities import rank_zero_only


class LightningEnvironment(ClusterEnvironment):
    """
    The default environment used by Lightning for a single node or free cluster (not managed).

    There are two modes the Lightning environment can operate with:

    1.  The user only launches the main process by :code:`python train.py ...` with no additional environment variables
        set. Lightning will spawn new worker processes for distributed training in the current node.
    2.  The user launches all processes manually or with utilities like :code:`torch.distributed.launch`.
        The appropriate environment variables need to be set, and at minimum :code:`LOCAL_RANK`.

    If the master address and port are not provided, the default environment will choose them
    automatically. It is recommended to use this default environment for single-node distributed
    training as it provides a convenient way to launch the training script.
    """

    def __init__(self):
        super().__init__()
        self._master_port = None
        self._global_rank: int = 0
        self._world_size: int = 1

    def creates_children(self) -> bool:
        """
        Returns whether the cluster creates the processes or not.
        If at least :code:`LOCAL_RANK` is available as environment variable, Lightning assumes the user acts as the
        process launcher/job scheduler and Lightning will not launch new processes.
        """
        return "LOCAL_RANK" in os.environ

    def master_address(self) -> str:
        """The master address through which all processes connect and communicate.

        .. deprecated:: v1.5
            The `ClusterEnvironment.master_address` method is deprecated in v1.5 and will be removed in v1.7.
            Use `ClusterEnvironment.main_address` instead.
        """
        return os.environ.get("MASTER_ADDR", "127.0.0.1")

    def main_address(self) -> str:
        return os.environ.get("MASTER_ADDR", "127.0.0.1")

    def master_port(self) -> int:
        """An open and configured port in the master node through which all processes communicate.

        .. deprecated:: v1.5
            The `ClusterEnvironment.master_port` method is deprecated in v1.5 and will be removed in v1.7.
            Use `ClusterEnvironment.main_port` instead.
        """
        if self._master_port is None:
            self._master_port = os.environ.get("MASTER_PORT", find_free_network_port())
        return int(self._master_port)

    def main_port(self) -> int:
        if self._master_port is None:
            self._master_port = os.environ.get("MASTER_PORT", find_free_network_port())
        return int(self._master_port)

    def world_size(self) -> int:
        return self._world_size

    def set_world_size(self, size: int) -> None:
        self._world_size = size

    def global_rank(self) -> int:
        return self._global_rank

    def set_global_rank(self, rank: int) -> None:
        self._global_rank = rank
        rank_zero_only.rank = rank

    def local_rank(self) -> int:
        return int(os.environ.get("LOCAL_RANK", 0))

    def node_rank(self) -> int:
        group_rank = os.environ.get("GROUP_RANK", 0)
        return int(os.environ.get("NODE_RANK", group_rank))

    def teardown(self) -> None:
        if "WORLD_SIZE" in os.environ:
            del os.environ["WORLD_SIZE"]


def find_free_network_port() -> int:
    """
    Finds a free port on localhost.
    It is useful in single-node training when we don't want to connect to a real master node but
    have to set the `MASTER_PORT` environment variable.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind(("", 0))
    s.listen(1)
    port = s.getsockname()[1]
    s.close()
    return port

@tchaton
Contributor

tchaton commented Aug 25, 2021

Dear @alli1999,

Quick note:
Here is why we have deprecation: it is a message to inform users of a change.
A user might implement a ClusterEnvironment and pass it to Lightning,
which resolves the environment and gets the ranks, world_size, etc.

Therefore, here is what should be done:

  • For every Lightning cluster environment, we can already rename master to main, as you already did.
  • For users providing their own ClusterEnvironment, we don't want to fail yet if they implement master_port and not main_port, so we just emit a warning that this behaviour won't be supported two versions from now.

TODOS:

  • Add a test within https://github.com/PyTorchLightning/pytorch-lightning/tree/master/tests/deprecated_api by providing a deprecated ClusterEnvironment implementing master_{} and validating that Lightning emits a warning.

I hope it is clear now :)

Best,
T.C

@alli1999
Author

alli1999 commented Aug 26, 2021

Apologies for asking a lot of questions, it's my first time doing this. In the accelerator_connector I have to add the logic below, right? Is there a function definition for the `is_overridden` method? (I sketched my guess at the end of this comment.) Adding the logic looks something like this:

def select_cluster_environment(self) -> ClusterEnvironment:
    if self._cluster_environment is not None:
        return self._cluster_environment
    if self.is_slurm_managing_tasks:
        env = SLURMEnvironment()
        if not is_overridden("main_port", cluster_environment, ClusterEnvironment):
            warning_cache.deprecation(
                "The `ClusterEnvironment.master_port` method is deprecated in v1.5 and will be removed in v1.7. "
                "Use `ClusterEnvironment.main_port` instead.",
                stacklevel=6,
            )
            cluster_environment.main_port = cluster_environment.master_port

        if not is_overridden("main_address", cluster_environment, ClusterEnvironment):
            warning_cache.deprecation(
                "The `ClusterEnvironment.master_address` method is deprecated in v1.5 and will be removed in v1.7. "
                "Use `ClusterEnvironment.main_address` instead.",
                stacklevel=6,
            )
            cluster_environment.main_address = cluster_environment.master_address
    elif TorchElasticEnvironment.is_using_torchelastic():
        env = TorchElasticEnvironment()
    elif KubeflowEnvironment.is_using_kubeflow():
        env = KubeflowEnvironment()
    elif LSFEnvironment.is_using_lsf():
        env = LSFEnvironment()
    else:
        env = LightningEnvironment()
    return env

Also, I added the deprecation warning to each method. It looks something like this; I just wanted to ask if this is how it's done.

    @abstractmethod
    def master_address(self) -> str:
        """The master address through which all processes connect and communicate."""
        warning_cache.deprecation(
            "The `ClusterEnvironment.master_address` method is deprecated in v1.5 and will be removed in v1.7. "
            "Use `ClusterEnvironment.main_address` instead.",
            stacklevel=6,
        )

    @abstractmethod
    def master_port(self) -> int:
        """An open and configured port in the master node through which all processes communicate."""
        warning_cache.deprecation(
            "The `ClusterEnvironment.master_port` method is deprecated in v1.5 and will be removed in v1.7. "
            "Use `ClusterEnvironment.main_port` instead.",
            stacklevel=6,
        )

@tchaton
Contributor

tchaton commented Aug 26, 2021

Dear @alli1999 ,

Thanks for asking questions :)

You want to put a warning for a user-provided cluster_environment, which should be set as self._cluster ...

Therefore, the code should look like this.

def select_cluster_environment(self) -> ClusterEnvironment:
    if self._cluster_environment is not None:
        if not is_overridden("main_port", self._cluster_environment, ClusterEnvironment):
            warning_cache.deprecation(
                "The `ClusterEnvironment.master_port` method is deprecated in v1.5 and will be removed in v1.7. "
                "Use `ClusterEnvironment.main_port` instead.",
            stacklevel=6,
            )
            self._cluster_environment.main_port = self._cluster_environment.master_port

        if not is_overridden("main_address", self._cluster_environment, ClusterEnvironment):
            warning_cache.deprecation(
                "The `ClusterEnvironment.master_port` method is deprecated in v1.5 and will be removed in v1.7. "
                "Use `ClusterEnvironment.main_port` instead.",
                stacklevel=6,
            )
            self._cluster_environment.main_address = self._cluster_environment.master_address
        return self._cluster_environment
    if self.is_slurm_managing_tasks:
        env = SLURMEnvironment()
    elif TorchElasticEnvironment.is_using_torchelastic():
        env = TorchElasticEnvironment()
    elif KubeflowEnvironment.is_using_kubeflow():
        env = KubeflowEnvironment()
    elif LSFEnvironment.is_using_lsf():
        env = LSFEnvironment()
    else:
        env = LightningEnvironment()
    return env

And yes, you can put a warning within the ClusterEnvironment(ABC).

@alli1999
Author

I've done the first part. Now onto the test: "Add a test within https://github.com/PyTorchLightning/pytorch-lightning/tree/master/tests/deprecated_api by providing a deprecated ClusterEnvironment implementing master_{} and validating that Lightning emits a warning."
To create the test, do I have to implement my own ClusterEnvironment?

@tchaton
Contributor

tchaton commented Aug 27, 2021

To create the test, do I have to implement my own ClusterEnvironment?

Yes :)
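
Something along these lines could work. This is a rough sketch: it assumes the final design lets a user environment skip the new `main_*` hooks entirely (e.g. they are not abstract or fall back to the old names), and the class name, test name, and warning match are made up:

import pytest

from pytorch_lightning import Trainer
from pytorch_lightning.plugins.environments import ClusterEnvironment


class DeprecatedMasterEnvironment(ClusterEnvironment):
    """Hypothetical user environment that only implements the old ``master_*`` hooks."""

    def creates_children(self) -> bool:
        return False

    def master_address(self) -> str:
        return "127.0.0.1"

    def master_port(self) -> int:
        return 1234

    def world_size(self) -> int:
        return 1

    def set_world_size(self, size: int) -> None:
        pass

    def global_rank(self) -> int:
        return 0

    def set_global_rank(self, rank: int) -> None:
        pass

    def local_rank(self) -> int:
        return 0

    def node_rank(self) -> int:
        return 0


def test_deprecated_master_cluster_environment(tmpdir):
    # Resolving the user-provided environment is expected to emit the deprecation warning.
    with pytest.deprecated_call(match="deprecated in v1.5"):
        Trainer(default_root_dir=tmpdir, plugins=DeprecatedMasterEnvironment())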

    def node_rank(self) -> int:
        return int(os.environ["NODE_RANK"])

    def master_address(self) -> str:
Contributor

Add a warning in the documentation that master_{} is deprecated in favor of main_{} and will be removed in 1.7.
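
For example, the base-class docstrings could carry a Sphinx `.. deprecated::` note; a sketch (exact wording is up to the author, the directive text below is an assumption):

from abc import ABC, abstractmethod


class ClusterEnvironment(ABC):

    @abstractmethod
    def master_address(self) -> str:
        """The master address through which all processes connect and communicate.

        .. deprecated:: v1.5
            This method is deprecated in v1.5 and will be removed in v1.7.
            Use :meth:`main_address` instead.
        """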

Author

Done this

@alli1999
Author

Regarding the test creation: is this what I had to add? Also, I can't seem to run it; I always end up getting a `ModuleNotFoundError: No module named 'pytorch_lightning'`.
import os

import pytest

from pytorch_lightning import Trainer
from pytorch_lightning.plugins.environments import ClusterEnvironment


class MyClusterEnvironment(ClusterEnvironment):
    def creates_children(self) -> bool:
        # return True if the cluster is managed (you don't launch processes yourself)
        return True

    def world_size(self) -> int:
        return int(os.environ["WORLD_SIZE"])

    def global_rank(self) -> int:
        return int(os.environ["RANK"])

    def local_rank(self) -> int:
        return int(os.environ["LOCAL_RANK"])

    def node_rank(self) -> int:
        return int(os.environ["NODE_RANK"])

    def master_address(self) -> str:
        return os.environ["MASTER_ADDRESS"]

    def master_port(self) -> int:
        return int(os.environ["MASTER_PORT"])


trainer = Trainer(plugins=[MyClusterEnvironment()])
trainer.master_address()

@stale

stale bot commented Sep 12, 2021

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 7 days if no further activity occurs. If you need further help see our docs: https://pytorch-lightning.readthedocs.io/en/latest/generated/CONTRIBUTING.html#pull-request or ask the assistance of a core contributor here or on Slack. Thank you for your contributions.

@stale stale bot added the "won't fix (This will not be worked on)" label on Sep 12, 2021
@stale

stale bot commented Sep 17, 2021

This pull request is going to be closed. Please feel free to reopen it or create a new one from the current master.


Labels

won't fix (This will not be worked on)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

rename master* properties in cluster environments to main*

3 participants