Skip to content
85 changes: 50 additions & 35 deletions src/lightning_lite/strategies/launchers/subprocess_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import subprocess
import sys
from time import sleep
from typing import Any, Callable, Optional
from typing import Any, Callable, Sequence

import __main__
import numpy as np
Expand All @@ -24,7 +24,7 @@
from lightning_lite.plugins.environments.cluster_environment import ClusterEnvironment
from lightning_lite.strategies.launchers.base import _Launcher

_HYDRA_AVAILABLE = RequirementCache("hydra")
_HYDRA_AVAILABLE = RequirementCache("hydra-core")


class _SubprocessScriptLauncher(_Launcher):
Expand Down Expand Up @@ -104,32 +104,6 @@ def _call_children_scripts(self) -> None:
# allow the user to pass the node rank
os.environ["NODE_RANK"] = str(self.cluster_environment.node_rank())
os.environ["LOCAL_RANK"] = str(self.cluster_environment.local_rank())

# Check if the current calling command looked like `python a/b/c.py` or `python -m a.b.c`
# See https://docs.python.org/3/reference/import.html#main-spec
if __main__.__spec__ is None: # pragma: no-cover
# Script called as `python a/b/c.py`
if _HYDRA_AVAILABLE:
# when user is using hydra find the absolute path
from hydra.utils import to_absolute_path

to_abs_path = to_absolute_path
else:
to_abs_path = os.path.abspath

# pull out the commands used to run the script and resolve the absolute file path
command = sys.argv
try:
full_path = to_abs_path(command[0])
except Exception:
full_path = os.path.abspath(command[0])

command[0] = full_path
# use the same python interpreter and actually running
command = [sys.executable] + command
else: # Script called as `python -m a.b.c`
command = [sys.executable, "-m", __main__.__spec__.name] + sys.argv[1:]

os.environ["WORLD_SIZE"] = f"{self.num_processes * self.num_nodes}"

for local_rank in range(1, self.num_processes):
Expand All @@ -142,16 +116,16 @@ def _call_children_scripts(self) -> None:

# start process
# if hydra is available and initialized, make sure to set the cwd correctly
cwd: Optional[str] = None
hydra_in_use = False
if _HYDRA_AVAILABLE:
from hydra.core.hydra_config import HydraConfig
from hydra.utils import get_original_cwd

if HydraConfig.initialized():
cwd = get_original_cwd()
os_cwd = f'"{os.getcwd()}"'
command += [f"hydra.run.dir={os_cwd}", f"hydra.job.name=train_ddp_process_{local_rank}"]
subprocess.Popen(command, env=env_copy, cwd=cwd)
hydra_in_use = HydraConfig.initialized()
if hydra_in_use:
command = _hydra_subprocess_cmd(local_rank=local_rank)
else:
command = _basic_subprocess_cmd()
subprocess.Popen(command, env=env_copy)

# starting all processes at once can cause issues
# with dataloaders delay between 1-10 seconds
Expand All @@ -165,3 +139,44 @@ def _check_can_spawn_children(self) -> None:
" Possible reasons: 1) LOCAL_RANK environment variable was incorrectly modified by the user,"
" 2) `ClusterEnvironment.creates_processes_externally` incorrectly implemented."
)


def _basic_subprocess_cmd() -> Sequence[str]:
if __main__.__spec__ is None: # pragma: no-cover
return [sys.executable, os.path.abspath(sys.argv[0])] + sys.argv[1:]
else:
return [sys.executable, "-m", __main__.__spec__.name] + sys.argv[1:]


def _hydra_subprocess_cmd(local_rank: int) -> Sequence[str]:
from hydra.core.hydra_config import HydraConfig
from hydra.utils import to_absolute_path

# when user is using hydra find the absolute path
if __main__.__spec__ is None: # pragma: no-cover
command = [sys.executable, to_absolute_path(sys.argv[0])]
else:
command = [sys.executable, "-m", __main__.__spec__.name]

# extract the hydra configuration
hydra_cfg = HydraConfig.get()

# the location of the hydra configuration files saved for the current job
hydra_output = hydra_cfg.runtime.output_dir
if hydra_cfg.output_subdir is not None:
hydra_output = os.path.join(hydra_output, hydra_cfg.output_subdir)

# check if experimental re-run capability exists
# otherwise use existing config.yaml which may have issues
pickled_config = os.path.join(hydra_output, "config.pickle")
if os.path.exists(pickled_config):
command += ["--experimental-rerun", pickled_config]

else:
command += ["-cp", hydra_output, "-cn", "config.yaml"]
command += [
f"hydra.output_subdir=.pl_ddp_hydra_{local_rank}",
f"hydra.run.dir={hydra_cfg.runtime.output_dir}",
]

return command
56 changes: 7 additions & 49 deletions src/pytorch_lightning/strategies/launchers/subprocess_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,16 @@
# limitations under the License.
import os
import subprocess
import sys
from time import sleep
from typing import Any, Callable, Optional, Sequence
from typing import Any, Callable, Optional

import __main__
import numpy as np
from lightning_utilities.core.imports import RequirementCache

import pytorch_lightning as pl
from lightning_lite.plugins import ClusterEnvironment
from lightning_lite.strategies.launchers.base import _Launcher
from lightning_lite.strategies.launchers.subprocess_script import _basic_subprocess_cmd, _hydra_subprocess_cmd

_HYDRA_AVAILABLE = RequirementCache("hydra-core")

