From 1db7a0c29c0872e51c384d7e3df9106a5cd829a7 Mon Sep 17 00:00:00 2001 From: Atharva-Phatak Date: Thu, 29 Sep 2022 14:42:14 -0400 Subject: [PATCH 01/10] hydra changes to lightning lite --- .../strategies/launchers/subprocess_script.py | 58 ++++++++++++++++--- .../strategies/launchers/subprocess_script.py | 2 +- 2 files changed, 50 insertions(+), 10 deletions(-) diff --git a/src/lightning_lite/strategies/launchers/subprocess_script.py b/src/lightning_lite/strategies/launchers/subprocess_script.py index b2da01835d9ec..913195e709d1a 100644 --- a/src/lightning_lite/strategies/launchers/subprocess_script.py +++ b/src/lightning_lite/strategies/launchers/subprocess_script.py @@ -15,7 +15,7 @@ import subprocess import sys from time import sleep -from typing import Any, Callable, Optional +from typing import Any, Callable, Sequence import __main__ import numpy as np @@ -142,16 +142,15 @@ def _call_children_scripts(self) -> None: # start process # if hydra is available and initialized, make sure to set the cwd correctly - cwd: Optional[str] = None + hydra_in_use = False if _HYDRA_AVAILABLE: from hydra.core.hydra_config import HydraConfig - from hydra.utils import get_original_cwd - - if HydraConfig.initialized(): - cwd = get_original_cwd() - os_cwd = f'"{os.getcwd()}"' - command += [f"hydra.run.dir={os_cwd}", f"hydra.job.name=train_ddp_process_{local_rank}"] - subprocess.Popen(command, env=env_copy, cwd=cwd) + hydra_in_use = HydraConfig.initialized() + if hydra_in_use: + command = _hydra_subprocess_cmd(local_rank=local_rank) + else: + command = _basic_subprocess_cmd(local_rank=local_rank) + subprocess.Popen(command, env=env_copy) # starting all processes at once can cause issues # with dataloaders delay between 1-10 seconds @@ -165,3 +164,44 @@ def _check_can_spawn_children(self) -> None: " Possible reasons: 1) LOCAL_RANK environment variable was incorrectly modified by the user," " 2) `ClusterEnvironment.creates_processes_externally` incorrectly implemented." ) + + +def _basic_subprocess_cmd(local_rank: int) -> Sequence[str]: + if __main__.__spec__ is None: # pragma: no-cover + return [sys.executable, os.path.abspath(sys.argv[0])] + sys.argv[1:] + else: + return [sys.executable, "-m", __main__.__spec__.name] + sys.argv[1:] + + +def _hydra_subprocess_cmd(local_rank: int) -> Sequence[str]: + from hydra.core.hydra_config import HydraConfig + from hydra.utils import to_absolute_path + + # when user is using hydra find the absolute path + if __main__.__spec__ is None: # pragma: no-cover + command = [sys.executable, to_absolute_path(sys.argv[0])] + else: + command = [sys.executable, "-m", __main__.__spec__.name] + + # extract the hydra configuration + hydra_cfg = HydraConfig.get() + + # the location of the hydra configuration files saved for the current job + hydra_output = hydra_cfg.runtime.output_dir + if hydra_cfg.output_subdir is not None: + hydra_output = os.path.join(hydra_output, hydra_cfg.output_subdir) + + # check if experimental re-run capability exists + # otherwise use existing config.yaml which may have issues + pickled_config = os.path.join(hydra_output, "config.pickle") + if os.path.exists(pickled_config): + command += ["--experimental-rerun", pickled_config] + + else: + command += ["-cp", hydra_output, "-cn", "config.yaml"] + command += [ + f"hydra.output_subdir=.pl_ddp_hydra_{local_rank}", + f"hydra.run.dir={hydra_cfg.runtime.output_dir}", + ] + + return command diff --git a/src/pytorch_lightning/strategies/launchers/subprocess_script.py b/src/pytorch_lightning/strategies/launchers/subprocess_script.py index 62694f1e3575f..ae95aeb8275ae 100644 --- a/src/pytorch_lightning/strategies/launchers/subprocess_script.py +++ b/src/pytorch_lightning/strategies/launchers/subprocess_script.py @@ -155,7 +155,7 @@ def _hydra_subprocess_cmd(local_rank: int) -> Sequence[str]: else: command = [sys.executable, "-m", __main__.__spec__.name] - # extract the hydra configu + # extract the hydra configuration hydra_cfg = HydraConfig.get() # the location of the hydra configuration files saved for the current job From 21f6159a88f39b0bdc9fc45617db31cc9777aad3 Mon Sep 17 00:00:00 2001 From: Atharva-Phatak Date: Thu, 29 Sep 2022 14:46:31 -0400 Subject: [PATCH 02/10] hydra-path-fixes --- .../strategies/launchers/subprocess_script.py | 26 ------------------- 1 file changed, 26 deletions(-) diff --git a/src/lightning_lite/strategies/launchers/subprocess_script.py b/src/lightning_lite/strategies/launchers/subprocess_script.py index 913195e709d1a..e19189978f601 100644 --- a/src/lightning_lite/strategies/launchers/subprocess_script.py +++ b/src/lightning_lite/strategies/launchers/subprocess_script.py @@ -104,32 +104,6 @@ def _call_children_scripts(self) -> None: # allow the user to pass the node rank os.environ["NODE_RANK"] = str(self.cluster_environment.node_rank()) os.environ["LOCAL_RANK"] = str(self.cluster_environment.local_rank()) - - # Check if the current calling command looked like `python a/b/c.py` or `python -m a.b.c` - # See https://docs.python.org/3/reference/import.html#main-spec - if __main__.__spec__ is None: # pragma: no-cover - # Script called as `python a/b/c.py` - if _HYDRA_AVAILABLE: - # when user is using hydra find the absolute path - from hydra.utils import to_absolute_path - - to_abs_path = to_absolute_path - else: - to_abs_path = os.path.abspath - - # pull out the commands used to run the script and resolve the absolute file path - command = sys.argv - try: - full_path = to_abs_path(command[0]) - except Exception: - full_path = os.path.abspath(command[0]) - - command[0] = full_path - # use the same python interpreter and actually running - command = [sys.executable] + command - else: # Script called as `python -m a.b.c` - command = [sys.executable, "-m", __main__.__spec__.name] + sys.argv[1:] - os.environ["WORLD_SIZE"] = f"{self.num_processes * self.num_nodes}" for local_rank in range(1, self.num_processes): From 1b3b96cbed6ce9e545a6ced98a088978253cc98f Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 29 Sep 2022 18:51:03 +0000 Subject: [PATCH 03/10] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/lightning_lite/strategies/launchers/subprocess_script.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/lightning_lite/strategies/launchers/subprocess_script.py b/src/lightning_lite/strategies/launchers/subprocess_script.py index e19189978f601..7f33eb29847a4 100644 --- a/src/lightning_lite/strategies/launchers/subprocess_script.py +++ b/src/lightning_lite/strategies/launchers/subprocess_script.py @@ -119,6 +119,7 @@ def _call_children_scripts(self) -> None: hydra_in_use = False if _HYDRA_AVAILABLE: from hydra.core.hydra_config import HydraConfig + hydra_in_use = HydraConfig.initialized() if hydra_in_use: command = _hydra_subprocess_cmd(local_rank=local_rank) From fe94d46cadaafd4de4eee3c1e826f54d777225bf Mon Sep 17 00:00:00 2001 From: Atharva-Phatak Date: Thu, 29 Sep 2022 18:09:05 -0400 Subject: [PATCH 04/10] minor import fix --- .../strategies/launchers/subprocess_script.py | 54 +++---------------- 1 file changed, 8 insertions(+), 46 deletions(-) diff --git a/src/pytorch_lightning/strategies/launchers/subprocess_script.py b/src/pytorch_lightning/strategies/launchers/subprocess_script.py index ae95aeb8275ae..658a0ecb136f0 100644 --- a/src/pytorch_lightning/strategies/launchers/subprocess_script.py +++ b/src/pytorch_lightning/strategies/launchers/subprocess_script.py @@ -15,7 +15,7 @@ import subprocess import sys from time import sleep -from typing import Any, Callable, Optional, Sequence +from typing import Any, Callable, Optional import __main__ import numpy as np @@ -24,6 +24,7 @@ import pytorch_lightning as pl from lightning_lite.plugins import ClusterEnvironment from lightning_lite.strategies.launchers.base import _Launcher +from lightning_lite.strategies.launchers.subprocess_script import _basic_subprocess_cmd, _hydra_subprocess_cmd _HYDRA_AVAILABLE = RequirementCache("hydra-core") @@ -66,15 +67,17 @@ class _SubprocessScriptLauncher(_Launcher): num_nodes: The total number of nodes that participate in this process group. """ - @property - def is_interactive_compatible(self) -> bool: - return False + def __init__(self, cluster_environment: ClusterEnvironment, num_processes: int, num_nodes: int) -> None: super().__init__() self.cluster_environment = cluster_environment self.num_processes = num_processes self.num_nodes = num_nodes + + @property + def is_interactive_compatible(self) -> bool: + return False def launch(self, function: Callable, *args: Any, trainer: Optional["pl.Trainer"] = None, **kwargs: Any) -> Any: """Creates new processes, then calls the given function. @@ -135,45 +138,4 @@ def _check_can_spawn_children(self) -> None: "Lightning attempted to launch new distributed processes with `local_rank > 0`. This should not happen." " Possible reasons: 1) LOCAL_RANK environment variable was incorrectly modified by the user," " 2) `ClusterEnvironment.creates_processes_externally` incorrectly implemented." - ) - - -def _basic_subprocess_cmd(local_rank: int) -> Sequence[str]: - if __main__.__spec__ is None: # pragma: no-cover - return [sys.executable, os.path.abspath(sys.argv[0])] + sys.argv[1:] - else: - return [sys.executable, "-m", __main__.__spec__.name] + sys.argv[1:] - - -def _hydra_subprocess_cmd(local_rank: int) -> Sequence[str]: - from hydra.core.hydra_config import HydraConfig - from hydra.utils import to_absolute_path - - # when user is using hydra find the absolute path - if __main__.__spec__ is None: # pragma: no-cover - command = [sys.executable, to_absolute_path(sys.argv[0])] - else: - command = [sys.executable, "-m", __main__.__spec__.name] - - # extract the hydra configuration - hydra_cfg = HydraConfig.get() - - # the location of the hydra configuration files saved for the current job - hydra_output = hydra_cfg.runtime.output_dir - if hydra_cfg.output_subdir is not None: - hydra_output = os.path.join(hydra_output, hydra_cfg.output_subdir) - - # check if experimental re-run capability exists - # otherwise use existing config.yaml which may have issues - pickled_config = os.path.join(hydra_output, "config.pickle") - if os.path.exists(pickled_config): - command += ["--experimental-rerun", pickled_config] - - else: - command += ["-cp", hydra_output, "-cn", "config.yaml"] - command += [ - f"hydra.output_subdir=.pl_ddp_hydra_{local_rank}", - f"hydra.run.dir={hydra_cfg.runtime.output_dir}", - ] - - return command + ) \ No newline at end of file From 906465547e0375b995e9baf9514c4ab94ae9880a Mon Sep 17 00:00:00 2001 From: Atharva-Phatak Date: Thu, 29 Sep 2022 18:11:28 -0400 Subject: [PATCH 05/10] minor import fixes --- src/pytorch_lightning/strategies/launchers/subprocess_script.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/pytorch_lightning/strategies/launchers/subprocess_script.py b/src/pytorch_lightning/strategies/launchers/subprocess_script.py index 658a0ecb136f0..631ef573ab799 100644 --- a/src/pytorch_lightning/strategies/launchers/subprocess_script.py +++ b/src/pytorch_lightning/strategies/launchers/subprocess_script.py @@ -25,7 +25,6 @@ from lightning_lite.plugins import ClusterEnvironment from lightning_lite.strategies.launchers.base import _Launcher from lightning_lite.strategies.launchers.subprocess_script import _basic_subprocess_cmd, _hydra_subprocess_cmd - _HYDRA_AVAILABLE = RequirementCache("hydra-core") From 68d9ed8b79160b28f874b463949357d2b91d16fb Mon Sep 17 00:00:00 2001 From: Atharva-Phatak Date: Thu, 29 Sep 2022 18:13:03 -0400 Subject: [PATCH 06/10] import-fixes --- src/pytorch_lightning/strategies/launchers/subprocess_script.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/pytorch_lightning/strategies/launchers/subprocess_script.py b/src/pytorch_lightning/strategies/launchers/subprocess_script.py index 631ef573ab799..e8bd9baec5e74 100644 --- a/src/pytorch_lightning/strategies/launchers/subprocess_script.py +++ b/src/pytorch_lightning/strategies/launchers/subprocess_script.py @@ -27,7 +27,6 @@ from lightning_lite.strategies.launchers.subprocess_script import _basic_subprocess_cmd, _hydra_subprocess_cmd _HYDRA_AVAILABLE = RequirementCache("hydra-core") - class _SubprocessScriptLauncher(_Launcher): r""" A process laucher that invokes the current script as many times as desired in a single node. From ba7e8f6eaac13a59d8cfc0da2241d9013e9e5b8a Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 29 Sep 2022 22:14:54 +0000 Subject: [PATCH 07/10] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../strategies/launchers/subprocess_script.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/pytorch_lightning/strategies/launchers/subprocess_script.py b/src/pytorch_lightning/strategies/launchers/subprocess_script.py index e8bd9baec5e74..182eedcbc333e 100644 --- a/src/pytorch_lightning/strategies/launchers/subprocess_script.py +++ b/src/pytorch_lightning/strategies/launchers/subprocess_script.py @@ -25,8 +25,10 @@ from lightning_lite.plugins import ClusterEnvironment from lightning_lite.strategies.launchers.base import _Launcher from lightning_lite.strategies.launchers.subprocess_script import _basic_subprocess_cmd, _hydra_subprocess_cmd + _HYDRA_AVAILABLE = RequirementCache("hydra-core") + class _SubprocessScriptLauncher(_Launcher): r""" A process laucher that invokes the current script as many times as desired in a single node. @@ -65,14 +67,12 @@ class _SubprocessScriptLauncher(_Launcher): num_nodes: The total number of nodes that participate in this process group. """ - - def __init__(self, cluster_environment: ClusterEnvironment, num_processes: int, num_nodes: int) -> None: super().__init__() self.cluster_environment = cluster_environment self.num_processes = num_processes self.num_nodes = num_nodes - + @property def is_interactive_compatible(self) -> bool: return False @@ -136,4 +136,4 @@ def _check_can_spawn_children(self) -> None: "Lightning attempted to launch new distributed processes with `local_rank > 0`. This should not happen." " Possible reasons: 1) LOCAL_RANK environment variable was incorrectly modified by the user," " 2) `ClusterEnvironment.creates_processes_externally` incorrectly implemented." - ) \ No newline at end of file + ) From 1901242f2cc752950e36a3747a42c2112ca157a8 Mon Sep 17 00:00:00 2001 From: awaelchli Date: Fri, 30 Sep 2022 01:51:10 +0200 Subject: [PATCH 08/10] remove unused argument --- src/lightning_lite/strategies/launchers/subprocess_script.py | 4 ++-- .../strategies/launchers/subprocess_script.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/lightning_lite/strategies/launchers/subprocess_script.py b/src/lightning_lite/strategies/launchers/subprocess_script.py index 7f33eb29847a4..511f087d4b9ea 100644 --- a/src/lightning_lite/strategies/launchers/subprocess_script.py +++ b/src/lightning_lite/strategies/launchers/subprocess_script.py @@ -124,7 +124,7 @@ def _call_children_scripts(self) -> None: if hydra_in_use: command = _hydra_subprocess_cmd(local_rank=local_rank) else: - command = _basic_subprocess_cmd(local_rank=local_rank) + command = _basic_subprocess_cmd() subprocess.Popen(command, env=env_copy) # starting all processes at once can cause issues @@ -141,7 +141,7 @@ def _check_can_spawn_children(self) -> None: ) -def _basic_subprocess_cmd(local_rank: int) -> Sequence[str]: +def _basic_subprocess_cmd() -> Sequence[str]: if __main__.__spec__ is None: # pragma: no-cover return [sys.executable, os.path.abspath(sys.argv[0])] + sys.argv[1:] else: diff --git a/src/pytorch_lightning/strategies/launchers/subprocess_script.py b/src/pytorch_lightning/strategies/launchers/subprocess_script.py index 182eedcbc333e..a71a2fdab9741 100644 --- a/src/pytorch_lightning/strategies/launchers/subprocess_script.py +++ b/src/pytorch_lightning/strategies/launchers/subprocess_script.py @@ -121,7 +121,7 @@ def _call_children_scripts(self) -> None: if hydra_in_use: command = _hydra_subprocess_cmd(local_rank) else: - command = _basic_subprocess_cmd(local_rank) + command = _basic_subprocess_cmd() subprocess.Popen(command, env=env_copy) From 8589ed34c5699851ac88613e10676b873d283634 Mon Sep 17 00:00:00 2001 From: awaelchli Date: Fri, 30 Sep 2022 03:01:39 +0200 Subject: [PATCH 09/10] add simple test --- .../strategies/launchers/subprocess_script.py | 2 +- .../launchers/test_subprocess_script.py | 51 ++++++++++++++++++- 2 files changed, 50 insertions(+), 3 deletions(-) diff --git a/src/lightning_lite/strategies/launchers/subprocess_script.py b/src/lightning_lite/strategies/launchers/subprocess_script.py index 511f087d4b9ea..c28946faa9ab6 100644 --- a/src/lightning_lite/strategies/launchers/subprocess_script.py +++ b/src/lightning_lite/strategies/launchers/subprocess_script.py @@ -24,7 +24,7 @@ from lightning_lite.plugins.environments.cluster_environment import ClusterEnvironment from lightning_lite.strategies.launchers.base import _Launcher -_HYDRA_AVAILABLE = RequirementCache("hydra") +_HYDRA_AVAILABLE = RequirementCache("hydra-core") class _SubprocessScriptLauncher(_Launcher): diff --git a/tests/tests_lite/strategies/launchers/test_subprocess_script.py b/tests/tests_lite/strategies/launchers/test_subprocess_script.py index c9af07343b454..1b2e4360e2432 100644 --- a/tests/tests_lite/strategies/launchers/test_subprocess_script.py +++ b/tests/tests_lite/strategies/launchers/test_subprocess_script.py @@ -13,11 +13,12 @@ # limitations under the License. import os from unittest import mock -from unittest.mock import Mock +from unittest.mock import ANY, Mock import pytest -from lightning_lite.strategies.launchers.subprocess_script import _SubprocessScriptLauncher +import lightning_lite +from lightning_lite.strategies.launchers.subprocess_script import _HYDRA_AVAILABLE, _SubprocessScriptLauncher def test_subprocess_script_launcher_interactive_compatible(): @@ -76,3 +77,49 @@ def test_subprocess_script_launcher_launch_processes(popen_mock, _): # the current process assert int(os.environ["WORLD_SIZE"]) == launcher.num_processes * launcher.num_nodes assert int(os.environ["LOCAL_RANK"]) == 0 + + +@pytest.mark.skipif(not _HYDRA_AVAILABLE, reason="hydra-core is required") +@mock.patch("lightning_lite.strategies.launchers.subprocess_script.sleep") +@mock.patch("lightning_lite.strategies.launchers.subprocess_script.subprocess.Popen") +def test_subprocess_script_launcher_hydra_in_use(popen_mock, _, monkeypatch): + basic_command = Mock(return_value="basic_command") + hydra_command = Mock(return_value="hydra_command") + monkeypatch.setattr(lightning_lite.strategies.launchers.subprocess_script, "_basic_subprocess_cmd", basic_command) + monkeypatch.setattr(lightning_lite.strategies.launchers.subprocess_script, "_hydra_subprocess_cmd", hydra_command) + + def simulate_launch(): + cluster_env = Mock() + cluster_env.creates_processes_externally = False + cluster_env.local_rank.return_value = 0 + cluster_env.main_address = "address" + cluster_env.main_port = 1234 + function = Mock() + launcher = _SubprocessScriptLauncher(cluster_env, num_processes=4, num_nodes=2) + launcher.launch(function) + + # when hydra not available + monkeypatch.setattr(lightning_lite.strategies.launchers.subprocess_script, "_HYDRA_AVAILABLE", False) + simulate_launch() + popen_mock.assert_called_with("basic_command", env=ANY) + popen_mock.reset_mock() + + import hydra + + # when hydra available but not initialized + monkeypatch.setattr(lightning_lite.strategies.launchers.subprocess_script, "_HYDRA_AVAILABLE", True) + HydraConfigMock = Mock() + HydraConfigMock.initialized.return_value = False + monkeypatch.setattr(hydra.core.hydra_config, "HydraConfig", HydraConfigMock) + simulate_launch() + popen_mock.assert_called_with("basic_command", env=ANY) + popen_mock.reset_mock() + + # when hydra available and initialized + monkeypatch.setattr(lightning_lite.strategies.launchers.subprocess_script, "_HYDRA_AVAILABLE", True) + HydraConfigMock = Mock() + HydraConfigMock.initialized.return_value = True + monkeypatch.setattr(hydra.core.hydra_config, "HydraConfig", HydraConfigMock) + simulate_launch() + popen_mock.assert_called_with("hydra_command", env=ANY) + popen_mock.reset_mock() From 21b60c4172cf4e6a08f244f3b645fdc1b5f52a3a Mon Sep 17 00:00:00 2001 From: awaelchli Date: Fri, 30 Sep 2022 03:11:03 +0200 Subject: [PATCH 10/10] remove unused imports --- src/pytorch_lightning/strategies/launchers/subprocess_script.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/pytorch_lightning/strategies/launchers/subprocess_script.py b/src/pytorch_lightning/strategies/launchers/subprocess_script.py index a71a2fdab9741..b5f4d39973ede 100644 --- a/src/pytorch_lightning/strategies/launchers/subprocess_script.py +++ b/src/pytorch_lightning/strategies/launchers/subprocess_script.py @@ -13,11 +13,9 @@ # limitations under the License. import os import subprocess -import sys from time import sleep from typing import Any, Callable, Optional -import __main__ import numpy as np from lightning_utilities.core.imports import RequirementCache