Skip to content

Commit fdcb5cc

Browse files
Hydra changes to lightning-lite (#14950)
Co-authored-by: awaelchli <[email protected]>
1 parent f9ef19f commit fdcb5cc

File tree

3 files changed

+106
-86
lines changed

3 files changed

+106
-86
lines changed

src/lightning_lite/strategies/launchers/subprocess_script.py

Lines changed: 50 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import subprocess
1616
import sys
1717
from time import sleep
18-
from typing import Any, Callable, Optional
18+
from typing import Any, Callable, Sequence
1919

2020
import __main__
2121
import numpy as np
@@ -24,7 +24,7 @@
2424
from lightning_lite.plugins.environments.cluster_environment import ClusterEnvironment
2525
from lightning_lite.strategies.launchers.base import _Launcher
2626

27-
_HYDRA_AVAILABLE = RequirementCache("hydra")
27+
_HYDRA_AVAILABLE = RequirementCache("hydra-core")
2828

2929

3030
class _SubprocessScriptLauncher(_Launcher):
@@ -104,32 +104,6 @@ def _call_children_scripts(self) -> None:
104104
# allow the user to pass the node rank
105105
os.environ["NODE_RANK"] = str(self.cluster_environment.node_rank())
106106
os.environ["LOCAL_RANK"] = str(self.cluster_environment.local_rank())
107-
108-
# Check if the current calling command looked like `python a/b/c.py` or `python -m a.b.c`
109-
# See https://docs.python.org/3/reference/import.html#main-spec
110-
if __main__.__spec__ is None: # pragma: no-cover
111-
# Script called as `python a/b/c.py`
112-
if _HYDRA_AVAILABLE:
113-
# when user is using hydra find the absolute path
114-
from hydra.utils import to_absolute_path
115-
116-
to_abs_path = to_absolute_path
117-
else:
118-
to_abs_path = os.path.abspath
119-
120-
# pull out the commands used to run the script and resolve the absolute file path
121-
command = sys.argv
122-
try:
123-
full_path = to_abs_path(command[0])
124-
except Exception:
125-
full_path = os.path.abspath(command[0])
126-
127-
command[0] = full_path
128-
# use the same python interpreter and actually running
129-
command = [sys.executable] + command
130-
else: # Script called as `python -m a.b.c`
131-
command = [sys.executable, "-m", __main__.__spec__.name] + sys.argv[1:]
132-
133107
os.environ["WORLD_SIZE"] = f"{self.num_processes * self.num_nodes}"
134108

135109
for local_rank in range(1, self.num_processes):
@@ -142,16 +116,16 @@ def _call_children_scripts(self) -> None:
142116

143117
# start process
144118
# if hydra is available and initialized, make sure to set the cwd correctly
145-
cwd: Optional[str] = None
119+
hydra_in_use = False
146120
if _HYDRA_AVAILABLE:
147121
from hydra.core.hydra_config import HydraConfig
148-
from hydra.utils import get_original_cwd
149122

150-
if HydraConfig.initialized():
151-
cwd = get_original_cwd()
152-
os_cwd = f'"{os.getcwd()}"'
153-
command += [f"hydra.run.dir={os_cwd}", f"hydra.job.name=train_ddp_process_{local_rank}"]
154-
subprocess.Popen(command, env=env_copy, cwd=cwd)
123+
hydra_in_use = HydraConfig.initialized()
124+
if hydra_in_use:
125+
command = _hydra_subprocess_cmd(local_rank=local_rank)
126+
else:
127+
command = _basic_subprocess_cmd()
128+
subprocess.Popen(command, env=env_copy)
155129

156130
# starting all processes at once can cause issues
157131
# with dataloaders delay between 1-10 seconds
@@ -165,3 +139,44 @@ def _check_can_spawn_children(self) -> None:
165139
" Possible reasons: 1) LOCAL_RANK environment variable was incorrectly modified by the user,"
166140
" 2) `ClusterEnvironment.creates_processes_externally` incorrectly implemented."
167141
)
142+
143+
144+
def _basic_subprocess_cmd() -> Sequence[str]:
145+
if __main__.__spec__ is None: # pragma: no-cover
146+
return [sys.executable, os.path.abspath(sys.argv[0])] + sys.argv[1:]
147+
else:
148+
return [sys.executable, "-m", __main__.__spec__.name] + sys.argv[1:]
149+
150+
151+
def _hydra_subprocess_cmd(local_rank: int) -> Sequence[str]:
152+
from hydra.core.hydra_config import HydraConfig
153+
from hydra.utils import to_absolute_path
154+
155+
# when user is using hydra find the absolute path
156+
if __main__.__spec__ is None: # pragma: no-cover
157+
command = [sys.executable, to_absolute_path(sys.argv[0])]
158+
else:
159+
command = [sys.executable, "-m", __main__.__spec__.name]
160+
161+
# extract the hydra configuration
162+
hydra_cfg = HydraConfig.get()
163+
164+
# the location of the hydra configuration files saved for the current job
165+
hydra_output = hydra_cfg.runtime.output_dir
166+
if hydra_cfg.output_subdir is not None:
167+
hydra_output = os.path.join(hydra_output, hydra_cfg.output_subdir)
168+
169+
# check if experimental re-run capability exists
170+
# otherwise use existing config.yaml which may have issues
171+
pickled_config = os.path.join(hydra_output, "config.pickle")
172+
if os.path.exists(pickled_config):
173+
command += ["--experimental-rerun", pickled_config]
174+
175+
else:
176+
command += ["-cp", hydra_output, "-cn", "config.yaml"]
177+
command += [
178+
f"hydra.output_subdir=.pl_ddp_hydra_{local_rank}",
179+
f"hydra.run.dir={hydra_cfg.runtime.output_dir}",
180+
]
181+
182+
return command