Expand Down Expand Up @@ -66,16 +65,16 @@ class _SubprocessScriptLauncher(_Launcher):
num_nodes: The total number of nodes that participate in this process group.
"""

@property
def is_interactive_compatible(self) -> bool:
return False

def __init__(self, cluster_environment: ClusterEnvironment, num_processes: int, num_nodes: int) -> None:
super().__init__()
self.cluster_environment = cluster_environment
self.num_processes = num_processes
self.num_nodes = num_nodes

@property
def is_interactive_compatible(self) -> bool:
return False

def launch(self, function: Callable, *args: Any, trainer: Optional["pl.Trainer"] = None, **kwargs: Any) -> Any:
"""Creates new processes, then calls the given function.

Expand Down Expand Up @@ -120,7 +119,7 @@ def _call_children_scripts(self) -> None:
if hydra_in_use:
command = _hydra_subprocess_cmd(local_rank)
else:
command = _basic_subprocess_cmd(local_rank)
command = _basic_subprocess_cmd()

subprocess.Popen(command, env=env_copy)

Expand All @@ -136,44 +135,3 @@ def _check_can_spawn_children(self) -> None:
" Possible reasons: 1) LOCAL_RANK environment variable was incorrectly modified by the user,"
" 2) `ClusterEnvironment.creates_processes_externally` incorrectly implemented."
)


def _basic_subprocess_cmd(local_rank: int) -> Sequence[str]:
if __main__.__spec__ is None: # pragma: no-cover
return [sys.executable, os.path.abspath(sys.argv[0])] + sys.argv[1:]
else:
return [sys.executable, "-m", __main__.__spec__.name] + sys.argv[1:]


def _hydra_subprocess_cmd(local_rank: int) -> Sequence[str]:
from hydra.core.hydra_config import HydraConfig
from hydra.utils import to_absolute_path

# when user is using hydra find the absolute path
if __main__.__spec__ is None: # pragma: no-cover
command = [sys.executable, to_absolute_path(sys.argv[0])]
else:
command = [sys.executable, "-m", __main__.__spec__.name]

# extract the hydra configu
hydra_cfg = HydraConfig.get()

# the location of the hydra configuration files saved for the current job
hydra_output = hydra_cfg.runtime.output_dir
if hydra_cfg.output_subdir is not None:
hydra_output = os.path.join(hydra_output, hydra_cfg.output_subdir)

# check if experimental re-run capability exists
# otherwise use existing config.yaml which may have issues
pickled_config = os.path.join(hydra_output, "config.pickle")
if os.path.exists(pickled_config):
command += ["--experimental-rerun", pickled_config]

else:
command += ["-cp", hydra_output, "-cn", "config.yaml"]
command += [
f"hydra.output_subdir=.pl_ddp_hydra_{local_rank}",
f"hydra.run.dir={hydra_cfg.runtime.output_dir}",
]

return command
51 changes: 49 additions & 2 deletions tests/tests_lite/strategies/launchers/test_subprocess_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@
# limitations under the License.
import os
from unittest import mock
from unittest.mock import Mock
from unittest.mock import ANY, Mock

import pytest

from lightning_lite.strategies.launchers.subprocess_script import _SubprocessScriptLauncher
import lightning_lite
from lightning_lite.strategies.launchers.subprocess_script import _HYDRA_AVAILABLE, _SubprocessScriptLauncher


def test_subprocess_script_launcher_interactive_compatible():
Expand Down Expand Up @@ -76,3 +77,49 @@ def test_subprocess_script_launcher_launch_processes(popen_mock, _):
# the current process
assert int(os.environ["WORLD_SIZE"]) == launcher.num_processes * launcher.num_nodes
assert int(os.environ["LOCAL_RANK"]) == 0


@pytest.mark.skipif(not _HYDRA_AVAILABLE, reason="hydra-core is required")
@mock.patch("lightning_lite.strategies.launchers.subprocess_script.sleep")
@mock.patch("lightning_lite.strategies.launchers.subprocess_script.subprocess.Popen")
def test_subprocess_script_launcher_hydra_in_use(popen_mock, _, monkeypatch):
basic_command = Mock(return_value="basic_command")
hydra_command = Mock(return_value="hydra_command")
monkeypatch.setattr(lightning_lite.strategies.launchers.subprocess_script, "_basic_subprocess_cmd", basic_command)
monkeypatch.setattr(lightning_lite.strategies.launchers.subprocess_script, "_hydra_subprocess_cmd", hydra_command)

def simulate_launch():
cluster_env = Mock()
cluster_env.creates_processes_externally = False
cluster_env.local_rank.return_value = 0
cluster_env.main_address = "address"
cluster_env.main_port = 1234
function = Mock()
launcher = _SubprocessScriptLauncher(cluster_env, num_processes=4, num_nodes=2)
launcher.launch(function)

# when hydra not available
monkeypatch.setattr(lightning_lite.strategies.launchers.subprocess_script, "_HYDRA_AVAILABLE", False)
simulate_launch()
popen_mock.assert_called_with("basic_command", env=ANY)
popen_mock.reset_mock()

import hydra

# when hydra available but not initialized
monkeypatch.setattr(lightning_lite.strategies.launchers.subprocess_script, "_HYDRA_AVAILABLE", True)
HydraConfigMock = Mock()
HydraConfigMock.initialized.return_value = False
monkeypatch.setattr(hydra.core.hydra_config, "HydraConfig", HydraConfigMock)
simulate_launch()
popen_mock.assert_called_with("basic_command", env=ANY)
popen_mock.reset_mock()

# when hydra available and initialized
monkeypatch.setattr(lightning_lite.strategies.launchers.subprocess_script, "_HYDRA_AVAILABLE", True)
HydraConfigMock = Mock()
HydraConfigMock.initialized.return_value = True
monkeypatch.setattr(hydra.core.hydra_config, "HydraConfig", HydraConfigMock)
simulate_launch()
popen_mock.assert_called_with("hydra_command", env=ANY)
popen_mock.reset_mock()