diff --git a/src/lightning_lite/strategies/launchers/subprocess_script.py b/src/lightning_lite/strategies/launchers/subprocess_script.py index b2da01835d9ec..c28946faa9ab6 100644 --- a/src/lightning_lite/strategies/launchers/subprocess_script.py +++ b/src/lightning_lite/strategies/launchers/subprocess_script.py @@ -15,7 +15,7 @@ import subprocess import sys from time import sleep -from typing import Any, Callable, Optional +from typing import Any, Callable, Sequence import __main__ import numpy as np @@ -24,7 +24,7 @@ from lightning_lite.plugins.environments.cluster_environment import ClusterEnvironment from lightning_lite.strategies.launchers.base import _Launcher -_HYDRA_AVAILABLE = RequirementCache("hydra") +_HYDRA_AVAILABLE = RequirementCache("hydra-core") class _SubprocessScriptLauncher(_Launcher): @@ -104,32 +104,6 @@ def _call_children_scripts(self) -> None: # allow the user to pass the node rank os.environ["NODE_RANK"] = str(self.cluster_environment.node_rank()) os.environ["LOCAL_RANK"] = str(self.cluster_environment.local_rank()) - - # Check if the current calling command looked like `python a/b/c.py` or `python -m a.b.c` - # See https://docs.python.org/3/reference/import.html#main-spec - if __main__.__spec__ is None: # pragma: no-cover - # Script called as `python a/b/c.py` - if _HYDRA_AVAILABLE: - # when user is using hydra find the absolute path - from hydra.utils import to_absolute_path - - to_abs_path = to_absolute_path - else: - to_abs_path = os.path.abspath - - # pull out the commands used to run the script and resolve the absolute file path - command = sys.argv - try: - full_path = to_abs_path(command[0]) - except Exception: - full_path = os.path.abspath(command[0]) - - command[0] = full_path - # use the same python interpreter and actually running - command = [sys.executable] + command - else: # Script called as `python -m a.b.c` - command = [sys.executable, "-m", __main__.__spec__.name] + sys.argv[1:] - os.environ["WORLD_SIZE"] = f"{self.num_processes * self.num_nodes}" for local_rank in range(1, self.num_processes): @@ -142,16 +116,16 @@ def _call_children_scripts(self) -> None: # start process # if hydra is available and initialized, make sure to set the cwd correctly - cwd: Optional[str] = None + hydra_in_use = False if _HYDRA_AVAILABLE: from hydra.core.hydra_config import HydraConfig - from hydra.utils import get_original_cwd - if HydraConfig.initialized(): - cwd = get_original_cwd() - os_cwd = f'"{os.getcwd()}"' - command += [f"hydra.run.dir={os_cwd}", f"hydra.job.name=train_ddp_process_{local_rank}"] - subprocess.Popen(command, env=env_copy, cwd=cwd) + hydra_in_use = HydraConfig.initialized() + if hydra_in_use: + command = _hydra_subprocess_cmd(local_rank=local_rank) + else: + command = _basic_subprocess_cmd() + subprocess.Popen(command, env=env_copy) # starting all processes at once can cause issues # with dataloaders delay between 1-10 seconds @@ -165,3 +139,44 @@ def _check_can_spawn_children(self) -> None: " Possible reasons: 1) LOCAL_RANK environment variable was incorrectly modified by the user," " 2) `ClusterEnvironment.creates_processes_externally` incorrectly implemented." ) + + +def _basic_subprocess_cmd() -> Sequence[str]: + if __main__.__spec__ is None: # pragma: no-cover + return [sys.executable, os.path.abspath(sys.argv[0])] + sys.argv[1:] + else: + return [sys.executable, "-m", __main__.__spec__.name] + sys.argv[1:] + + +def _hydra_subprocess_cmd(local_rank: int) -> Sequence[str]: + from hydra.core.hydra_config import HydraConfig + from hydra.utils import to_absolute_path + + # when user is using hydra find the absolute path + if __main__.__spec__ is None: # pragma: no-cover + command = [sys.executable, to_absolute_path(sys.argv[0])] + else: + command = [sys.executable, "-m", __main__.__spec__.name] + + # extract the hydra configuration + hydra_cfg = HydraConfig.get() + + # the location of the hydra configuration files saved for the current job + hydra_output = hydra_cfg.runtime.output_dir + if hydra_cfg.output_subdir is not None: + hydra_output = os.path.join(hydra_output, hydra_cfg.output_subdir) + + # check if experimental re-run capability exists + # otherwise use existing config.yaml which may have issues + pickled_config = os.path.join(hydra_output, "config.pickle") + if os.path.exists(pickled_config): + command += ["--experimental-rerun", pickled_config] + + else: + command += ["-cp", hydra_output, "-cn", "config.yaml"] + command += [ + f"hydra.output_subdir=.pl_ddp_hydra_{local_rank}", + f"hydra.run.dir={hydra_cfg.runtime.output_dir}", + ] + + return command diff --git a/src/pytorch_lightning/strategies/launchers/subprocess_script.py b/src/pytorch_lightning/strategies/launchers/subprocess_script.py index 62694f1e3575f..b5f4d39973ede 100644 --- a/src/pytorch_lightning/strategies/launchers/subprocess_script.py +++ b/src/pytorch_lightning/strategies/launchers/subprocess_script.py @@ -13,17 +13,16 @@ # limitations under the License. import os import subprocess -import sys from time import sleep -from typing import Any, Callable, Optional, Sequence +from typing import Any, Callable, Optional -import __main__ import numpy as np from lightning_utilities.core.imports import RequirementCache import pytorch_lightning as pl from lightning_lite.plugins import ClusterEnvironment from lightning_lite.strategies.launchers.base import _Launcher +from lightning_lite.strategies.launchers.subprocess_script import _basic_subprocess_cmd, _hydra_subprocess_cmd _HYDRA_AVAILABLE = RequirementCache("hydra-core") @@ -66,16 +65,16 @@ class _SubprocessScriptLauncher(_Launcher): num_nodes: The total number of nodes that participate in this process group. """ - @property - def is_interactive_compatible(self) -> bool: - return False - def __init__(self, cluster_environment: ClusterEnvironment, num_processes: int, num_nodes: int) -> None: super().__init__() self.cluster_environment = cluster_environment self.num_processes = num_processes self.num_nodes = num_nodes + @property + def is_interactive_compatible(self) -> bool: + return False + def launch(self, function: Callable, *args: Any, trainer: Optional["pl.Trainer"] = None, **kwargs: Any) -> Any: """Creates new processes, then calls the given function. @@ -120,7 +119,7 @@ def _call_children_scripts(self) -> None: if hydra_in_use: command = _hydra_subprocess_cmd(local_rank) else: - command = _basic_subprocess_cmd(local_rank) + command = _basic_subprocess_cmd() subprocess.Popen(command, env=env_copy) @@ -136,44 +135,3 @@ def _check_can_spawn_children(self) -> None: " Possible reasons: 1) LOCAL_RANK environment variable was incorrectly modified by the user," " 2) `ClusterEnvironment.creates_processes_externally` incorrectly implemented." ) - - -def _basic_subprocess_cmd(local_rank: int) -> Sequence[str]: - if __main__.__spec__ is None: # pragma: no-cover - return [sys.executable, os.path.abspath(sys.argv[0])] + sys.argv[1:] - else: - return [sys.executable, "-m", __main__.__spec__.name] + sys.argv[1:] - - -def _hydra_subprocess_cmd(local_rank: int) -> Sequence[str]: - from hydra.core.hydra_config import HydraConfig - from hydra.utils import to_absolute_path - - # when user is using hydra find the absolute path - if __main__.__spec__ is None: # pragma: no-cover - command = [sys.executable, to_absolute_path(sys.argv[0])] - else: - command = [sys.executable, "-m", __main__.__spec__.name] - - # extract the hydra configu - hydra_cfg = HydraConfig.get() - - # the location of the hydra configuration files saved for the current job - hydra_output = hydra_cfg.runtime.output_dir - if hydra_cfg.output_subdir is not None: - hydra_output = os.path.join(hydra_output, hydra_cfg.output_subdir) - - # check if experimental re-run capability exists - # otherwise use existing config.yaml which may have issues - pickled_config = os.path.join(hydra_output, "config.pickle") - if os.path.exists(pickled_config): - command += ["--experimental-rerun", pickled_config] - - else: - command += ["-cp", hydra_output, "-cn", "config.yaml"] - command += [ - f"hydra.output_subdir=.pl_ddp_hydra_{local_rank}", - f"hydra.run.dir={hydra_cfg.runtime.output_dir}", - ] - - return command diff --git a/tests/tests_lite/strategies/launchers/test_subprocess_script.py b/tests/tests_lite/strategies/launchers/test_subprocess_script.py index c9af07343b454..1b2e4360e2432 100644 --- a/tests/tests_lite/strategies/launchers/test_subprocess_script.py +++ b/tests/tests_lite/strategies/launchers/test_subprocess_script.py @@ -13,11 +13,12 @@ # limitations under the License. import os from unittest import mock -from unittest.mock import Mock +from unittest.mock import ANY, Mock import pytest -from lightning_lite.strategies.launchers.subprocess_script import _SubprocessScriptLauncher +import lightning_lite +from lightning_lite.strategies.launchers.subprocess_script import _HYDRA_AVAILABLE, _SubprocessScriptLauncher def test_subprocess_script_launcher_interactive_compatible(): @@ -76,3 +77,49 @@ def test_subprocess_script_launcher_launch_processes(popen_mock, _): # the current process assert int(os.environ["WORLD_SIZE"]) == launcher.num_processes * launcher.num_nodes assert int(os.environ["LOCAL_RANK"]) == 0 + + +@pytest.mark.skipif(not _HYDRA_AVAILABLE, reason="hydra-core is required") +@mock.patch("lightning_lite.strategies.launchers.subprocess_script.sleep") +@mock.patch("lightning_lite.strategies.launchers.subprocess_script.subprocess.Popen") +def test_subprocess_script_launcher_hydra_in_use(popen_mock, _, monkeypatch): + basic_command = Mock(return_value="basic_command") + hydra_command = Mock(return_value="hydra_command") + monkeypatch.setattr(lightning_lite.strategies.launchers.subprocess_script, "_basic_subprocess_cmd", basic_command) + monkeypatch.setattr(lightning_lite.strategies.launchers.subprocess_script, "_hydra_subprocess_cmd", hydra_command) + + def simulate_launch(): + cluster_env = Mock() + cluster_env.creates_processes_externally = False + cluster_env.local_rank.return_value = 0 + cluster_env.main_address = "address" + cluster_env.main_port = 1234 + function = Mock() + launcher = _SubprocessScriptLauncher(cluster_env, num_processes=4, num_nodes=2) + launcher.launch(function) + + # when hydra not available + monkeypatch.setattr(lightning_lite.strategies.launchers.subprocess_script, "_HYDRA_AVAILABLE", False) + simulate_launch() + popen_mock.assert_called_with("basic_command", env=ANY) + popen_mock.reset_mock() + + import hydra + + # when hydra available but not initialized + monkeypatch.setattr(lightning_lite.strategies.launchers.subprocess_script, "_HYDRA_AVAILABLE", True) + HydraConfigMock = Mock() + HydraConfigMock.initialized.return_value = False + monkeypatch.setattr(hydra.core.hydra_config, "HydraConfig", HydraConfigMock) + simulate_launch() + popen_mock.assert_called_with("basic_command", env=ANY) + popen_mock.reset_mock() + + # when hydra available and initialized + monkeypatch.setattr(lightning_lite.strategies.launchers.subprocess_script, "_HYDRA_AVAILABLE", True) + HydraConfigMock = Mock() + HydraConfigMock.initialized.return_value = True + monkeypatch.setattr(hydra.core.hydra_config, "HydraConfig", HydraConfigMock) + simulate_launch() + popen_mock.assert_called_with("hydra_command", env=ANY) + popen_mock.reset_mock()