src/pytorch_lightning/strategies/launchers/subprocess_script.py

Lines changed: 7 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,16 @@
1313
# limitations under the License.
1414
import os
1515
import subprocess
16-
import sys
1716
from time import sleep
18-
from typing import Any, Callable, Optional, Sequence
17+
from typing import Any, Callable, Optional
1918

20-
import __main__
2119
import numpy as np
2220
from lightning_utilities.core.imports import RequirementCache
2321

2422
import pytorch_lightning as pl
2523
from lightning_lite.plugins import ClusterEnvironment
2624
from lightning_lite.strategies.launchers.base import _Launcher
25+
from lightning_lite.strategies.launchers.subprocess_script import _basic_subprocess_cmd, _hydra_subprocess_cmd
2726

2827
_HYDRA_AVAILABLE = RequirementCache("hydra-core")
2928

@@ -66,16 +65,16 @@ class _SubprocessScriptLauncher(_Launcher):
6665
num_nodes: The total number of nodes that participate in this process group.
6766
"""
6867

69-
@property
70-
def is_interactive_compatible(self) -> bool:
71-
return False
72-
7368
def __init__(self, cluster_environment: ClusterEnvironment, num_processes: int, num_nodes: int) -> None:
7469
super().__init__()
7570
self.cluster_environment = cluster_environment
7671
self.num_processes = num_processes
7772
self.num_nodes = num_nodes
7873

74+
@property
75+
def is_interactive_compatible(self) -> bool:
76+
return False
77+
7978
def launch(self, function: Callable, *args: Any, trainer: Optional["pl.Trainer"] = None, **kwargs: Any) -> Any:
8079
"""Creates new processes, then calls the given function.
8180
@@ -120,7 +119,7 @@ def _call_children_scripts(self) -> None:
120119
if hydra_in_use:
121120
command = _hydra_subprocess_cmd(local_rank)
122121
else:
123-
command = _basic_subprocess_cmd(local_rank)
122+
command = _basic_subprocess_cmd()
124123

125124
subprocess.Popen(command, env=env_copy)
126125

@@ -136,44 +135,3 @@ def _check_can_spawn_children(self) -> None:
136135
" Possible reasons: 1) LOCAL_RANK environment variable was incorrectly modified by the user,"
137136
" 2) `ClusterEnvironment.creates_processes_externally` incorrectly implemented."
138137
)
139-
140-
141-
def _basic_subprocess_cmd(local_rank: int) -> Sequence[str]:
142-
if __main__.__spec__ is None: # pragma: no-cover
143-
return [sys.executable, os.path.abspath(sys.argv[0])] + sys.argv[1:]
144-
else:
145-
return [sys.executable, "-m", __main__.__spec__.name] + sys.argv[1:]
146-
147-
148-
def _hydra_subprocess_cmd(local_rank: int) -> Sequence[str]:
149-
from hydra.core.hydra_config import HydraConfig
150-
from hydra.utils import to_absolute_path
151-
152-
# when user is using hydra find the absolute path
153-
if __main__.__spec__ is None: # pragma: no-cover
154-
command = [sys.executable, to_absolute_path(sys.argv[0])]
155-
else:
156-
command = [sys.executable, "-m", __main__.__spec__.name]
157-
158-
# extract the hydra configu
159-
hydra_cfg = HydraConfig.get()
160-
161-
# the location of the hydra configuration files saved for the current job
162-
hydra_output = hydra_cfg.runtime.output_dir
163-
if hydra_cfg.output_subdir is not None:
164-
hydra_output = os.path.join(hydra_output, hydra_cfg.output_subdir)
165-
166-
# check if experimental re-run capability exists
167-
# otherwise use existing config.yaml which may have issues
168-
pickled_config = os.path.join(hydra_output, "config.pickle")
169-
if os.path.exists(pickled_config):
170-
command += ["--experimental-rerun", pickled_config]
171-
172-
else:
173-
command += ["-cp", hydra_output, "-cn", "config.yaml"]
174-
command += [
175-
f"hydra.output_subdir=.pl_ddp_hydra_{local_rank}",
176-
f"hydra.run.dir={hydra_cfg.runtime.output_dir}",
177-
]
178-
179-
return command

tests/tests_lite/strategies/launchers/test_subprocess_script.py

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,12 @@
1313
# limitations under the License.
1414
import os
1515
from unittest import mock
16-
from unittest.mock import Mock
16+
from unittest.mock import ANY, Mock
1717

1818
import pytest
1919

20-
from lightning_lite.strategies.launchers.subprocess_script import _SubprocessScriptLauncher
20+
import lightning_lite
21+
from lightning_lite.strategies.launchers.subprocess_script import _HYDRA_AVAILABLE, _SubprocessScriptLauncher
2122

2223

2324
def test_subprocess_script_launcher_interactive_compatible():
@@ -76,3 +77,49 @@ def test_subprocess_script_launcher_launch_processes(popen_mock, _):
7677
# the current process
7778
assert int(os.environ["WORLD_SIZE"]) == launcher.num_processes * launcher.num_nodes
7879
assert int(os.environ["LOCAL_RANK"]) == 0
80+
81+
82+
@pytest.mark.skipif(not _HYDRA_AVAILABLE, reason="hydra-core is required")
83+
@mock.patch("lightning_lite.strategies.launchers.subprocess_script.sleep")
84+
@mock.patch("lightning_lite.strategies.launchers.subprocess_script.subprocess.Popen")
85+
def test_subprocess_script_launcher_hydra_in_use(popen_mock, _, monkeypatch):
86+
basic_command = Mock(return_value="basic_command")
87+
hydra_command = Mock(return_value="hydra_command")
88+
monkeypatch.setattr(lightning_lite.strategies.launchers.subprocess_script, "_basic_subprocess_cmd", basic_command)
89+
monkeypatch.setattr(lightning_lite.strategies.launchers.subprocess_script, "_hydra_subprocess_cmd", hydra_command)
90+
91+
def simulate_launch():
92+
cluster_env = Mock()
93+
cluster_env.creates_processes_externally = False
94+
cluster_env.local_rank.return_value = 0
95+
cluster_env.main_address = "address"
96+
cluster_env.main_port = 1234
97+
function = Mock()
98+
launcher = _SubprocessScriptLauncher(cluster_env, num_processes=4, num_nodes=2)
99+
launcher.launch(function)
100+
101+
# when hydra not available
102+
monkeypatch.setattr(lightning_lite.strategies.launchers.subprocess_script, "_HYDRA_AVAILABLE", False)
103+
simulate_launch()
104+
popen_mock.assert_called_with("basic_command", env=ANY)
105+
popen_mock.reset_mock()
106+
107+
import hydra
108+
109+
# when hydra available but not initialized
110+
monkeypatch.setattr(lightning_lite.strategies.launchers.subprocess_script, "_HYDRA_AVAILABLE", True)
111+
HydraConfigMock = Mock()
112+
HydraConfigMock.initialized.return_value = False
113+
monkeypatch.setattr(hydra.core.hydra_config, "HydraConfig", HydraConfigMock)
114+
simulate_launch()
115+
popen_mock.assert_called_with("basic_command", env=ANY)
116+
popen_mock.reset_mock()
117+
118+
# when hydra available and initialized
119+
monkeypatch.setattr(lightning_lite.strategies.launchers.subprocess_script, "_HYDRA_AVAILABLE", True)
120+
HydraConfigMock = Mock()
121+
HydraConfigMock.initialized.return_value = True
122+
monkeypatch.setattr(hydra.core.hydra_config, "HydraConfig", HydraConfigMock)
123+
simulate_launch()
124+
popen_mock.assert_called_with("hydra_command", env=ANY)
125+
popen_mock.reset_mock()

0 commit comments

Comments
 (0)