From 64e66a600e2d467d05b286a949b5c835c851bb7d Mon Sep 17 00:00:00 2001 From: Justin Goodwin Date: Mon, 24 Jan 2022 23:06:17 -0500 Subject: [PATCH 01/34] adds hydra test --- tests/strategies/test_ddp_hydra_support.py | 126 +++++++++++++++++++++ 1 file changed, 126 insertions(+) create mode 100644 tests/strategies/test_ddp_hydra_support.py diff --git a/tests/strategies/test_ddp_hydra_support.py b/tests/strategies/test_ddp_hydra_support.py new file mode 100644 index 0000000000000..5a4af172d983b --- /dev/null +++ b/tests/strategies/test_ddp_hydra_support.py @@ -0,0 +1,126 @@ +import logging +import os +import subprocess +import sys +import tempfile +from pathlib import Path + +import pytest + +from pytorch_lightning.utilities.imports import _HYDRA_AVAILABLE +from tests.helpers.runif import RunIf + + +# fixture to run in a clean temporary directory +@pytest.fixture() +def cleandir(): + """Run function in a temporary directory.""" + with tempfile.TemporaryDirectory() as tmpdirname: + old_dir = os.getcwd() # get current working directory (cwd) + os.chdir(tmpdirname) # change cwd to the temp-directory + yield tmpdirname # yields control to the test to be run + os.chdir(old_dir) + logging.shutdown() + + +# function to run a command line argument +def run_process(cmd): + try: + process = subprocess.Popen( + args=cmd, + shell=False, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + stdout, stderr = process.communicate() + if process.returncode != 0: + sys.stderr.write(f"Subprocess error:\n{stderr}\n") + raise subprocess.CalledProcessError(returncode=process.returncode, cmd=cmd) + return stdout, stderr + except Exception as e: + cmd = " ".join(cmd) + sys.stderr.write(f"Error executing:\n{cmd}\n") + raise e + + +# Script to run from command line +script = """ +import os + +import hydra +import torch +from torch import distributed as dist + +from pytorch_lightning import Trainer +from pytorch_lightning.strategies import DDPStrategy +from tests.helpers.boring_model import BoringModel + + +class BoringModelGPU(BoringModel): + def on_train_start(self) -> None: + # make sure that the model is on GPU when training + assert self.device == torch.device(f"cuda:{self.trainer.strategy.local_rank}") + self.start_cuda_memory = torch.cuda.memory_allocated() + + +@hydra.main() +def task_fn(cfg): + trainer = Trainer(gpus=cfg.gpus, strategy=cfg.strategy, fast_dev_run=True) + model = BoringModelGPU() + trainer.fit(model) + + # Need to do this in addition to Lightning shutting down the + # distributed processes in order to run a multirun loop with hydra + if dist.is_initialized(): + dist.destroy_process_group() + + envs = ( + "LOCAL_RANK", + "NODE_RANK", + "WORLD_SIZE", + "MASTER_ADDR", + "MASTER_PORT", + "PL_GLOBAL_SEED", + "PL_SEED_WORKERS", + ) + + for name in envs: + os.environ.pop(name, None) + + +if __name__ == "__main__": + task_fn() +""" + + +@RunIf(skip_windows=True, min_gpus=2) +@pytest.mark.skipif(not _HYDRA_AVAILABLE, reason="Hydra not Available") +@pytest.mark.usefixtures("cleandir") +@pytest.mark.parametrize("gpus", [1, 2]) +def test_ddp_with_hydra_runjob(gpus): + with open("temp.py", "w") as fn: + fn.write(script) + + run_process([sys.executable, "temp.py", f"+gpus={gpus}", '+strategy="ddp"']) + logs = sorted(Path.cwd().glob("**/config.yaml")) + assert len(logs) == 1 + + +@RunIf(skip_windows=True, min_gpus=2) +@pytest.mark.skipif(not _HYDRA_AVAILABLE, reason="Hydra not Available") +@pytest.mark.usefixtures("cleandir") +@pytest.mark.parametrize("gpus", [1, 2]) +@pytest.mark.parametrize("num_jobs", [1, 2]) +def test_ddp_with_hydra_multirunjob(gpus, num_jobs): + with open("temp.py", "w") as fn: + fn.write(script) + + fake_param = "+foo=" + for i in range(num_jobs): + fake_param += f"{i}" + if i < num_jobs - 1: + fake_param += "," + run_process([sys.executable, "temp.py", f"+gpus={gpus}", '+strategy="ddp"', fake_param, "--multirun"]) + + jobs = sorted(Path.cwd().glob("**/config.yaml")) + assert len(jobs) == num_jobs From ff7d7397e49a1117014d0c5fed7e153f78c29265 Mon Sep 17 00:00:00 2001 From: Justin Goodwin Date: Tue, 25 Jan 2022 15:47:44 -0500 Subject: [PATCH 02/34] update tests --- tests/strategies/test_ddp_hydra_support.py | 39 +++++++++++++++++++--- 1 file changed, 35 insertions(+), 4 deletions(-) diff --git a/tests/strategies/test_ddp_hydra_support.py b/tests/strategies/test_ddp_hydra_support.py index 5a4af172d983b..a84499274647d 100644 --- a/tests/strategies/test_ddp_hydra_support.py +++ b/tests/strategies/test_ddp_hydra_support.py @@ -10,6 +10,9 @@ from pytorch_lightning.utilities.imports import _HYDRA_AVAILABLE from tests.helpers.runif import RunIf +if _HYDRA_AVAILABLE: + from omegaconf import OmegaConf + # fixture to run in a clean temporary directory @pytest.fixture() @@ -97,14 +100,34 @@ def task_fn(cfg): @pytest.mark.skipif(not _HYDRA_AVAILABLE, reason="Hydra not Available") @pytest.mark.usefixtures("cleandir") @pytest.mark.parametrize("gpus", [1, 2]) -def test_ddp_with_hydra_runjob(gpus): +@pytest.mark.parametrize("subdir", [None, "dksa", ".hello"]) +def test_ddp_with_hydra_runjob(gpus, subdir): + # Save script locally with open("temp.py", "w") as fn: fn.write(script) - run_process([sys.executable, "temp.py", f"+gpus={gpus}", '+strategy="ddp"']) + # Run CLI + cmd = [sys.executable, "temp.py", f"+gpus={gpus}", '+strategy="ddp"'] + if subdir is not None: + cmd += [f"hydra.output_subdir={subdir}"] + run_process(cmd) + + # Make sure config.yaml was created logs = sorted(Path.cwd().glob("**/config.yaml")) assert len(logs) == 1 + # Make sure subdir was set + actual_subdir = ".hydra" if subdir is None else subdir + assert logs[0].parent.name == actual_subdir + + # Make sure the parameter was set and used + cfg = OmegaConf.load(logs[0]) + assert cfg.gpus == gpus + + # Make sure PL spawned a job that is logged by Hydra + logs = sorted(Path.cwd().glob("**/train_ddp_process_1.log")) + assert len(logs) == gpus - 1 + @RunIf(skip_windows=True, min_gpus=2) @pytest.mark.skipif(not _HYDRA_AVAILABLE, reason="Hydra not Available") @@ -122,5 +145,13 @@ def test_ddp_with_hydra_multirunjob(gpus, num_jobs): fake_param += "," run_process([sys.executable, "temp.py", f"+gpus={gpus}", '+strategy="ddp"', fake_param, "--multirun"]) - jobs = sorted(Path.cwd().glob("**/config.yaml")) - assert len(jobs) == num_jobs + configs = sorted(Path.cwd().glob("**/config.yaml")) + assert len(configs) == num_jobs + + for i, config in enumerate(configs): + cfg = OmegaConf.load(config) + assert cfg.gpus == gpus + assert cfg.foo == i + + logs = sorted(Path.cwd().glob("**/train_ddp_process_1.log")) + assert len(logs) == num_jobs * (gpus - 1) From 77a08761069a0e42e0f645a92e858cc119fdb81e Mon Sep 17 00:00:00 2001 From: Justin Goodwin Date: Thu, 27 Jan 2022 08:45:46 -0500 Subject: [PATCH 03/34] create method for hydra --- pytorch_lightning/strategies/ddp.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pytorch_lightning/strategies/ddp.py b/pytorch_lightning/strategies/ddp.py index b5d83478101f1..9f0666c2c3599 100644 --- a/pytorch_lightning/strategies/ddp.py +++ b/pytorch_lightning/strategies/ddp.py @@ -18,7 +18,8 @@ import tempfile import time from pathlib import Path -from typing import Any, Dict, List, Optional, Union +from time import sleep +from typing import Any, Dict, List, Optional, Tuple, Union import torch import torch.distributed From 9d02c4c1f16a3cf8260c05e3ca29dbb9e5ff7885 Mon Sep 17 00:00:00 2001 From: Justin Goodwin Date: Thu, 27 Jan 2022 08:46:26 -0500 Subject: [PATCH 04/34] minor test updates --- tests/strategies/test_ddp_hydra_support.py | 31 +++++++++++++--------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/tests/strategies/test_ddp_hydra_support.py b/tests/strategies/test_ddp_hydra_support.py index a84499274647d..2bbfcff6da044 100644 --- a/tests/strategies/test_ddp_hydra_support.py +++ b/tests/strategies/test_ddp_hydra_support.py @@ -2,7 +2,6 @@ import os import subprocess import sys -import tempfile from pathlib import Path import pytest @@ -14,16 +13,16 @@ from omegaconf import OmegaConf -# fixture to run in a clean temporary directory -@pytest.fixture() -def cleandir(): +# fixture to run hydra jobs in a clean temporary directory +# Hydra creates its own output directories and logs +@pytest.fixture +def cleandir(tmp_path): """Run function in a temporary directory.""" - with tempfile.TemporaryDirectory() as tmpdirname: - old_dir = os.getcwd() # get current working directory (cwd) - os.chdir(tmpdirname) # change cwd to the temp-directory - yield tmpdirname # yields control to the test to be run - os.chdir(old_dir) - logging.shutdown() + old_dir = os.getcwd() # get current working directory (cwd) + os.chdir(tmp_path) # change cwd to the temp-directory + yield tmp_path # yields control to the test to be run + os.chdir(old_dir) + logging.shutdown() # function to run a command line argument @@ -113,7 +112,7 @@ def test_ddp_with_hydra_runjob(gpus, subdir): run_process(cmd) # Make sure config.yaml was created - logs = sorted(Path.cwd().glob("**/config.yaml")) + logs = list(Path.cwd().glob("**/config.yaml")) assert len(logs) == 1 # Make sure subdir was set @@ -125,7 +124,7 @@ def test_ddp_with_hydra_runjob(gpus, subdir): assert cfg.gpus == gpus # Make sure PL spawned a job that is logged by Hydra - logs = sorted(Path.cwd().glob("**/train_ddp_process_1.log")) + logs = list(Path.cwd().glob("**/train_ddp_process_1.log")) assert len(logs) == gpus - 1 @@ -135,23 +134,29 @@ def test_ddp_with_hydra_runjob(gpus, subdir): @pytest.mark.parametrize("gpus", [1, 2]) @pytest.mark.parametrize("num_jobs", [1, 2]) def test_ddp_with_hydra_multirunjob(gpus, num_jobs): + # Save script locally with open("temp.py", "w") as fn: fn.write(script) + # create fake multirun params based on `num_jobs` fake_param = "+foo=" for i in range(num_jobs): fake_param += f"{i}" if i < num_jobs - 1: fake_param += "," + + # Run CLI run_process([sys.executable, "temp.py", f"+gpus={gpus}", '+strategy="ddp"', fake_param, "--multirun"]) + # Make sure config.yaml was created for each job configs = sorted(Path.cwd().glob("**/config.yaml")) assert len(configs) == num_jobs + # Make sure the parameter was set and used for each job for i, config in enumerate(configs): cfg = OmegaConf.load(config) assert cfg.gpus == gpus assert cfg.foo == i - logs = sorted(Path.cwd().glob("**/train_ddp_process_1.log")) + logs = list(Path.cwd().glob("**/train_ddp_process_1.log")) assert len(logs) == num_jobs * (gpus - 1) From 6361369d13fc9b4d48cb434e1c7164901b2a4a58 Mon Sep 17 00:00:00 2001 From: Justin Goodwin Date: Fri, 28 Jan 2022 15:24:18 -0500 Subject: [PATCH 05/34] adds teardown code to support hydra multirun --- pytorch_lightning/strategies/ddp.py | 25 +++++++ tests/strategies/test_ddp_hydra_support.py | 87 ++++++++++++++++------ 2 files changed, 90 insertions(+), 22 deletions(-) diff --git a/pytorch_lightning/strategies/ddp.py b/pytorch_lightning/strategies/ddp.py index 9f0666c2c3599..5add2d2a4987f 100644 --- a/pytorch_lightning/strategies/ddp.py +++ b/pytorch_lightning/strategies/ddp.py @@ -23,6 +23,7 @@ import torch import torch.distributed +from omegaconf import OmegaConf from torch.nn import Module from torch.nn.parallel.distributed import DistributedDataParallel @@ -432,3 +433,27 @@ def teardown(self) -> None: self.lightning_module.cpu() # clean up memory torch.cuda.empty_cache() + + if _HYDRA_AVAILABLE: + if HydraConfig.initialized(): + hydra_cfg = HydraConfig.get() + + # check if we are in multirun mode + if not OmegaConf.is_missing(hydra_cfg.job, "num"): + # shutdown any distributed process groups + if torch.distributed.is_initialized(): + torch.distributed.destroy_process_group() + + # Remove PL environments so next multirun starts fresh + envs = ( + "LOCAL_RANK", + "NODE_RANK", + "WORLD_SIZE", + "MASTER_ADDR", + "MASTER_PORT", + "PL_GLOBAL_SEED", + "PL_SEED_WORKERS", + ) + + for name in envs: + os.environ.pop(name, None) diff --git a/tests/strategies/test_ddp_hydra_support.py b/tests/strategies/test_ddp_hydra_support.py index 2bbfcff6da044..562d650fbf9c5 100644 --- a/tests/strategies/test_ddp_hydra_support.py +++ b/tests/strategies/test_ddp_hydra_support.py @@ -5,8 +5,16 @@ from pathlib import Path import pytest +import torch.distributed +from hydra._internal.callbacks import Callbacks +from hydra._internal.hydra import Hydra +from hydra._internal.utils import create_config_search_path +from hydra.types import HydraContext, RunMode +from hydra.utils import instantiate +from pytorch_lightning import Trainer from pytorch_lightning.utilities.imports import _HYDRA_AVAILABLE +from tests.helpers.boring_model import BoringModel from tests.helpers.runif import RunIf if _HYDRA_AVAILABLE: @@ -37,6 +45,7 @@ def run_process(cmd): stdout, stderr = process.communicate() if process.returncode != 0: sys.stderr.write(f"Subprocess error:\n{stderr}\n") + sys.stderr.write(f"Subprocess stdout:\n{stdout}\n") raise subprocess.CalledProcessError(returncode=process.returncode, cmd=cmd) return stdout, stderr except Exception as e: @@ -47,14 +56,10 @@ def run_process(cmd): # Script to run from command line script = """ -import os - import hydra import torch -from torch import distributed as dist from pytorch_lightning import Trainer -from pytorch_lightning.strategies import DDPStrategy from tests.helpers.boring_model import BoringModel @@ -71,24 +76,6 @@ def task_fn(cfg): model = BoringModelGPU() trainer.fit(model) - # Need to do this in addition to Lightning shutting down the - # distributed processes in order to run a multirun loop with hydra - if dist.is_initialized(): - dist.destroy_process_group() - - envs = ( - "LOCAL_RANK", - "NODE_RANK", - "WORLD_SIZE", - "MASTER_ADDR", - "MASTER_PORT", - "PL_GLOBAL_SEED", - "PL_SEED_WORKERS", - ) - - for name in envs: - os.environ.pop(name, None) - if __name__ == "__main__": task_fn() @@ -160,3 +147,59 @@ def test_ddp_with_hydra_multirunjob(gpus, num_jobs): logs = list(Path.cwd().glob("**/train_ddp_process_1.log")) assert len(logs) == num_jobs * (gpus - 1) + + +def task_fn(cfg): + trainer = Trainer(gpus=1, strategy="ddp", fast_dev_run=True) + model = BoringModel() + trainer.fit(model) + + +def run_hydra_sweeper(): + """Runs Hydra sweeper as a function (rather than CLI) so we can test teardown""" + search_path = create_config_search_path(None) + hydra = Hydra.create_main_hydra2(task_name="pytest", config_search_path=search_path) + + cfg = hydra.compose_config( + config_name=None, + overrides=[], + with_log_configuration=False, + run_mode=RunMode.MULTIRUN, + ) + + callbacks = Callbacks(cfg) + # Instantiate sweeper without using Hydra's Plugin discovery (Zen!) + sweeper = instantiate(cfg.hydra.sweeper) + sweeper.setup( + config=cfg, + hydra_context=HydraContext(config_loader=hydra.config_loader, callbacks=callbacks), + task_function=task_fn, + ) + + return sweeper.sweep(arguments=[]) + + +@RunIf(skip_windows=True, min_gpus=2) +@pytest.mark.skipif(not _HYDRA_AVAILABLE, reason="Hydra not Available") +@pytest.mark.usefixtures("cleandir") +def test_ddp_teardown_with_hydra(): + job = run_hydra_sweeper() + assert len(job) == 1 + + # Make sure DDPPlugin.teardown executed + # - torch.distributed should be shutdown + # - PL environment variables are removed + assert not torch.distributed.is_initialized() + + envs = ( + "LOCAL_RANK", + "NODE_RANK", + "WORLD_SIZE", + "MASTER_ADDR", + "MASTER_PORT", + "PL_GLOBAL_SEED", + "PL_SEED_WORKERS", + ) + + for name in envs: + assert name not in os.environ From dea8ea58021da0ee87cef524aa676d1d16c25595 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 28 Jan 2022 20:25:39 +0000 Subject: [PATCH 06/34] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- tests/strategies/test_ddp_hydra_support.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/strategies/test_ddp_hydra_support.py b/tests/strategies/test_ddp_hydra_support.py index 562d650fbf9c5..ba005975461ac 100644 --- a/tests/strategies/test_ddp_hydra_support.py +++ b/tests/strategies/test_ddp_hydra_support.py @@ -156,7 +156,7 @@ def task_fn(cfg): def run_hydra_sweeper(): - """Runs Hydra sweeper as a function (rather than CLI) so we can test teardown""" + """Runs Hydra sweeper as a function (rather than CLI) so we can test teardown.""" search_path = create_config_search_path(None) hydra = Hydra.create_main_hydra2(task_name="pytest", config_search_path=search_path) From 20499b029f7755fd0aee7929a0c1ef19957957a7 Mon Sep 17 00:00:00 2001 From: Justin Goodwin Date: Fri, 28 Jan 2022 15:34:57 -0500 Subject: [PATCH 07/34] fixes hydra available import checks --- pytorch_lightning/strategies/ddp.py | 1 - tests/strategies/test_ddp_hydra_support.py | 10 +++++----- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/pytorch_lightning/strategies/ddp.py b/pytorch_lightning/strategies/ddp.py index 5add2d2a4987f..5761fabcf952e 100644 --- a/pytorch_lightning/strategies/ddp.py +++ b/pytorch_lightning/strategies/ddp.py @@ -23,7 +23,6 @@ import torch import torch.distributed -from omegaconf import OmegaConf from torch.nn import Module from torch.nn.parallel.distributed import DistributedDataParallel diff --git a/tests/strategies/test_ddp_hydra_support.py b/tests/strategies/test_ddp_hydra_support.py index ba005975461ac..ea39d0187ea74 100644 --- a/tests/strategies/test_ddp_hydra_support.py +++ b/tests/strategies/test_ddp_hydra_support.py @@ -6,11 +6,6 @@ import pytest import torch.distributed -from hydra._internal.callbacks import Callbacks -from hydra._internal.hydra import Hydra -from hydra._internal.utils import create_config_search_path -from hydra.types import HydraContext, RunMode -from hydra.utils import instantiate from pytorch_lightning import Trainer from pytorch_lightning.utilities.imports import _HYDRA_AVAILABLE @@ -18,6 +13,11 @@ from tests.helpers.runif import RunIf if _HYDRA_AVAILABLE: + from hydra._internal.callbacks import Callbacks + from hydra._internal.hydra import Hydra + from hydra._internal.utils import create_config_search_path + from hydra.types import HydraContext, RunMode + from hydra.utils import instantiate from omegaconf import OmegaConf From bb93d2766de1d7b981b737ed92d9babfb5003a26 Mon Sep 17 00:00:00 2001 From: Justin Goodwin Date: Mon, 31 Jan 2022 13:54:50 -0500 Subject: [PATCH 08/34] moved hydra specific code to utilities --- pytorch_lightning/strategies/ddp.py | 23 +-------- pytorch_lightning/utilities/hydra.py | 73 ++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+), 22 deletions(-) create mode 100644 pytorch_lightning/utilities/hydra.py diff --git a/pytorch_lightning/strategies/ddp.py b/pytorch_lightning/strategies/ddp.py index 5761fabcf952e..a4820c15d25c1 100644 --- a/pytorch_lightning/strategies/ddp.py +++ b/pytorch_lightning/strategies/ddp.py @@ -434,25 +434,4 @@ def teardown(self) -> None: torch.cuda.empty_cache() if _HYDRA_AVAILABLE: - if HydraConfig.initialized(): - hydra_cfg = HydraConfig.get() - - # check if we are in multirun mode - if not OmegaConf.is_missing(hydra_cfg.job, "num"): - # shutdown any distributed process groups - if torch.distributed.is_initialized(): - torch.distributed.destroy_process_group() - - # Remove PL environments so next multirun starts fresh - envs = ( - "LOCAL_RANK", - "NODE_RANK", - "WORLD_SIZE", - "MASTER_ADDR", - "MASTER_PORT", - "PL_GLOBAL_SEED", - "PL_SEED_WORKERS", - ) - - for name in envs: - os.environ.pop(name, None) + teardown_ddp_for_hydra_multirun() diff --git a/pytorch_lightning/utilities/hydra.py b/pytorch_lightning/utilities/hydra.py new file mode 100644 index 0000000000000..045cb3594cc90 --- /dev/null +++ b/pytorch_lightning/utilities/hydra.py @@ -0,0 +1,73 @@ +import os +from typing import List, Optional, Tuple + +import __main__ +import torch + +from pytorch_lightning.utilities import _HYDRA_AVAILABLE + +if _HYDRA_AVAILABLE: + from hydra.core.hydra_config import HydraConfig + from hydra.utils import get_original_cwd + from omegaconf import OmegaConf + + +def get_ddp_spawn_command_for_hydra(command: List[str], local_rank: str) -> Tuple[str, str]: + """Modifies the DDP spawn command to support Hydra initiated processes. + + If Hydra is initialized: + 1) Set `cwd` to the hydra working directory + 2) Use the stored configuration in `hydra_cfg.output_subdir / config.yaml` to spawn a new child + + """ + + cwd: Optional[str] = None + if HydraConfig.initialized(): + orig_cwd = get_original_cwd() + cwd = os.getcwd() + os_cwd = f'"{cwd}"' # this is needed to handle characters like `=` in the directory name + + hydra_cfg = HydraConfig.get() + hydra_output = os.path.join(os.path.relpath(cwd, orig_cwd), hydra_cfg.output_subdir) + + if __main__.__spec__ is None: # pragma: no-cover + command_no_args = command[:2] + else: + # this fails for `python -m pdb -m a.b.c ` + command_no_args = command[:3] + + command = command_no_args + command += ["-cp", hydra_output, "-cn", "config.yaml"] + command += [ + f"hydra.output_subdir={hydra_cfg.output_subdir}", + f"hydra.run.dir={os_cwd}", + f"hydra.job.name=train_ddp_process_{local_rank}", + ] + return command, cwd + + +def teardown_ddp_for_hydra_multirun(): + """Performs additional teardown steps for PL to allow for Hydra multirun jobs.""" + + if HydraConfig.initialized(): + hydra_cfg = HydraConfig.get() + + # check if we are in multirun mode + if not OmegaConf.is_missing(hydra_cfg.job, "num"): + # shutdown any distributed process groups + if torch.distributed.is_initialized(): + torch.distributed.destroy_process_group() + + # Remove PL environments so next multirun starts fresh + envs = ( + "LOCAL_RANK", + "NODE_RANK", + "WORLD_SIZE", + "MASTER_ADDR", + "MASTER_PORT", + "PL_GLOBAL_SEED", + "PL_SEED_WORKERS", + ) + + for name in envs: + os.environ.pop(name, None) From fee8d6131aab48742d81d3b115f9599bb213fc0e Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 31 Jan 2022 18:58:43 +0000 Subject: [PATCH 09/34] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- pytorch_lightning/utilities/hydra.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pytorch_lightning/utilities/hydra.py b/pytorch_lightning/utilities/hydra.py index 045cb3594cc90..be3296091b526 100644 --- a/pytorch_lightning/utilities/hydra.py +++ b/pytorch_lightning/utilities/hydra.py @@ -15,10 +15,8 @@ def get_ddp_spawn_command_for_hydra(command: List[str], local_rank: str) -> Tuple[str, str]: """Modifies the DDP spawn command to support Hydra initiated processes. - If Hydra is initialized: - 1) Set `cwd` to the hydra working directory - 2) Use the stored configuration in `hydra_cfg.output_subdir / config.yaml` to spawn a new child - + If Hydra is initialized: 1) Set `cwd` to the hydra working directory 2) Use the stored configuration in + `hydra_cfg.output_subdir / config.yaml` to spawn a new child """ cwd: Optional[str] = None From 2c1e839b435220a4d908f420b63e7ff169c08ed0 Mon Sep 17 00:00:00 2001 From: Justin Goodwin Date: Mon, 31 Jan 2022 15:47:13 -0500 Subject: [PATCH 10/34] fix mypy and flake8 errors --- pytorch_lightning/strategies/ddp.py | 2 +- pytorch_lightning/utilities/hydra.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pytorch_lightning/strategies/ddp.py b/pytorch_lightning/strategies/ddp.py index a4820c15d25c1..827839d9f8667 100644 --- a/pytorch_lightning/strategies/ddp.py +++ b/pytorch_lightning/strategies/ddp.py @@ -19,7 +19,7 @@ import time from pathlib import Path from time import sleep -from typing import Any, Dict, List, Optional, Tuple, Union +from typing import Any, Dict, List, Optional, Union import torch import torch.distributed diff --git a/pytorch_lightning/utilities/hydra.py b/pytorch_lightning/utilities/hydra.py index be3296091b526..14bf429109489 100644 --- a/pytorch_lightning/utilities/hydra.py +++ b/pytorch_lightning/utilities/hydra.py @@ -44,7 +44,7 @@ def get_ddp_spawn_command_for_hydra(command: List[str], local_rank: str) -> Tupl return command, cwd -def teardown_ddp_for_hydra_multirun(): +def teardown_ddp_for_hydra_multirun() -> None: """Performs additional teardown steps for PL to allow for Hydra multirun jobs.""" if HydraConfig.initialized(): From 4ec0a4528ad55fe1a894efa65b45464b4715b499 Mon Sep 17 00:00:00 2001 From: Justin Goodwin Date: Mon, 31 Jan 2022 15:47:26 -0500 Subject: [PATCH 11/34] fixes error with old version of hydra --- tests/strategies/test_ddp_hydra_support.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/strategies/test_ddp_hydra_support.py b/tests/strategies/test_ddp_hydra_support.py index ea39d0187ea74..007de53caa9ee 100644 --- a/tests/strategies/test_ddp_hydra_support.py +++ b/tests/strategies/test_ddp_hydra_support.py @@ -13,7 +13,11 @@ from tests.helpers.runif import RunIf if _HYDRA_AVAILABLE: - from hydra._internal.callbacks import Callbacks + try: + from hydra._internal.callbacks import Callbacks + except: + Callbacks = None + from hydra._internal.hydra import Hydra from hydra._internal.utils import create_config_search_path from hydra.types import HydraContext, RunMode @@ -181,6 +185,7 @@ def run_hydra_sweeper(): @RunIf(skip_windows=True, min_gpus=2) @pytest.mark.skipif(not _HYDRA_AVAILABLE, reason="Hydra not Available") +@pytest.mark.skipif(Callbacks is None, reason="Hydra version too old (for test only)") @pytest.mark.usefixtures("cleandir") def test_ddp_teardown_with_hydra(): job = run_hydra_sweeper() From 0eb06273aad5fd36c3ba57912848ea0f15970287 Mon Sep 17 00:00:00 2001 From: Justin Goodwin Date: Wed, 9 Feb 2022 11:19:24 -0500 Subject: [PATCH 12/34] Update pytorch_lightning/utilities/hydra.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Carlos Mocholí --- pytorch_lightning/utilities/hydra.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pytorch_lightning/utilities/hydra.py b/pytorch_lightning/utilities/hydra.py index 14bf429109489..68c52d49796ea 100644 --- a/pytorch_lightning/utilities/hydra.py +++ b/pytorch_lightning/utilities/hydra.py @@ -12,7 +12,7 @@ from omegaconf import OmegaConf -def get_ddp_spawn_command_for_hydra(command: List[str], local_rank: str) -> Tuple[str, str]: +def get_ddp_spawn_command_for_hydra(command: List[str], local_rank: int) -> Tuple[str, str]: """Modifies the DDP spawn command to support Hydra initiated processes. If Hydra is initialized: 1) Set `cwd` to the hydra working directory 2) Use the stored configuration in From 6530c6a32bb088e388287c328d2f88a3b2726d63 Mon Sep 17 00:00:00 2001 From: Justin Goodwin Date: Wed, 9 Feb 2022 12:26:38 -0500 Subject: [PATCH 13/34] addresses comments and simplifies tests --- pytorch_lightning/utilities/hydra.py | 64 ++++++----- tests/strategies/test_ddp_hydra_support.py | 118 +++++---------------- 2 files changed, 60 insertions(+), 122 deletions(-) diff --git a/pytorch_lightning/utilities/hydra.py b/pytorch_lightning/utilities/hydra.py index 68c52d49796ea..7d44caf03877f 100644 --- a/pytorch_lightning/utilities/hydra.py +++ b/pytorch_lightning/utilities/hydra.py @@ -12,12 +12,12 @@ from omegaconf import OmegaConf -def get_ddp_spawn_command_for_hydra(command: List[str], local_rank: int) -> Tuple[str, str]: - """Modifies the DDP spawn command to support Hydra initiated processes. +def get_ddp_spawn_command_for_hydra(command: List[str], local_rank: int) -> Tuple[List[str], Optional[str]]: + """Modifies the DDP spawn command to support Hydra initiated processes.""" - If Hydra is initialized: 1) Set `cwd` to the hydra working directory 2) Use the stored configuration in - `hydra_cfg.output_subdir / config.yaml` to spawn a new child - """ + # If Hydra is initialized: + # 1) Set `cwd` to the hydra working directory + # 2) Use the stored configuration in `hydra_cfg.output_subdir / config.yaml` to spawn a new child cwd: Optional[str] = None if HydraConfig.initialized(): @@ -26,7 +26,7 @@ def get_ddp_spawn_command_for_hydra(command: List[str], local_rank: int) -> Tupl os_cwd = f'"{cwd}"' # this is needed to handle characters like `=` in the directory name hydra_cfg = HydraConfig.get() - hydra_output = os.path.join(os.path.relpath(cwd, orig_cwd), hydra_cfg.output_subdir) + hydra_output = os.path.join(cwd, hydra_cfg.output_subdir) if __main__.__spec__ is None: # pragma: no-cover command_no_args = command[:2] @@ -35,9 +35,25 @@ def get_ddp_spawn_command_for_hydra(command: List[str], local_rank: int) -> Tupl command_no_args = command[:3] command = command_no_args + + # run the Hydra job using the current job configuration + # - typically located in: + # RUN MODE: hydra.run.dir/.hydra/config.ayml + # MULTIRUN MODE: hydra.sweep.dir/hydra.sweep.subdir/.hydra/config.yaml command += ["-cp", hydra_output, "-cn", "config.yaml"] + + # hydra.output_subdir=.pl_ddp_hydra_{local_rank} + # Store process config in its own to avoid overwriting + # and allow the user to very that each spawned job uses + # the same configuration + # hydra.run.dir={os_cwd} + # This makes sure to run this job, log, and store any outputs + # in the current experiment directory + # + # hydra.job.name=train_ddp_process_{local_rank} + # This defines the logging output file for the process command += [ - f"hydra.output_subdir={hydra_cfg.output_subdir}", + f"hydra.output_subdir=.pl_ddp_hydra_{local_rank}", f"hydra.run.dir={os_cwd}", f"hydra.job.name=train_ddp_process_{local_rank}", ] @@ -48,24 +64,16 @@ def teardown_ddp_for_hydra_multirun() -> None: """Performs additional teardown steps for PL to allow for Hydra multirun jobs.""" if HydraConfig.initialized(): - hydra_cfg = HydraConfig.get() - - # check if we are in multirun mode - if not OmegaConf.is_missing(hydra_cfg.job, "num"): - # shutdown any distributed process groups - if torch.distributed.is_initialized(): - torch.distributed.destroy_process_group() - - # Remove PL environments so next multirun starts fresh - envs = ( - "LOCAL_RANK", - "NODE_RANK", - "WORLD_SIZE", - "MASTER_ADDR", - "MASTER_PORT", - "PL_GLOBAL_SEED", - "PL_SEED_WORKERS", - ) - - for name in envs: - os.environ.pop(name, None) + # shutdown any distributed process groups + if torch.distributed.is_initialized(): + torch.distributed.destroy_process_group() + + envs = ( + "LOCAL_RANK", + "NODE_RANK", + "WORLD_SIZE", + "MASTER_ADDR", + "MASTER_PORT", + ) + for name in envs: + os.environ.pop(name, None) diff --git a/tests/strategies/test_ddp_hydra_support.py b/tests/strategies/test_ddp_hydra_support.py index 007de53caa9ee..87390c8ef82e1 100644 --- a/tests/strategies/test_ddp_hydra_support.py +++ b/tests/strategies/test_ddp_hydra_support.py @@ -5,23 +5,10 @@ from pathlib import Path import pytest -import torch.distributed -from pytorch_lightning import Trainer from pytorch_lightning.utilities.imports import _HYDRA_AVAILABLE -from tests.helpers.boring_model import BoringModel -from tests.helpers.runif import RunIf if _HYDRA_AVAILABLE: - try: - from hydra._internal.callbacks import Callbacks - except: - Callbacks = None - - from hydra._internal.hydra import Hydra - from hydra._internal.utils import create_config_search_path - from hydra.types import HydraContext, RunMode - from hydra.utils import instantiate from omegaconf import OmegaConf @@ -60,6 +47,7 @@ def run_process(cmd): # Script to run from command line script = """ +import os import hydra import torch @@ -76,55 +64,53 @@ def on_train_start(self) -> None: @hydra.main() def task_fn(cfg): - trainer = Trainer(gpus=cfg.gpus, strategy=cfg.strategy, fast_dev_run=True) + trainer = Trainer(accelerator="auto", devices=cfg.devices, strategy=cfg.strategy, fast_dev_run=True) model = BoringModelGPU() trainer.fit(model) + # make sure teardown executed + assert not torch.distributed.is_initialized(), f"Torch Distributed Initialized: {torch.distributed.is_initialized()}" + assert "LOCAL_RANK" not in os.environ if __name__ == "__main__": task_fn() """ -@RunIf(skip_windows=True, min_gpus=2) @pytest.mark.skipif(not _HYDRA_AVAILABLE, reason="Hydra not Available") @pytest.mark.usefixtures("cleandir") -@pytest.mark.parametrize("gpus", [1, 2]) +@pytest.mark.parametrize("devices", [1, 2]) @pytest.mark.parametrize("subdir", [None, "dksa", ".hello"]) -def test_ddp_with_hydra_runjob(gpus, subdir): +def test_ddp_with_hydra_runjob(devices, subdir): # Save script locally with open("temp.py", "w") as fn: fn.write(script) # Run CLI - cmd = [sys.executable, "temp.py", f"+gpus={gpus}", '+strategy="ddp"'] + cmd = [sys.executable, "temp.py", f"+devices={devices}", '+strategy="ddp"'] if subdir is not None: cmd += [f"hydra.output_subdir={subdir}"] run_process(cmd) - # Make sure config.yaml was created + # Make sure config.yaml was created for additional + # processes. logs = list(Path.cwd().glob("**/config.yaml")) - assert len(logs) == 1 - - # Make sure subdir was set - actual_subdir = ".hydra" if subdir is None else subdir - assert logs[0].parent.name == actual_subdir + assert len(logs) == devices # Make sure the parameter was set and used cfg = OmegaConf.load(logs[0]) - assert cfg.gpus == gpus + assert cfg.devices == devices # Make sure PL spawned a job that is logged by Hydra - logs = list(Path.cwd().glob("**/train_ddp_process_1.log")) - assert len(logs) == gpus - 1 + logs = list(Path.cwd().glob("**/train_ddp_process_*.log")) + assert len(logs) == devices - 1 -@RunIf(skip_windows=True, min_gpus=2) @pytest.mark.skipif(not _HYDRA_AVAILABLE, reason="Hydra not Available") @pytest.mark.usefixtures("cleandir") -@pytest.mark.parametrize("gpus", [1, 2]) +@pytest.mark.parametrize("devices", [1, 2]) @pytest.mark.parametrize("num_jobs", [1, 2]) -def test_ddp_with_hydra_multirunjob(gpus, num_jobs): +def test_ddp_with_hydra_multirunjob(devices, num_jobs): # Save script locally with open("temp.py", "w") as fn: fn.write(script) @@ -137,74 +123,18 @@ def test_ddp_with_hydra_multirunjob(gpus, num_jobs): fake_param += "," # Run CLI - run_process([sys.executable, "temp.py", f"+gpus={gpus}", '+strategy="ddp"', fake_param, "--multirun"]) + run_process([sys.executable, "temp.py", f"+devices={devices}", '+strategy="ddp"', fake_param, "--multirun"]) # Make sure config.yaml was created for each job - configs = sorted(Path.cwd().glob("**/config.yaml")) - assert len(configs) == num_jobs + configs = sorted(Path.cwd().glob("**/.pl_ddp_hydra_*/config.yaml")) + assert len(configs) == num_jobs * (devices - 1) # Make sure the parameter was set and used for each job for i, config in enumerate(configs): cfg = OmegaConf.load(config) - assert cfg.gpus == gpus - assert cfg.foo == i - - logs = list(Path.cwd().glob("**/train_ddp_process_1.log")) - assert len(logs) == num_jobs * (gpus - 1) - - -def task_fn(cfg): - trainer = Trainer(gpus=1, strategy="ddp", fast_dev_run=True) - model = BoringModel() - trainer.fit(model) - - -def run_hydra_sweeper(): - """Runs Hydra sweeper as a function (rather than CLI) so we can test teardown.""" - search_path = create_config_search_path(None) - hydra = Hydra.create_main_hydra2(task_name="pytest", config_search_path=search_path) - - cfg = hydra.compose_config( - config_name=None, - overrides=[], - with_log_configuration=False, - run_mode=RunMode.MULTIRUN, - ) - - callbacks = Callbacks(cfg) - # Instantiate sweeper without using Hydra's Plugin discovery (Zen!) - sweeper = instantiate(cfg.hydra.sweeper) - sweeper.setup( - config=cfg, - hydra_context=HydraContext(config_loader=hydra.config_loader, callbacks=callbacks), - task_function=task_fn, - ) + local_rank = int(config.parent.parent.parts[-1]) + assert cfg.devices == devices + assert cfg.foo == local_rank - return sweeper.sweep(arguments=[]) - - -@RunIf(skip_windows=True, min_gpus=2) -@pytest.mark.skipif(not _HYDRA_AVAILABLE, reason="Hydra not Available") -@pytest.mark.skipif(Callbacks is None, reason="Hydra version too old (for test only)") -@pytest.mark.usefixtures("cleandir") -def test_ddp_teardown_with_hydra(): - job = run_hydra_sweeper() - assert len(job) == 1 - - # Make sure DDPPlugin.teardown executed - # - torch.distributed should be shutdown - # - PL environment variables are removed - assert not torch.distributed.is_initialized() - - envs = ( - "LOCAL_RANK", - "NODE_RANK", - "WORLD_SIZE", - "MASTER_ADDR", - "MASTER_PORT", - "PL_GLOBAL_SEED", - "PL_SEED_WORKERS", - ) - - for name in envs: - assert name not in os.environ + logs = list(Path.cwd().glob("**/train_ddp_process_*.log")) + assert len(logs) == num_jobs * (devices - 1) From 7f340edb050a59d55fdec5019454b9332626387d Mon Sep 17 00:00:00 2001 From: rohitgr7 Date: Tue, 22 Feb 2022 09:29:57 -0500 Subject: [PATCH 14/34] rebase with launcher --- pytorch_lightning/strategies/ddp.py | 4 ---- pytorch_lightning/strategies/launchers/base.py | 3 +++ .../strategies/launchers/subprocess_script.py | 15 +++++++++------ pytorch_lightning/strategies/strategy.py | 3 +++ tests/strategies/test_ddp_hydra_support.py | 12 ++++-------- 5 files changed, 19 insertions(+), 18 deletions(-) diff --git a/pytorch_lightning/strategies/ddp.py b/pytorch_lightning/strategies/ddp.py index 827839d9f8667..b5d83478101f1 100644 --- a/pytorch_lightning/strategies/ddp.py +++ b/pytorch_lightning/strategies/ddp.py @@ -18,7 +18,6 @@ import tempfile import time from pathlib import Path -from time import sleep from typing import Any, Dict, List, Optional, Union import torch @@ -432,6 +431,3 @@ def teardown(self) -> None: self.lightning_module.cpu() # clean up memory torch.cuda.empty_cache() - - if _HYDRA_AVAILABLE: - teardown_ddp_for_hydra_multirun() diff --git a/pytorch_lightning/strategies/launchers/base.py b/pytorch_lightning/strategies/launchers/base.py index 2acf54afef245..4a30183ffd2b6 100644 --- a/pytorch_lightning/strategies/launchers/base.py +++ b/pytorch_lightning/strategies/launchers/base.py @@ -34,3 +34,6 @@ def is_interactive_compatible(self) -> bool: @abstractmethod def launch(self, function: Callable, *args: Any, **kwargs: Any) -> Any: """Launches the processes.""" + + def teardown(self) -> None: + """This method is called to teardown the launcher.""" diff --git a/pytorch_lightning/strategies/launchers/subprocess_script.py b/pytorch_lightning/strategies/launchers/subprocess_script.py index 5a8632fb87306..2b537ef9c56bb 100644 --- a/pytorch_lightning/strategies/launchers/subprocess_script.py +++ b/pytorch_lightning/strategies/launchers/subprocess_script.py @@ -26,8 +26,9 @@ from pytorch_lightning.utilities import _HYDRA_AVAILABLE if _HYDRA_AVAILABLE: - from hydra.core.hydra_config import HydraConfig - from hydra.utils import get_original_cwd, to_absolute_path + from hydra.utils import to_absolute_path + + from pytorch_lightning.utilities.hydra import get_ddp_spawn_command_for_hydra, teardown_ddp_for_hydra_multirun class _SubprocessScriptLauncher(_Launcher): @@ -138,10 +139,8 @@ def _call_children_scripts(self) -> None: # if hydra is available and initialized, make sure to set the cwd correctly cwd: Optional[str] = None if _HYDRA_AVAILABLE: - if HydraConfig.initialized(): - cwd = get_original_cwd() - os_cwd = f'"{os.getcwd()}"' - command += [f"hydra.run.dir={os_cwd}", f"hydra.job.name=train_ddp_process_{local_rank}"] + command, cwd = get_ddp_spawn_command_for_hydra(command, local_rank) + subprocess.Popen(command, env=env_copy, cwd=cwd) # starting all processes at once can cause issues @@ -156,3 +155,7 @@ def _check_can_spawn_children(self) -> None: " Possible reasons: 1) LOCAL_RANK environment variable was incorrectly modified by the user," " 2) `ClusterEnvironment.creates_processes_externally` incorrectly implemented." ) + + def teardown(self) -> None: + if _HYDRA_AVAILABLE: + teardown_ddp_for_hydra_multirun() diff --git a/pytorch_lightning/strategies/strategy.py b/pytorch_lightning/strategies/strategy.py index 3ff8f10c6536c..a475f036d6b90 100644 --- a/pytorch_lightning/strategies/strategy.py +++ b/pytorch_lightning/strategies/strategy.py @@ -442,6 +442,9 @@ def teardown(self) -> None: optimizers_to_device(self.optimizers, torch.device("cpu")) self.precision_plugin.teardown() + if self.launcher: + self.launcher.teardown() + @classmethod def register_strategies(cls, strategy_registry) -> None: pass diff --git a/tests/strategies/test_ddp_hydra_support.py b/tests/strategies/test_ddp_hydra_support.py index 87390c8ef82e1..054546cd62a64 100644 --- a/tests/strategies/test_ddp_hydra_support.py +++ b/tests/strategies/test_ddp_hydra_support.py @@ -7,6 +7,7 @@ import pytest from pytorch_lightning.utilities.imports import _HYDRA_AVAILABLE +from tests.helpers.runif import RunIf if _HYDRA_AVAILABLE: from omegaconf import OmegaConf @@ -50,33 +51,27 @@ def run_process(cmd): import os import hydra import torch - from pytorch_lightning import Trainer from tests.helpers.boring_model import BoringModel - - class BoringModelGPU(BoringModel): def on_train_start(self) -> None: # make sure that the model is on GPU when training assert self.device == torch.device(f"cuda:{self.trainer.strategy.local_rank}") self.start_cuda_memory = torch.cuda.memory_allocated() - - @hydra.main() def task_fn(cfg): trainer = Trainer(accelerator="auto", devices=cfg.devices, strategy=cfg.strategy, fast_dev_run=True) model = BoringModelGPU() trainer.fit(model) - # make sure teardown executed - assert not torch.distributed.is_initialized(), f"Torch Distributed Initialized: {torch.distributed.is_initialized()}" + assert not torch.distributed.is_initialized() assert "LOCAL_RANK" not in os.environ - if __name__ == "__main__": task_fn() """ +@RunIf(min_gpus=2) @pytest.mark.skipif(not _HYDRA_AVAILABLE, reason="Hydra not Available") @pytest.mark.usefixtures("cleandir") @pytest.mark.parametrize("devices", [1, 2]) @@ -106,6 +101,7 @@ def test_ddp_with_hydra_runjob(devices, subdir): assert len(logs) == devices - 1 +@RunIf(min_gpus=2) @pytest.mark.skipif(not _HYDRA_AVAILABLE, reason="Hydra not Available") @pytest.mark.usefixtures("cleandir") @pytest.mark.parametrize("devices", [1, 2]) From 12c11226ed54bf0a391150e575522636d9d89b93 Mon Sep 17 00:00:00 2001 From: rohitgr7 Date: Tue, 22 Feb 2022 09:56:06 -0500 Subject: [PATCH 15/34] add hydra launcher --- pytorch_lightning/strategies/ddp.py | 5 +- .../strategies/launchers/subprocess_script.py | 30 +++------ .../launchers/subprocess_script_hydra.py} | 63 ++++++++++--------- tests/strategies/test_ddp_hydra_support.py | 17 +++-- 4 files changed, 58 insertions(+), 57 deletions(-) rename pytorch_lightning/{utilities/hydra.py => strategies/launchers/subprocess_script_hydra.py} (54%) diff --git a/pytorch_lightning/strategies/ddp.py b/pytorch_lightning/strategies/ddp.py index b5d83478101f1..068e82ce67e66 100644 --- a/pytorch_lightning/strategies/ddp.py +++ b/pytorch_lightning/strategies/ddp.py @@ -33,10 +33,12 @@ from pytorch_lightning.plugins.io.checkpoint_plugin import CheckpointIO from pytorch_lightning.plugins.precision import PrecisionPlugin from pytorch_lightning.strategies.launchers.subprocess_script import _SubprocessScriptLauncher +from pytorch_lightning.strategies.launchers.subprocess_script_hydra import _SubprocessScriptHydraLauncher from pytorch_lightning.strategies.parallel import ParallelStrategy from pytorch_lightning.trainer.states import TrainerFn from pytorch_lightning.utilities import ( _FAIRSCALE_AVAILABLE, + _HYDRA_AVAILABLE, _IS_WINDOWS, _TORCH_GREATER_EQUAL_1_8, _TORCH_GREATER_EQUAL_1_9, @@ -127,7 +129,8 @@ def _is_single_process_single_device(self) -> bool: return True def _configure_launcher(self) -> None: - self._launcher = _SubprocessScriptLauncher(self.cluster_environment, self.num_processes, self.num_nodes) + launcher_cls = _SubprocessScriptHydraLauncher if _HYDRA_AVAILABLE else _SubprocessScriptLauncher + self._launcher = launcher_cls(self.cluster_environment, self.num_processes, self.num_nodes) if not self.cluster_environment.creates_processes_externally: self._rank_0_will_call_children_scripts = True diff --git a/pytorch_lightning/strategies/launchers/subprocess_script.py b/pytorch_lightning/strategies/launchers/subprocess_script.py index 2b537ef9c56bb..d2816a4776c38 100644 --- a/pytorch_lightning/strategies/launchers/subprocess_script.py +++ b/pytorch_lightning/strategies/launchers/subprocess_script.py @@ -15,7 +15,7 @@ import subprocess import sys from time import sleep -from typing import Any, Callable, Optional +from typing import Any, Callable, List, Optional, Tuple import __main__ import numpy as np @@ -23,12 +23,6 @@ import pytorch_lightning as pl from pytorch_lightning.plugins.environments.cluster_environment import ClusterEnvironment from pytorch_lightning.strategies.launchers.base import _Launcher -from pytorch_lightning.utilities import _HYDRA_AVAILABLE - -if _HYDRA_AVAILABLE: - from hydra.utils import to_absolute_path - - from pytorch_lightning.utilities.hydra import get_ddp_spawn_command_for_hydra, teardown_ddp_for_hydra_multirun class _SubprocessScriptLauncher(_Launcher): @@ -93,6 +87,12 @@ def launch(self, function: Callable, *args: Any, trainer: Optional["pl.Trainer"] self._call_children_scripts() return function(*args, **kwargs) + def _get_complete_path(self, command: str) -> str: + return os.path.abspath(command) + + def _get_launch_command(self, command: List[str], local_rank: int) -> Tuple[List[str], Optional[str]]: + return command, None + def _call_children_scripts(self) -> None: # bookkeeping of spawned processes self._check_can_spawn_children() @@ -108,14 +108,10 @@ def _call_children_scripts(self) -> None: # Check if the current calling command looked like `python a/b/c.py` or `python -m a.b.c` # See https://docs.python.org/3/reference/import.html#main-spec if __main__.__spec__ is None: # pragma: no-cover - # Script called as `python a/b/c.py` - # when user is using hydra find the absolute path - path_lib = os.path.abspath if not _HYDRA_AVAILABLE else to_absolute_path - # pull out the commands used to run the script and resolve the abs file path command = sys.argv try: - full_path = path_lib(command[0]) + full_path = self._get_complete_path(command[0]) except Exception: full_path = os.path.abspath(command[0]) @@ -136,11 +132,7 @@ def _call_children_scripts(self) -> None: del env_copy["PL_GLOBAL_SEED"] # start process - # if hydra is available and initialized, make sure to set the cwd correctly - cwd: Optional[str] = None - if _HYDRA_AVAILABLE: - command, cwd = get_ddp_spawn_command_for_hydra(command, local_rank) - + command, cwd = self._get_launch_command(command, local_rank) subprocess.Popen(command, env=env_copy, cwd=cwd) # starting all processes at once can cause issues @@ -155,7 +147,3 @@ def _check_can_spawn_children(self) -> None: " Possible reasons: 1) LOCAL_RANK environment variable was incorrectly modified by the user," " 2) `ClusterEnvironment.creates_processes_externally` incorrectly implemented." ) - - def teardown(self) -> None: - if _HYDRA_AVAILABLE: - teardown_ddp_for_hydra_multirun() diff --git a/pytorch_lightning/utilities/hydra.py b/pytorch_lightning/strategies/launchers/subprocess_script_hydra.py similarity index 54% rename from pytorch_lightning/utilities/hydra.py rename to pytorch_lightning/strategies/launchers/subprocess_script_hydra.py index 7d44caf03877f..ba0ac5c0b6bd6 100644 --- a/pytorch_lightning/utilities/hydra.py +++ b/pytorch_lightning/strategies/launchers/subprocess_script_hydra.py @@ -4,24 +4,30 @@ import __main__ import torch +from pytorch_lightning.strategies.launchers.subprocess_script import _SubprocessScriptLauncher from pytorch_lightning.utilities import _HYDRA_AVAILABLE if _HYDRA_AVAILABLE: from hydra.core.hydra_config import HydraConfig - from hydra.utils import get_original_cwd - from omegaconf import OmegaConf + from hydra.utils import to_absolute_path -def get_ddp_spawn_command_for_hydra(command: List[str], local_rank: int) -> Tuple[List[str], Optional[str]]: - """Modifies the DDP spawn command to support Hydra initiated processes.""" +class _SubprocessScriptHydraLauncher(_SubprocessScriptLauncher): + """Hydra Launcher subclasses `_SubprocessScriptLauncher to manage Hydra specific commands when running under + Hydra environment.""" - # If Hydra is initialized: - # 1) Set `cwd` to the hydra working directory - # 2) Use the stored configuration in `hydra_cfg.output_subdir / config.yaml` to spawn a new child + def _get_complete_path(self, command: str) -> str: + return to_absolute_path(command) + + def _get_launch_command(self, command: List[str], local_rank: int) -> Tuple[List[str], Optional[str]]: + """Modifies the command to support Hydra initiated processes.""" + if not HydraConfig.initialized(): + return command, None + + # If Hydra is initialized: + # 1) Set `cwd` to the hydra working directory + # 2) Use the stored configuration in `hydra_cfg.output_subdir / config.yaml` to spawn a new child - cwd: Optional[str] = None - if HydraConfig.initialized(): - orig_cwd = get_original_cwd() cwd = os.getcwd() os_cwd = f'"{cwd}"' # this is needed to handle characters like `=` in the directory name @@ -57,23 +63,20 @@ def get_ddp_spawn_command_for_hydra(command: List[str], local_rank: int) -> Tupl f"hydra.run.dir={os_cwd}", f"hydra.job.name=train_ddp_process_{local_rank}", ] - return command, cwd - - -def teardown_ddp_for_hydra_multirun() -> None: - """Performs additional teardown steps for PL to allow for Hydra multirun jobs.""" - - if HydraConfig.initialized(): - # shutdown any distributed process groups - if torch.distributed.is_initialized(): - torch.distributed.destroy_process_group() - - envs = ( - "LOCAL_RANK", - "NODE_RANK", - "WORLD_SIZE", - "MASTER_ADDR", - "MASTER_PORT", - ) - for name in envs: - os.environ.pop(name, None) + return command, cwd + + def teardown(self) -> None: + if HydraConfig.initialized(): + # shutdown any distributed process groups + if torch.distributed.is_initialized(): + torch.distributed.destroy_process_group() + + envs = ( + "LOCAL_RANK", + "NODE_RANK", + "WORLD_SIZE", + "MASTER_ADDR", + "MASTER_PORT", + ) + for name in envs: + os.environ.pop(name, None) diff --git a/tests/strategies/test_ddp_hydra_support.py b/tests/strategies/test_ddp_hydra_support.py index 054546cd62a64..993c02801b191 100644 --- a/tests/strategies/test_ddp_hydra_support.py +++ b/tests/strategies/test_ddp_hydra_support.py @@ -48,16 +48,21 @@ def run_process(cmd): # Script to run from command line script = """ -import os import hydra +import os import torch + from pytorch_lightning import Trainer from tests.helpers.boring_model import BoringModel + + class BoringModelGPU(BoringModel): def on_train_start(self) -> None: # make sure that the model is on GPU when training assert self.device == torch.device(f"cuda:{self.trainer.strategy.local_rank}") self.start_cuda_memory = torch.cuda.memory_allocated() + + @hydra.main() def task_fn(cfg): trainer = Trainer(accelerator="auto", devices=cfg.devices, strategy=cfg.strategy, fast_dev_run=True) @@ -66,6 +71,8 @@ def task_fn(cfg): # make sure teardown executed assert not torch.distributed.is_initialized() assert "LOCAL_RANK" not in os.environ + + if __name__ == "__main__": task_fn() """ @@ -74,14 +81,14 @@ def task_fn(cfg): @RunIf(min_gpus=2) @pytest.mark.skipif(not _HYDRA_AVAILABLE, reason="Hydra not Available") @pytest.mark.usefixtures("cleandir") -@pytest.mark.parametrize("devices", [1, 2]) @pytest.mark.parametrize("subdir", [None, "dksa", ".hello"]) -def test_ddp_with_hydra_runjob(devices, subdir): +def test_ddp_with_hydra_runjob(subdir): # Save script locally with open("temp.py", "w") as fn: fn.write(script) # Run CLI + devices = 2 cmd = [sys.executable, "temp.py", f"+devices={devices}", '+strategy="ddp"'] if subdir is not None: cmd += [f"hydra.output_subdir={subdir}"] @@ -104,15 +111,15 @@ def test_ddp_with_hydra_runjob(devices, subdir): @RunIf(min_gpus=2) @pytest.mark.skipif(not _HYDRA_AVAILABLE, reason="Hydra not Available") @pytest.mark.usefixtures("cleandir") -@pytest.mark.parametrize("devices", [1, 2]) @pytest.mark.parametrize("num_jobs", [1, 2]) -def test_ddp_with_hydra_multirunjob(devices, num_jobs): +def test_ddp_with_hydra_multirunjob(num_jobs): # Save script locally with open("temp.py", "w") as fn: fn.write(script) # create fake multirun params based on `num_jobs` fake_param = "+foo=" + devices = 2 for i in range(num_jobs): fake_param += f"{i}" if i < num_jobs - 1: From bdce0694fc7b696badf2144cb061317f854bb69a Mon Sep 17 00:00:00 2001 From: rohitgr7 Date: Mon, 28 Feb 2022 19:22:46 +0530 Subject: [PATCH 16/34] move launcher --- pytorch_lightning/strategies/ddp.py | 8 +- .../strategies/launchers/__init__.py | 6 +- .../strategies/launchers/base.py | 3 - .../strategies/launchers/subprocess_script.py | 90 +++++++++++++++++++ .../launchers/subprocess_script_hydra.py | 82 ----------------- pytorch_lightning/strategies/strategy.py | 3 - 6 files changed, 100 insertions(+), 92 deletions(-) delete mode 100644 pytorch_lightning/strategies/launchers/subprocess_script_hydra.py diff --git a/pytorch_lightning/strategies/ddp.py b/pytorch_lightning/strategies/ddp.py index 068e82ce67e66..0347c2bf3d543 100644 --- a/pytorch_lightning/strategies/ddp.py +++ b/pytorch_lightning/strategies/ddp.py @@ -32,8 +32,10 @@ from pytorch_lightning.plugins.environments.cluster_environment import ClusterEnvironment from pytorch_lightning.plugins.io.checkpoint_plugin import CheckpointIO from pytorch_lightning.plugins.precision import PrecisionPlugin -from pytorch_lightning.strategies.launchers.subprocess_script import _SubprocessScriptLauncher -from pytorch_lightning.strategies.launchers.subprocess_script_hydra import _SubprocessScriptHydraLauncher +from pytorch_lightning.strategies.launchers.subprocess_script import ( + _HydraSubprocessScriptLauncher, + _SubprocessScriptLauncher, +) from pytorch_lightning.strategies.parallel import ParallelStrategy from pytorch_lightning.trainer.states import TrainerFn from pytorch_lightning.utilities import ( @@ -129,7 +131,7 @@ def _is_single_process_single_device(self) -> bool: return True def _configure_launcher(self) -> None: - launcher_cls = _SubprocessScriptHydraLauncher if _HYDRA_AVAILABLE else _SubprocessScriptLauncher + launcher_cls = _HydraSubprocessScriptLauncher if _HYDRA_AVAILABLE else _SubprocessScriptLauncher self._launcher = launcher_cls(self.cluster_environment, self.num_processes, self.num_nodes) if not self.cluster_environment.creates_processes_externally: self._rank_0_will_call_children_scripts = True diff --git a/pytorch_lightning/strategies/launchers/__init__.py b/pytorch_lightning/strategies/launchers/__init__.py index 340a2c0160b0e..b9c5a9ec73a1c 100644 --- a/pytorch_lightning/strategies/launchers/__init__.py +++ b/pytorch_lightning/strategies/launchers/__init__.py @@ -13,12 +13,16 @@ # limitations under the License. from pytorch_lightning.strategies.launchers.base import _Launcher from pytorch_lightning.strategies.launchers.spawn import _SpawnLauncher -from pytorch_lightning.strategies.launchers.subprocess_script import _SubprocessScriptLauncher +from pytorch_lightning.strategies.launchers.subprocess_script import ( + _HydraSubprocessScriptLauncher, + _SubprocessScriptLauncher, +) from pytorch_lightning.strategies.launchers.xla_spawn import _XLASpawnLauncher __all__ = [ "_Launcher", "_SpawnLauncher", "_SubprocessScriptLauncher", + "_HydraSubprocessScriptLauncher", "_XLASpawnLauncher", ] diff --git a/pytorch_lightning/strategies/launchers/base.py b/pytorch_lightning/strategies/launchers/base.py index 4a30183ffd2b6..2acf54afef245 100644 --- a/pytorch_lightning/strategies/launchers/base.py +++ b/pytorch_lightning/strategies/launchers/base.py @@ -34,6 +34,3 @@ def is_interactive_compatible(self) -> bool: @abstractmethod def launch(self, function: Callable, *args: Any, **kwargs: Any) -> Any: """Launches the processes.""" - - def teardown(self) -> None: - """This method is called to teardown the launcher.""" diff --git a/pytorch_lightning/strategies/launchers/subprocess_script.py b/pytorch_lightning/strategies/launchers/subprocess_script.py index d2816a4776c38..503921149ced1 100644 --- a/pytorch_lightning/strategies/launchers/subprocess_script.py +++ b/pytorch_lightning/strategies/launchers/subprocess_script.py @@ -19,10 +19,16 @@ import __main__ import numpy as np +import torch import pytorch_lightning as pl from pytorch_lightning.plugins.environments.cluster_environment import ClusterEnvironment from pytorch_lightning.strategies.launchers.base import _Launcher +from pytorch_lightning.utilities import _HYDRA_AVAILABLE + +if _HYDRA_AVAILABLE: + from hydra.core.hydra_config import HydraConfig + from hydra.utils import to_absolute_path class _SubprocessScriptLauncher(_Launcher): @@ -147,3 +153,87 @@ def _check_can_spawn_children(self) -> None: " Possible reasons: 1) LOCAL_RANK environment variable was incorrectly modified by the user," " 2) `ClusterEnvironment.creates_processes_externally` incorrectly implemented." ) + + +class _HydraSubprocessScriptLauncher(_SubprocessScriptLauncher): + """Hydra Launcher to support Hydra commands.""" + + def _get_complete_path(self, command: str) -> str: + return to_absolute_path(command) + + def _get_launch_command(self, command: List[str], local_rank: int) -> Tuple[List[str], Optional[str]]: + """Modifies the command to support Hydra initiated processes.""" + if not HydraConfig.initialized(): + return command, None + + # If Hydra is initialized: + # 1) Set `cwd` to the hydra working directory + # 2) Use the stored configuration in `hydra_cfg.output_subdir / config.yaml` to spawn a new child + + cwd = os.getcwd() + os_cwd = f'"{cwd}"' # this is needed to handle characters like `=` in the directory name + + hydra_cfg = HydraConfig.get() + hydra_output = os.path.join(cwd, hydra_cfg.output_subdir) + + if __main__.__spec__ is None: # pragma: no-cover + command_no_args = command[:2] + else: + # this fails for `python -m pdb -m a.b.c ` + command_no_args = command[:3] + + command = command_no_args + + # run the Hydra job using the current job configuration + # - typically located in: + # RUN MODE: hydra.run.dir/.hydra/config.ayml + # MULTIRUN MODE: hydra.sweep.dir/hydra.sweep.subdir/.hydra/config.yaml + command += ["-cp", hydra_output, "-cn", "config.yaml"] + + # hydra.output_subdir=.pl_ddp_hydra_{local_rank} + # Store process config in its own to avoid overwriting + # and allow the user to very that each spawned job uses + # the same configuration + # hydra.run.dir={os_cwd} + # This makes sure to run this job, log, and store any outputs + # in the current experiment directory + # + # hydra.job.name=train_ddp_process_{local_rank} + # This defines the logging output file for the process + command += [ + f"hydra.output_subdir=.pl_ddp_hydra_{local_rank}", + f"hydra.run.dir={os_cwd}", + f"hydra.job.name=train_ddp_process_{local_rank}", + ] + return command, cwd + + def launch(self, function: Callable, *args: Any, trainer: Optional["pl.Trainer"] = None, **kwargs: Any) -> Any: + """Creates new processes, then calls the given function. + + Arguments: + function: A callback function to execute after all processes have been created. + It is up to the implementation of this function to synchronize the processes, e.g., with barriers. + *args: Optional positional arguments to be passed to the given function. + trainer: Optional reference to the :class:`~pytorch_lightning.trainer.trainer.Trainer`. + **kwargs: Optional keyword arguments to be passed to the given function. + """ + results = super().launch(function, *args, **kwargs) + _teardown_ddp_for_hydra_multirun() + return results + + +def _teardown_ddp_for_hydra_multirun(): + if HydraConfig.initialized(): + # shutdown any distributed process groups + if torch.distributed.is_initialized(): + torch.distributed.destroy_process_group() + + envs = ( + "LOCAL_RANK", + "NODE_RANK", + "WORLD_SIZE", + "MASTER_ADDR", + "MASTER_PORT", + ) + for name in envs: + os.environ.pop(name, None) diff --git a/pytorch_lightning/strategies/launchers/subprocess_script_hydra.py b/pytorch_lightning/strategies/launchers/subprocess_script_hydra.py deleted file mode 100644 index ba0ac5c0b6bd6..0000000000000 --- a/pytorch_lightning/strategies/launchers/subprocess_script_hydra.py +++ /dev/null @@ -1,82 +0,0 @@ -import os -from typing import List, Optional, Tuple - -import __main__ -import torch - -from pytorch_lightning.strategies.launchers.subprocess_script import _SubprocessScriptLauncher -from pytorch_lightning.utilities import _HYDRA_AVAILABLE - -if _HYDRA_AVAILABLE: - from hydra.core.hydra_config import HydraConfig - from hydra.utils import to_absolute_path - - -class _SubprocessScriptHydraLauncher(_SubprocessScriptLauncher): - """Hydra Launcher subclasses `_SubprocessScriptLauncher to manage Hydra specific commands when running under - Hydra environment.""" - - def _get_complete_path(self, command: str) -> str: - return to_absolute_path(command) - - def _get_launch_command(self, command: List[str], local_rank: int) -> Tuple[List[str], Optional[str]]: - """Modifies the command to support Hydra initiated processes.""" - if not HydraConfig.initialized(): - return command, None - - # If Hydra is initialized: - # 1) Set `cwd` to the hydra working directory - # 2) Use the stored configuration in `hydra_cfg.output_subdir / config.yaml` to spawn a new child - - cwd = os.getcwd() - os_cwd = f'"{cwd}"' # this is needed to handle characters like `=` in the directory name - - hydra_cfg = HydraConfig.get() - hydra_output = os.path.join(cwd, hydra_cfg.output_subdir) - - if __main__.__spec__ is None: # pragma: no-cover - command_no_args = command[:2] - else: - # this fails for `python -m pdb -m a.b.c ` - command_no_args = command[:3] - - command = command_no_args - - # run the Hydra job using the current job configuration - # - typically located in: - # RUN MODE: hydra.run.dir/.hydra/config.ayml - # MULTIRUN MODE: hydra.sweep.dir/hydra.sweep.subdir/.hydra/config.yaml - command += ["-cp", hydra_output, "-cn", "config.yaml"] - - # hydra.output_subdir=.pl_ddp_hydra_{local_rank} - # Store process config in its own to avoid overwriting - # and allow the user to very that each spawned job uses - # the same configuration - # hydra.run.dir={os_cwd} - # This makes sure to run this job, log, and store any outputs - # in the current experiment directory - # - # hydra.job.name=train_ddp_process_{local_rank} - # This defines the logging output file for the process - command += [ - f"hydra.output_subdir=.pl_ddp_hydra_{local_rank}", - f"hydra.run.dir={os_cwd}", - f"hydra.job.name=train_ddp_process_{local_rank}", - ] - return command, cwd - - def teardown(self) -> None: - if HydraConfig.initialized(): - # shutdown any distributed process groups - if torch.distributed.is_initialized(): - torch.distributed.destroy_process_group() - - envs = ( - "LOCAL_RANK", - "NODE_RANK", - "WORLD_SIZE", - "MASTER_ADDR", - "MASTER_PORT", - ) - for name in envs: - os.environ.pop(name, None) diff --git a/pytorch_lightning/strategies/strategy.py b/pytorch_lightning/strategies/strategy.py index a475f036d6b90..3ff8f10c6536c 100644 --- a/pytorch_lightning/strategies/strategy.py +++ b/pytorch_lightning/strategies/strategy.py @@ -442,9 +442,6 @@ def teardown(self) -> None: optimizers_to_device(self.optimizers, torch.device("cpu")) self.precision_plugin.teardown() - if self.launcher: - self.launcher.teardown() - @classmethod def register_strategies(cls, strategy_registry) -> None: pass From 011db0995fb5869ba214876e4ca37ff00523f1b4 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 21 Jun 2022 21:52:59 +0000 Subject: [PATCH 17/34] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- tests/strategies/test_ddp_hydra_support.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/strategies/test_ddp_hydra_support.py b/tests/strategies/test_ddp_hydra_support.py index 993c02801b191..3f9cf57a17680 100644 --- a/tests/strategies/test_ddp_hydra_support.py +++ b/tests/strategies/test_ddp_hydra_support.py @@ -5,9 +5,9 @@ from pathlib import Path import pytest +from tests.helpers.runif import RunIf from pytorch_lightning.utilities.imports import _HYDRA_AVAILABLE -from tests.helpers.runif import RunIf if _HYDRA_AVAILABLE: from omegaconf import OmegaConf From d763922cd5f46e55a8e6c0a9e3fe5f84659aadca Mon Sep 17 00:00:00 2001 From: Justin Goodwin Date: Thu, 11 Aug 2022 21:27:42 -0400 Subject: [PATCH 18/34] prepare for merge --- .../strategies/launchers/__init__.py | 6 +- .../strategies/launchers/hydra.py | 114 ++++++++++++++++++ .../strategies/test_ddp_hydra_support.py | 11 +- 3 files changed, 121 insertions(+), 10 deletions(-) create mode 100644 src/pytorch_lightning/strategies/launchers/hydra.py rename tests/{ => tests_pytorch}/strategies/test_ddp_hydra_support.py (95%) diff --git a/src/pytorch_lightning/strategies/launchers/__init__.py b/src/pytorch_lightning/strategies/launchers/__init__.py index b9c5a9ec73a1c..8fab35b937e28 100644 --- a/src/pytorch_lightning/strategies/launchers/__init__.py +++ b/src/pytorch_lightning/strategies/launchers/__init__.py @@ -12,11 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. from pytorch_lightning.strategies.launchers.base import _Launcher +from pytorch_lightning.strategies.launchers.hydra import _HydraSubprocessScriptLauncher from pytorch_lightning.strategies.launchers.spawn import _SpawnLauncher -from pytorch_lightning.strategies.launchers.subprocess_script import ( - _HydraSubprocessScriptLauncher, - _SubprocessScriptLauncher, -) +from pytorch_lightning.strategies.launchers.subprocess_script import _SubprocessScriptLauncher from pytorch_lightning.strategies.launchers.xla_spawn import _XLASpawnLauncher __all__ = [ diff --git a/src/pytorch_lightning/strategies/launchers/hydra.py b/src/pytorch_lightning/strategies/launchers/hydra.py new file mode 100644 index 0000000000000..c8486850d8bd0 --- /dev/null +++ b/src/pytorch_lightning/strategies/launchers/hydra.py @@ -0,0 +1,114 @@ +# Copyright The PyTorch Lightning team. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os +import subprocess +import sys +from time import sleep +from typing import Any, Callable, List, Optional, Tuple + +import __main__ +import numpy as np +import torch + +import pytorch_lightning as pl +from pytorch_lightning.strategies.launchers.subprocess_script import _SubprocessScriptLauncher +from pytorch_lightning.utilities import _HYDRA_AVAILABLE + +if _HYDRA_AVAILABLE: + from hydra.core.hydra_config import HydraConfig + from hydra.utils import to_absolute_path + + +class _HydraSubprocessScriptLauncher(_SubprocessScriptLauncher): + """Hydra Launcher to support Hydra commands.""" + + def _get_complete_path(self, command: str) -> str: + return to_absolute_path(command) + + def _get_launch_command(self, command: List[str], local_rank: int) -> Tuple[List[str], Optional[str]]: + """Modifies the command to support Hydra initiated processes.""" + if not HydraConfig.initialized(): + return command, None + + # If Hydra is initialized: + # 1) Set `cwd` to the hydra working directory + # 2) Use the stored configuration in `hydra_cfg.output_subdir / config.yaml` to spawn a new child + + cwd = os.getcwd() + os_cwd = f'"{cwd}"' # this is needed to handle characters like `=` in the directory name + + hydra_cfg = HydraConfig.get() + hydra_output = os.path.join(cwd, hydra_cfg.output_subdir) + + if __main__.__spec__ is None: # pragma: no-cover + command_no_args = command[:2] + else: + # this fails for `python -m pdb -m a.b.c ` + command_no_args = command[:3] + + command = command_no_args + + # run the Hydra job using the current job configuration + # - typically located in: + # RUN MODE: hydra.run.dir/.hydra/config.ayml + # MULTIRUN MODE: hydra.sweep.dir/hydra.sweep.subdir/.hydra/config.yaml + command += ["-cp", hydra_output, "-cn", "config.yaml"] + + # hydra.output_subdir=.pl_ddp_hydra_{local_rank} + # Store process config in its own to avoid overwriting + # and allow the user to very that each spawned job uses + # the same configuration + # hydra.run.dir={os_cwd} + # This makes sure to run this job, log, and store any outputs + # in the current experiment directory + # + # hydra.job.name=train_ddp_process_{local_rank} + # This defines the logging output file for the process + command += [ + f"hydra.output_subdir=.pl_ddp_hydra_{local_rank}", + f"hydra.run.dir={os_cwd}", + f"hydra.job.name=train_ddp_process_{local_rank}", + ] + return command, cwd + + def launch(self, function: Callable, *args: Any, trainer: Optional["pl.Trainer"] = None, **kwargs: Any) -> Any: + """Creates new processes, then calls the given function. + + Arguments: + function: A callback function to execute after all processes have been created. + It is up to the implementation of this function to synchronize the processes, e.g., with barriers. + *args: Optional positional arguments to be passed to the given function. + trainer: Optional reference to the :class:`~pytorch_lightning.trainer.trainer.Trainer`. + **kwargs: Optional keyword arguments to be passed to the given function. + """ + results = super().launch(function, *args, **kwargs) + _teardown_ddp_for_hydra_multirun() + return results + + +def _teardown_ddp_for_hydra_multirun(): + if HydraConfig.initialized(): + # shutdown any distributed process groups + if torch.distributed.is_initialized(): + torch.distributed.destroy_process_group() + + envs = ( + "LOCAL_RANK", + "NODE_RANK", + "WORLD_SIZE", + "MASTER_ADDR", + "MASTER_PORT", + ) + for name in envs: + os.environ.pop(name, None) diff --git a/tests/strategies/test_ddp_hydra_support.py b/tests/tests_pytorch/strategies/test_ddp_hydra_support.py similarity index 95% rename from tests/strategies/test_ddp_hydra_support.py rename to tests/tests_pytorch/strategies/test_ddp_hydra_support.py index 3f9cf57a17680..810be544ddfcc 100644 --- a/tests/strategies/test_ddp_hydra_support.py +++ b/tests/tests_pytorch/strategies/test_ddp_hydra_support.py @@ -5,9 +5,9 @@ from pathlib import Path import pytest -from tests.helpers.runif import RunIf from pytorch_lightning.utilities.imports import _HYDRA_AVAILABLE +from tests.tests_pytorch.helpers.runif import RunIf if _HYDRA_AVAILABLE: from omegaconf import OmegaConf @@ -53,8 +53,8 @@ def run_process(cmd): import torch from pytorch_lightning import Trainer -from tests.helpers.boring_model import BoringModel +from tests.tests_pytorch.helpers import BoringModel class BoringModelGPU(BoringModel): def on_train_start(self) -> None: @@ -62,8 +62,7 @@ def on_train_start(self) -> None: assert self.device == torch.device(f"cuda:{self.trainer.strategy.local_rank}") self.start_cuda_memory = torch.cuda.memory_allocated() - -@hydra.main() +@hydra.main(config_path=None, version_base="1.1") def task_fn(cfg): trainer = Trainer(accelerator="auto", devices=cfg.devices, strategy=cfg.strategy, fast_dev_run=True) model = BoringModelGPU() @@ -78,7 +77,7 @@ def task_fn(cfg): """ -@RunIf(min_gpus=2) +@RunIf(min_cuda_gpus=2) @pytest.mark.skipif(not _HYDRA_AVAILABLE, reason="Hydra not Available") @pytest.mark.usefixtures("cleandir") @pytest.mark.parametrize("subdir", [None, "dksa", ".hello"]) @@ -108,7 +107,7 @@ def test_ddp_with_hydra_runjob(subdir): assert len(logs) == devices - 1 -@RunIf(min_gpus=2) +@RunIf(min_cuda_gpus=2) @pytest.mark.skipif(not _HYDRA_AVAILABLE, reason="Hydra not Available") @pytest.mark.usefixtures("cleandir") @pytest.mark.parametrize("num_jobs", [1, 2]) From 5bcfad1fb2393f2bc83e97c3f00723c1e78c9eff Mon Sep 17 00:00:00 2001 From: Justin Goodwin Date: Thu, 11 Aug 2022 23:18:18 -0400 Subject: [PATCH 19/34] working post merge --- src/pytorch_lightning/strategies/ddp.py | 7 +- .../strategies/launchers/__init__.py | 2 - .../strategies/launchers/hydra.py | 114 ----------------- .../strategies/launchers/subprocess_script.py | 115 +++--------------- .../strategies/test_ddp_hydra_support.py | 20 +-- 5 files changed, 31 insertions(+), 227 deletions(-) delete mode 100644 src/pytorch_lightning/strategies/launchers/hydra.py diff --git a/src/pytorch_lightning/strategies/ddp.py b/src/pytorch_lightning/strategies/ddp.py index e3deb20d3e5cf..57ab3a151b011 100644 --- a/src/pytorch_lightning/strategies/ddp.py +++ b/src/pytorch_lightning/strategies/ddp.py @@ -38,10 +38,7 @@ from pytorch_lightning.plugins.environments.cluster_environment import ClusterEnvironment from pytorch_lightning.plugins.io.checkpoint_plugin import CheckpointIO from pytorch_lightning.plugins.precision import PrecisionPlugin -from pytorch_lightning.strategies.launchers.subprocess_script import ( - _HydraSubprocessScriptLauncher, - _SubprocessScriptLauncher, -) +from pytorch_lightning.strategies.launchers.subprocess_script import _SubprocessScriptLauncher from pytorch_lightning.strategies.parallel import ParallelStrategy from pytorch_lightning.strategies.strategy import TBroadcast from pytorch_lightning.trainer.states import TrainerFn @@ -154,7 +151,7 @@ def process_group_backend(self) -> Optional[str]: def _configure_launcher(self) -> None: assert self.cluster_environment is not None if not self.cluster_environment.creates_processes_externally: - self._launcher = launcher_cls(self.cluster_environment, self.num_processes, self.num_nodes) + self._launcher = _SubprocessScriptLauncher(self.cluster_environment, self.num_processes, self.num_nodes) self._rank_0_will_call_children_scripts = True def setup_environment(self) -> None: diff --git a/src/pytorch_lightning/strategies/launchers/__init__.py b/src/pytorch_lightning/strategies/launchers/__init__.py index 01ae51bde60c9..d75df88b2df28 100644 --- a/src/pytorch_lightning/strategies/launchers/__init__.py +++ b/src/pytorch_lightning/strategies/launchers/__init__.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. from pytorch_lightning.strategies.launchers.base import _Launcher -from pytorch_lightning.strategies.launchers.hydra import _HydraSubprocessScriptLauncher from pytorch_lightning.strategies.launchers.multiprocessing import _MultiProcessingLauncher from pytorch_lightning.strategies.launchers.subprocess_script import _SubprocessScriptLauncher from pytorch_lightning.strategies.launchers.xla import _XLALauncher @@ -21,6 +20,5 @@ "_Launcher", "_MultiProcessingLauncher", "_SubprocessScriptLauncher", - "_HydraSubprocessScriptLauncher", "_XLALauncher", ] diff --git a/src/pytorch_lightning/strategies/launchers/hydra.py b/src/pytorch_lightning/strategies/launchers/hydra.py deleted file mode 100644 index c8486850d8bd0..0000000000000 --- a/src/pytorch_lightning/strategies/launchers/hydra.py +++ /dev/null @@ -1,114 +0,0 @@ -# Copyright The PyTorch Lightning team. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import os -import subprocess -import sys -from time import sleep -from typing import Any, Callable, List, Optional, Tuple - -import __main__ -import numpy as np -import torch - -import pytorch_lightning as pl -from pytorch_lightning.strategies.launchers.subprocess_script import _SubprocessScriptLauncher -from pytorch_lightning.utilities import _HYDRA_AVAILABLE - -if _HYDRA_AVAILABLE: - from hydra.core.hydra_config import HydraConfig - from hydra.utils import to_absolute_path - - -class _HydraSubprocessScriptLauncher(_SubprocessScriptLauncher): - """Hydra Launcher to support Hydra commands.""" - - def _get_complete_path(self, command: str) -> str: - return to_absolute_path(command) - - def _get_launch_command(self, command: List[str], local_rank: int) -> Tuple[List[str], Optional[str]]: - """Modifies the command to support Hydra initiated processes.""" - if not HydraConfig.initialized(): - return command, None - - # If Hydra is initialized: - # 1) Set `cwd` to the hydra working directory - # 2) Use the stored configuration in `hydra_cfg.output_subdir / config.yaml` to spawn a new child - - cwd = os.getcwd() - os_cwd = f'"{cwd}"' # this is needed to handle characters like `=` in the directory name - - hydra_cfg = HydraConfig.get() - hydra_output = os.path.join(cwd, hydra_cfg.output_subdir) - - if __main__.__spec__ is None: # pragma: no-cover - command_no_args = command[:2] - else: - # this fails for `python -m pdb -m a.b.c ` - command_no_args = command[:3] - - command = command_no_args - - # run the Hydra job using the current job configuration - # - typically located in: - # RUN MODE: hydra.run.dir/.hydra/config.ayml - # MULTIRUN MODE: hydra.sweep.dir/hydra.sweep.subdir/.hydra/config.yaml - command += ["-cp", hydra_output, "-cn", "config.yaml"] - - # hydra.output_subdir=.pl_ddp_hydra_{local_rank} - # Store process config in its own to avoid overwriting - # and allow the user to very that each spawned job uses - # the same configuration - # hydra.run.dir={os_cwd} - # This makes sure to run this job, log, and store any outputs - # in the current experiment directory - # - # hydra.job.name=train_ddp_process_{local_rank} - # This defines the logging output file for the process - command += [ - f"hydra.output_subdir=.pl_ddp_hydra_{local_rank}", - f"hydra.run.dir={os_cwd}", - f"hydra.job.name=train_ddp_process_{local_rank}", - ] - return command, cwd - - def launch(self, function: Callable, *args: Any, trainer: Optional["pl.Trainer"] = None, **kwargs: Any) -> Any: - """Creates new processes, then calls the given function. - - Arguments: - function: A callback function to execute after all processes have been created. - It is up to the implementation of this function to synchronize the processes, e.g., with barriers. - *args: Optional positional arguments to be passed to the given function. - trainer: Optional reference to the :class:`~pytorch_lightning.trainer.trainer.Trainer`. - **kwargs: Optional keyword arguments to be passed to the given function. - """ - results = super().launch(function, *args, **kwargs) - _teardown_ddp_for_hydra_multirun() - return results - - -def _teardown_ddp_for_hydra_multirun(): - if HydraConfig.initialized(): - # shutdown any distributed process groups - if torch.distributed.is_initialized(): - torch.distributed.destroy_process_group() - - envs = ( - "LOCAL_RANK", - "NODE_RANK", - "WORLD_SIZE", - "MASTER_ADDR", - "MASTER_PORT", - ) - for name in envs: - os.environ.pop(name, None) diff --git a/src/pytorch_lightning/strategies/launchers/subprocess_script.py b/src/pytorch_lightning/strategies/launchers/subprocess_script.py index ea98cf902dac2..ca413be29a6f7 100644 --- a/src/pytorch_lightning/strategies/launchers/subprocess_script.py +++ b/src/pytorch_lightning/strategies/launchers/subprocess_script.py @@ -15,18 +15,17 @@ import subprocess import sys from time import sleep -from typing import Any, Callable, List, Optional, Tuple +from typing import Any, Callable, Optional import __main__ import numpy as np -import torch import pytorch_lightning as pl from pytorch_lightning.plugins.environments.cluster_environment import ClusterEnvironment from pytorch_lightning.strategies.launchers.base import _Launcher from pytorch_lightning.utilities.imports import _RequirementAvailable -_HYDRA_AVAILABLE = _RequirementAvailable("hydra") +_HYDRA_AVAILABLE = _RequirementAvailable("hydra-core") class _SubprocessScriptLauncher(_Launcher): @@ -91,12 +90,6 @@ def launch(self, function: Callable, *args: Any, trainer: Optional["pl.Trainer"] self._call_children_scripts() return function(*args, **kwargs) - def _get_complete_path(self, command: str) -> str: - return os.path.abspath(command) - - def _get_launch_command(self, command: List[str], local_rank: int) -> Tuple[List[str], Optional[str]]: - return command, None - def _call_children_scripts(self) -> None: # bookkeeping of spawned processes self._check_can_spawn_children() @@ -149,13 +142,23 @@ def _call_children_scripts(self) -> None: cwd: Optional[str] = None if _HYDRA_AVAILABLE: from hydra.core.hydra_config import HydraConfig - from hydra.utils import get_original_cwd if HydraConfig.initialized(): - cwd = get_original_cwd() - os_cwd = f'"{os.getcwd()}"' - command += [f"hydra.run.dir={os_cwd}", f"hydra.job.name=train_ddp_process_{local_rank}"] - subprocess.Popen(command, env=env_copy, cwd=cwd) + cwd = os.getcwd() + os_cwd = f'"{cwd}"' # this is needed to handle characters like `=` in the directory name + + hydra_cfg = HydraConfig.get() + hydra_output = os_cwd + if hydra_cfg.output_subdir is not None: + hydra_output = os.path.join(cwd, hydra_cfg.output_subdir) + + command = command[:2] if __main__.__spec__ is None else command[:3] + command += ["-cp", hydra_output, "-cn", "config.yaml"] + command += [ + f"hydra.output_subdir=.pl_ddp_hydra_{local_rank}", + f"hydra.run.dir={os_cwd}", + ] + subprocess.Popen(command, env=env_copy) # starting all processes at once can cause issues # with dataloaders delay between 1-10 seconds @@ -169,87 +172,3 @@ def _check_can_spawn_children(self) -> None: " Possible reasons: 1) LOCAL_RANK environment variable was incorrectly modified by the user," " 2) `ClusterEnvironment.creates_processes_externally` incorrectly implemented." ) - - -class _HydraSubprocessScriptLauncher(_SubprocessScriptLauncher): - """Hydra Launcher to support Hydra commands.""" - - def _get_complete_path(self, command: str) -> str: - return to_absolute_path(command) - - def _get_launch_command(self, command: List[str], local_rank: int) -> Tuple[List[str], Optional[str]]: - """Modifies the command to support Hydra initiated processes.""" - if not HydraConfig.initialized(): - return command, None - - # If Hydra is initialized: - # 1) Set `cwd` to the hydra working directory - # 2) Use the stored configuration in `hydra_cfg.output_subdir / config.yaml` to spawn a new child - - cwd = os.getcwd() - os_cwd = f'"{cwd}"' # this is needed to handle characters like `=` in the directory name - - hydra_cfg = HydraConfig.get() - hydra_output = os.path.join(cwd, hydra_cfg.output_subdir) - - if __main__.__spec__ is None: # pragma: no-cover - command_no_args = command[:2] - else: - # this fails for `python -m pdb -m a.b.c ` - command_no_args = command[:3] - - command = command_no_args - - # run the Hydra job using the current job configuration - # - typically located in: - # RUN MODE: hydra.run.dir/.hydra/config.ayml - # MULTIRUN MODE: hydra.sweep.dir/hydra.sweep.subdir/.hydra/config.yaml - command += ["-cp", hydra_output, "-cn", "config.yaml"] - - # hydra.output_subdir=.pl_ddp_hydra_{local_rank} - # Store process config in its own to avoid overwriting - # and allow the user to very that each spawned job uses - # the same configuration - # hydra.run.dir={os_cwd} - # This makes sure to run this job, log, and store any outputs - # in the current experiment directory - # - # hydra.job.name=train_ddp_process_{local_rank} - # This defines the logging output file for the process - command += [ - f"hydra.output_subdir=.pl_ddp_hydra_{local_rank}", - f"hydra.run.dir={os_cwd}", - f"hydra.job.name=train_ddp_process_{local_rank}", - ] - return command, cwd - - def launch(self, function: Callable, *args: Any, trainer: Optional["pl.Trainer"] = None, **kwargs: Any) -> Any: - """Creates new processes, then calls the given function. - - Arguments: - function: A callback function to execute after all processes have been created. - It is up to the implementation of this function to synchronize the processes, e.g., with barriers. - *args: Optional positional arguments to be passed to the given function. - trainer: Optional reference to the :class:`~pytorch_lightning.trainer.trainer.Trainer`. - **kwargs: Optional keyword arguments to be passed to the given function. - """ - results = super().launch(function, *args, **kwargs) - _teardown_ddp_for_hydra_multirun() - return results - - -def _teardown_ddp_for_hydra_multirun(): - if HydraConfig.initialized(): - # shutdown any distributed process groups - if torch.distributed.is_initialized(): - torch.distributed.destroy_process_group() - - envs = ( - "LOCAL_RANK", - "NODE_RANK", - "WORLD_SIZE", - "MASTER_ADDR", - "MASTER_PORT", - ) - for name in envs: - os.environ.pop(name, None) diff --git a/tests/tests_pytorch/strategies/test_ddp_hydra_support.py b/tests/tests_pytorch/strategies/test_ddp_hydra_support.py index 810be544ddfcc..b9f9dd458b279 100644 --- a/tests/tests_pytorch/strategies/test_ddp_hydra_support.py +++ b/tests/tests_pytorch/strategies/test_ddp_hydra_support.py @@ -6,9 +6,11 @@ import pytest -from pytorch_lightning.utilities.imports import _HYDRA_AVAILABLE +from pytorch_lightning.utilities.imports import _RequirementAvailable from tests.tests_pytorch.helpers.runif import RunIf +_HYDRA_AVAILABLE = bool(_RequirementAvailable("hydra-core")) + if _HYDRA_AVAILABLE: from omegaconf import OmegaConf @@ -67,10 +69,12 @@ def task_fn(cfg): trainer = Trainer(accelerator="auto", devices=cfg.devices, strategy=cfg.strategy, fast_dev_run=True) model = BoringModelGPU() trainer.fit(model) - # make sure teardown executed - assert not torch.distributed.is_initialized() - assert "LOCAL_RANK" not in os.environ + trainer.test(model) + + if torch.distributed.is_initialized(): + torch.distributed.destroy_process_group() + os.environ.pop("LOCAL_RANK", None) if __name__ == "__main__": task_fn() @@ -103,8 +107,8 @@ def test_ddp_with_hydra_runjob(subdir): assert cfg.devices == devices # Make sure PL spawned a job that is logged by Hydra - logs = list(Path.cwd().glob("**/train_ddp_process_*.log")) - assert len(logs) == devices - 1 + logs = list(Path.cwd().glob("**/*.log")) + assert len(logs) == 1 @RunIf(min_cuda_gpus=2) @@ -138,5 +142,5 @@ def test_ddp_with_hydra_multirunjob(num_jobs): assert cfg.devices == devices assert cfg.foo == local_rank - logs = list(Path.cwd().glob("**/train_ddp_process_*.log")) - assert len(logs) == num_jobs * (devices - 1) + logs = list(Path.cwd().glob("**/*.log")) + assert len(logs) == num_jobs From 387a7cfa3fc7a039131717cacb54b435033cfacc Mon Sep 17 00:00:00 2001 From: Justin Goodwin Date: Mon, 15 Aug 2022 14:51:41 -0400 Subject: [PATCH 20/34] fixes test import --- tests/tests_pytorch/strategies/test_ddp_hydra_support.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/tests_pytorch/strategies/test_ddp_hydra_support.py b/tests/tests_pytorch/strategies/test_ddp_hydra_support.py index b9f9dd458b279..3147fafadae43 100644 --- a/tests/tests_pytorch/strategies/test_ddp_hydra_support.py +++ b/tests/tests_pytorch/strategies/test_ddp_hydra_support.py @@ -7,7 +7,7 @@ import pytest from pytorch_lightning.utilities.imports import _RequirementAvailable -from tests.tests_pytorch.helpers.runif import RunIf +from tests_pytorch.helpers.runif import RunIf _HYDRA_AVAILABLE = bool(_RequirementAvailable("hydra-core")) From e8d625bda5cb92d5de925584aa25aab1dfd73406 Mon Sep 17 00:00:00 2001 From: Justin Goodwin Date: Tue, 16 Aug 2022 13:30:59 -0400 Subject: [PATCH 21/34] add support for hydra `rerun` --- .../strategies/launchers/subprocess_script.py | 35 ++++++++++++------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/src/pytorch_lightning/strategies/launchers/subprocess_script.py b/src/pytorch_lightning/strategies/launchers/subprocess_script.py index ca413be29a6f7..535274a758f0f 100644 --- a/src/pytorch_lightning/strategies/launchers/subprocess_script.py +++ b/src/pytorch_lightning/strategies/launchers/subprocess_script.py @@ -137,27 +137,36 @@ def _call_children_scripts(self) -> None: if os.environ.get("PL_GLOBAL_SEED") is None and "PL_GLOBAL_SEED" in env_copy: del env_copy["PL_GLOBAL_SEED"] - # start process - # if hydra is available and initialized, make sure to set the cwd correctly - cwd: Optional[str] = None + # if hydra is available process multiple processes if _HYDRA_AVAILABLE: from hydra.core.hydra_config import HydraConfig if HydraConfig.initialized(): - cwd = os.getcwd() - os_cwd = f'"{cwd}"' # this is needed to handle characters like `=` in the directory name - + # extract the hydra configu hydra_cfg = HydraConfig.get() - hydra_output = os_cwd + + # the location of the hydra configuration files saved for the current job + hydra_output = hydra_cfg.runtime.output_dir if hydra_cfg.output_subdir is not None: - hydra_output = os.path.join(cwd, hydra_cfg.output_subdir) + hydra_output = os.path.join(hydra_output, hydra_cfg.output_subdir) + # the command (without the CLI arguments) command = command[:2] if __main__.__spec__ is None else command[:3] - command += ["-cp", hydra_output, "-cn", "config.yaml"] - command += [ - f"hydra.output_subdir=.pl_ddp_hydra_{local_rank}", - f"hydra.run.dir={os_cwd}", - ] + + # check if experimental re-run capability exists + # otherwise use existing config.yaml which may have issues + pickled_config = os.path.join(hydra_output, "config.pickle") + if os.path.exists(pickled_config): + command += ["--experimental-rerun", pickled_config] + + else: + command += ["-cp", hydra_output, "-cn", "config.yaml"] + command += [ + f"hydra.output_subdir=.pl_ddp_hydra_{local_rank}", + f"hydra.run.dir={hydra_cfg.runtime.output_dir}", + ] + + # start process subprocess.Popen(command, env=env_copy) # starting all processes at once can cause issues From 0f12e9a5833392f54aa485c40af5963b1cab4eb8 Mon Sep 17 00:00:00 2001 From: Justin Goodwin Date: Sat, 27 Aug 2022 01:06:09 -0400 Subject: [PATCH 22/34] refactor subprocess cmd and add test --- .../strategies/launchers/subprocess_script.py | 101 +++++++++--------- .../strategies/test_ddp_hydra_support.py | 52 ++++++++- 2 files changed, 98 insertions(+), 55 deletions(-) diff --git a/src/pytorch_lightning/strategies/launchers/subprocess_script.py b/src/pytorch_lightning/strategies/launchers/subprocess_script.py index 535274a758f0f..233c50814a39a 100644 --- a/src/pytorch_lightning/strategies/launchers/subprocess_script.py +++ b/src/pytorch_lightning/strategies/launchers/subprocess_script.py @@ -15,7 +15,7 @@ import subprocess import sys from time import sleep -from typing import Any, Callable, Optional +from typing import Any, Callable, Optional, Sequence import __main__ import numpy as np @@ -101,32 +101,6 @@ def _call_children_scripts(self) -> None: # allow the user to pass the node rank os.environ["NODE_RANK"] = str(self.cluster_environment.node_rank()) os.environ["LOCAL_RANK"] = str(self.cluster_environment.local_rank()) - - # Check if the current calling command looked like `python a/b/c.py` or `python -m a.b.c` - # See https://docs.python.org/3/reference/import.html#main-spec - if __main__.__spec__ is None: # pragma: no-cover - # Script called as `python a/b/c.py` - if _HYDRA_AVAILABLE: - # when user is using hydra find the absolute path - from hydra.utils import to_absolute_path - - to_abs_path = to_absolute_path - else: - to_abs_path = os.path.abspath - - # pull out the commands used to run the script and resolve the absolute file path - command = sys.argv - try: - full_path = to_abs_path(command[0]) - except Exception: - full_path = os.path.abspath(command[0]) - - command[0] = full_path - # use the same python interpreter and actually running - command = [sys.executable] + command - else: # Script called as `python -m a.b.c` - command = [sys.executable, "-m", __main__.__spec__.name] + sys.argv[1:] - os.environ["WORLD_SIZE"] = f"{self.num_processes * self.num_nodes}" for local_rank in range(1, self.num_processes): @@ -137,36 +111,17 @@ def _call_children_scripts(self) -> None: if os.environ.get("PL_GLOBAL_SEED") is None and "PL_GLOBAL_SEED" in env_copy: del env_copy["PL_GLOBAL_SEED"] - # if hydra is available process multiple processes + hydra_in_use = False if _HYDRA_AVAILABLE: from hydra.core.hydra_config import HydraConfig - if HydraConfig.initialized(): - # extract the hydra configu - hydra_cfg = HydraConfig.get() - - # the location of the hydra configuration files saved for the current job - hydra_output = hydra_cfg.runtime.output_dir - if hydra_cfg.output_subdir is not None: - hydra_output = os.path.join(hydra_output, hydra_cfg.output_subdir) - - # the command (without the CLI arguments) - command = command[:2] if __main__.__spec__ is None else command[:3] + hydra_in_use = HydraConfig.initialized() - # check if experimental re-run capability exists - # otherwise use existing config.yaml which may have issues - pickled_config = os.path.join(hydra_output, "config.pickle") - if os.path.exists(pickled_config): - command += ["--experimental-rerun", pickled_config] - - else: - command += ["-cp", hydra_output, "-cn", "config.yaml"] - command += [ - f"hydra.output_subdir=.pl_ddp_hydra_{local_rank}", - f"hydra.run.dir={hydra_cfg.runtime.output_dir}", - ] + if hydra_in_use: + command = _hydra_subprocess_cmd(local_rank) + else: + command = _basic_subprocess_cmd(local_rank) - # start process subprocess.Popen(command, env=env_copy) # starting all processes at once can cause issues @@ -181,3 +136,45 @@ def _check_can_spawn_children(self) -> None: " Possible reasons: 1) LOCAL_RANK environment variable was incorrectly modified by the user," " 2) `ClusterEnvironment.creates_processes_externally` incorrectly implemented." ) + + +def _basic_subprocess_cmd(local_rank: int) -> Sequence[str]: + if __main__.__spec__ is None: # pragma: no-cover + return [sys.executable, os.path.abspath(sys.argv[0])] + sys.argv[1:] + else: + return [sys.executable, "-m", __main__.__spec__.name] + sys.argv[1:] + + +def _hydra_subprocess_cmd(local_rank: int) -> Sequence[str]: + from hydra.core.hydra_config import HydraConfig + from hydra.utils import to_absolute_path + + # when user is using hydra find the absolute path + if __main__.__spec__ is None: # pragma: no-cover + command = [sys.executable, to_absolute_path(sys.argv[0])] + else: + command = [sys.executable, "-m", __main__.__spec__.name] + + # extract the hydra configu + hydra_cfg = HydraConfig.get() + + # the location of the hydra configuration files saved for the current job + hydra_output = hydra_cfg.runtime.output_dir + if hydra_cfg.output_subdir is not None: + hydra_output = os.path.join(hydra_output, hydra_cfg.output_subdir) + + # check if experimental re-run capability exists + # otherwise use existing config.yaml which may have issues + pickled_config = os.path.join(hydra_output, "config.pickle") + if os.path.exists(pickled_config): + command += ["--experimental-rerun", pickled_config] + + else: + command += ["-cp", hydra_output, "-cn", "config.yaml"] + command += [ + f"hydra.output_subdir=.pl_ddp_hydra_{local_rank}", + f"hydra.run.dir={hydra_cfg.runtime.output_dir}", + ] + + print("*********************", command) + return command diff --git a/tests/tests_pytorch/strategies/test_ddp_hydra_support.py b/tests/tests_pytorch/strategies/test_ddp_hydra_support.py index 3147fafadae43..724fca18406c9 100644 --- a/tests/tests_pytorch/strategies/test_ddp_hydra_support.py +++ b/tests/tests_pytorch/strategies/test_ddp_hydra_support.py @@ -6,11 +6,9 @@ import pytest -from pytorch_lightning.utilities.imports import _RequirementAvailable +from pytorch_lightning.strategies.launchers.subprocess_script import _HYDRA_AVAILABLE from tests_pytorch.helpers.runif import RunIf -_HYDRA_AVAILABLE = bool(_RequirementAvailable("hydra-core")) - if _HYDRA_AVAILABLE: from omegaconf import OmegaConf @@ -144,3 +142,51 @@ def test_ddp_with_hydra_multirunjob(num_jobs): logs = list(Path.cwd().glob("**/*.log")) assert len(logs) == num_jobs + + +yaml_file = """ +hydra: + callbacks: + save_job_info: + _target_: hydra.experimental.callbacks.PickleJobInfoCallback +""" + + +@RunIf(min_cuda_gpus=2) +@pytest.mark.skipif(not _HYDRA_AVAILABLE, reason="Hydra not Available") +@pytest.mark.usefixtures("cleandir") +@pytest.mark.parametrize("num_jobs", [1, 2]) +def test_ddp_with_hydra_multirunjob_rerun(num_jobs): + # Save script locally + with open("temp.py", "w") as fn: + fn.write(script) + + with open("config.yaml", "w") as fn: + fn.write(yaml_file) + + # create fake multirun params based on `num_jobs` + fake_param = "+foo=" + devices = 2 + for i in range(num_jobs): + fake_param += f"{i}" + if i < num_jobs - 1: + fake_param += "," + + # Run CLI + run_process( + [ + sys.executable, + "temp.py", + "-cp", + ".", + "-cn", + "config.yaml", + f"+devices={devices}", + '+strategy="ddp"', + fake_param, + "--multirun", + ] + ) + + pickles = sorted(Path.cwd().glob("**/.hydra/config.pickle")) + assert len(pickles) == num_jobs From f67856f0cddb6374f33a90fdcc1a788ac0a060f1 Mon Sep 17 00:00:00 2001 From: Justin Goodwin Date: Sat, 27 Aug 2022 10:08:42 -0400 Subject: [PATCH 23/34] Update tests/tests_pytorch/strategies/test_ddp_hydra_support.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Carlos Mocholí --- tests/tests_pytorch/strategies/test_ddp_hydra_support.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/tests_pytorch/strategies/test_ddp_hydra_support.py b/tests/tests_pytorch/strategies/test_ddp_hydra_support.py index 724fca18406c9..df688b678f5c3 100644 --- a/tests/tests_pytorch/strategies/test_ddp_hydra_support.py +++ b/tests/tests_pytorch/strategies/test_ddp_hydra_support.py @@ -181,7 +181,7 @@ def test_ddp_with_hydra_multirunjob_rerun(num_jobs): ".", "-cn", "config.yaml", - f"+devices={devices}", + "+devices=2", '+strategy="ddp"', fake_param, "--multirun", From e2bdb769aa54419e6654c437c8c3ab64583319e4 Mon Sep 17 00:00:00 2001 From: Justin Goodwin Date: Sat, 27 Aug 2022 10:08:50 -0400 Subject: [PATCH 24/34] Update tests/tests_pytorch/strategies/test_ddp_hydra_support.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Carlos Mocholí --- tests/tests_pytorch/strategies/test_ddp_hydra_support.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/tests/tests_pytorch/strategies/test_ddp_hydra_support.py b/tests/tests_pytorch/strategies/test_ddp_hydra_support.py index df688b678f5c3..a9bc7451b181d 100644 --- a/tests/tests_pytorch/strategies/test_ddp_hydra_support.py +++ b/tests/tests_pytorch/strategies/test_ddp_hydra_support.py @@ -165,12 +165,7 @@ def test_ddp_with_hydra_multirunjob_rerun(num_jobs): fn.write(yaml_file) # create fake multirun params based on `num_jobs` - fake_param = "+foo=" - devices = 2 - for i in range(num_jobs): - fake_param += f"{i}" - if i < num_jobs - 1: - fake_param += "," + fake_param = "+foo=" + ",".join(str(i) for i in range(num_jobs)) # Run CLI run_process( From ec19a905e372defb494a7e26f04080a84c00f061 Mon Sep 17 00:00:00 2001 From: Justin Goodwin Date: Sat, 27 Aug 2022 11:55:18 -0400 Subject: [PATCH 25/34] resolve comments --- .../strategies/launchers/subprocess_script.py | 1 - .../strategies/test_ddp_hydra_support.py | 18 ++++++++---------- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/src/pytorch_lightning/strategies/launchers/subprocess_script.py b/src/pytorch_lightning/strategies/launchers/subprocess_script.py index 233c50814a39a..fb4160b32de7e 100644 --- a/src/pytorch_lightning/strategies/launchers/subprocess_script.py +++ b/src/pytorch_lightning/strategies/launchers/subprocess_script.py @@ -176,5 +176,4 @@ def _hydra_subprocess_cmd(local_rank: int) -> Sequence[str]: f"hydra.run.dir={hydra_cfg.runtime.output_dir}", ] - print("*********************", command) return command diff --git a/tests/tests_pytorch/strategies/test_ddp_hydra_support.py b/tests/tests_pytorch/strategies/test_ddp_hydra_support.py index a9bc7451b181d..31bfad61397e4 100644 --- a/tests/tests_pytorch/strategies/test_ddp_hydra_support.py +++ b/tests/tests_pytorch/strategies/test_ddp_hydra_support.py @@ -7,8 +7,11 @@ import pytest from pytorch_lightning.strategies.launchers.subprocess_script import _HYDRA_AVAILABLE +from pytorch_lightning.utilities.imports import _RequirementAvailable from tests_pytorch.helpers.runif import RunIf +_HYDRA_WITH_RERUN = _RequirementAvailable("hydra-core >= 1.2") + if _HYDRA_AVAILABLE: from omegaconf import OmegaConf @@ -119,25 +122,20 @@ def test_ddp_with_hydra_multirunjob(num_jobs): fn.write(script) # create fake multirun params based on `num_jobs` - fake_param = "+foo=" - devices = 2 - for i in range(num_jobs): - fake_param += f"{i}" - if i < num_jobs - 1: - fake_param += "," + fake_param = "+foo=" + ",".join(str(i) for i in range(num_jobs)) # Run CLI - run_process([sys.executable, "temp.py", f"+devices={devices}", '+strategy="ddp"', fake_param, "--multirun"]) + run_process([sys.executable, "temp.py", "+devices=2", '+strategy="ddp"', fake_param, "--multirun"]) # Make sure config.yaml was created for each job configs = sorted(Path.cwd().glob("**/.pl_ddp_hydra_*/config.yaml")) - assert len(configs) == num_jobs * (devices - 1) + assert len(configs) == num_jobs # Make sure the parameter was set and used for each job for i, config in enumerate(configs): cfg = OmegaConf.load(config) local_rank = int(config.parent.parent.parts[-1]) - assert cfg.devices == devices + assert cfg.devices == 2 assert cfg.foo == local_rank logs = list(Path.cwd().glob("**/*.log")) @@ -153,7 +151,7 @@ def test_ddp_with_hydra_multirunjob(num_jobs): @RunIf(min_cuda_gpus=2) -@pytest.mark.skipif(not _HYDRA_AVAILABLE, reason="Hydra not Available") +@pytest.mark.skipif(not _HYDRA_WITH_RERUN, reason="Hydra with `rerun` not Available") @pytest.mark.usefixtures("cleandir") @pytest.mark.parametrize("num_jobs", [1, 2]) def test_ddp_with_hydra_multirunjob_rerun(num_jobs): From 635862168e821ff281b4e283585ce66116bf09d2 Mon Sep 17 00:00:00 2001 From: Justin Goodwin Date: Tue, 6 Sep 2022 13:36:15 -0400 Subject: [PATCH 26/34] uses hydra test_utils and checks standalone --- .../strategies/test_ddp_hydra_support.py | 29 +++---------------- 1 file changed, 4 insertions(+), 25 deletions(-) diff --git a/tests/tests_pytorch/strategies/test_ddp_hydra_support.py b/tests/tests_pytorch/strategies/test_ddp_hydra_support.py index 31bfad61397e4..7a752f48533ab 100644 --- a/tests/tests_pytorch/strategies/test_ddp_hydra_support.py +++ b/tests/tests_pytorch/strategies/test_ddp_hydra_support.py @@ -1,6 +1,5 @@ import logging import os -import subprocess import sys from pathlib import Path @@ -13,6 +12,7 @@ _HYDRA_WITH_RERUN = _RequirementAvailable("hydra-core >= 1.2") if _HYDRA_AVAILABLE: + from hydra.test_utils.test_utils import run_process from omegaconf import OmegaConf @@ -28,27 +28,6 @@ def cleandir(tmp_path): logging.shutdown() -# function to run a command line argument -def run_process(cmd): - try: - process = subprocess.Popen( - args=cmd, - shell=False, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - stdout, stderr = process.communicate() - if process.returncode != 0: - sys.stderr.write(f"Subprocess error:\n{stderr}\n") - sys.stderr.write(f"Subprocess stdout:\n{stdout}\n") - raise subprocess.CalledProcessError(returncode=process.returncode, cmd=cmd) - return stdout, stderr - except Exception as e: - cmd = " ".join(cmd) - sys.stderr.write(f"Error executing:\n{cmd}\n") - raise e - - # Script to run from command line script = """ import hydra @@ -82,7 +61,7 @@ def task_fn(cfg): """ -@RunIf(min_cuda_gpus=2) +@RunIf(min_cuda_gpus=2, skip_windows=True, standalone=True) @pytest.mark.skipif(not _HYDRA_AVAILABLE, reason="Hydra not Available") @pytest.mark.usefixtures("cleandir") @pytest.mark.parametrize("subdir", [None, "dksa", ".hello"]) @@ -112,7 +91,7 @@ def test_ddp_with_hydra_runjob(subdir): assert len(logs) == 1 -@RunIf(min_cuda_gpus=2) +@RunIf(min_cuda_gpus=2, skip_windows=True, standalone=True) @pytest.mark.skipif(not _HYDRA_AVAILABLE, reason="Hydra not Available") @pytest.mark.usefixtures("cleandir") @pytest.mark.parametrize("num_jobs", [1, 2]) @@ -150,7 +129,7 @@ def test_ddp_with_hydra_multirunjob(num_jobs): """ -@RunIf(min_cuda_gpus=2) +@RunIf(min_cuda_gpus=2, skip_windows=True, standalone=True) @pytest.mark.skipif(not _HYDRA_WITH_RERUN, reason="Hydra with `rerun` not Available") @pytest.mark.usefixtures("cleandir") @pytest.mark.parametrize("num_jobs", [1, 2]) From 3f2ab472c87db77ea06fb1028e81ae495b5d4d55 Mon Sep 17 00:00:00 2001 From: Rohit Gupta Date: Mon, 19 Sep 2022 12:46:44 -0400 Subject: [PATCH 27/34] fix tests --- .../strategies/test_ddp_hydra_support.py | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/tests/tests_pytorch/strategies/test_ddp_hydra_support.py b/tests/tests_pytorch/strategies/test_ddp_hydra_support.py index 7a752f48533ab..bbfb82880ed92 100644 --- a/tests/tests_pytorch/strategies/test_ddp_hydra_support.py +++ b/tests/tests_pytorch/strategies/test_ddp_hydra_support.py @@ -4,12 +4,12 @@ from pathlib import Path import pytest +from lightning_utilities.core.imports import RequirementCache from pytorch_lightning.strategies.launchers.subprocess_script import _HYDRA_AVAILABLE -from pytorch_lightning.utilities.imports import _RequirementAvailable from tests_pytorch.helpers.runif import RunIf -_HYDRA_WITH_RERUN = _RequirementAvailable("hydra-core >= 1.2") +_HYDRA_WITH_RERUN = RequirementCache("hydra-core >= 1.2") if _HYDRA_AVAILABLE: from hydra.test_utils.test_utils import run_process @@ -35,8 +35,8 @@ def cleandir(tmp_path): import torch from pytorch_lightning import Trainer +from pytorch_lightning.demos.boring_classes import BoringModel -from tests.tests_pytorch.helpers import BoringModel class BoringModelGPU(BoringModel): def on_train_start(self) -> None: @@ -65,14 +65,15 @@ def task_fn(cfg): @pytest.mark.skipif(not _HYDRA_AVAILABLE, reason="Hydra not Available") @pytest.mark.usefixtures("cleandir") @pytest.mark.parametrize("subdir", [None, "dksa", ".hello"]) -def test_ddp_with_hydra_runjob(subdir): +def test_ddp_with_hydra_runjob(tmpdir, subdir): # Save script locally - with open("temp.py", "w") as fn: + path = tmpdir / "temp.py" + with open(path, "w") as fn: fn.write(script) # Run CLI devices = 2 - cmd = [sys.executable, "temp.py", f"+devices={devices}", '+strategy="ddp"'] + cmd = [sys.executable, str(path), f"+devices={devices}", '+strategy="ddp"'] if subdir is not None: cmd += [f"hydra.output_subdir={subdir}"] run_process(cmd) @@ -95,16 +96,17 @@ def test_ddp_with_hydra_runjob(subdir): @pytest.mark.skipif(not _HYDRA_AVAILABLE, reason="Hydra not Available") @pytest.mark.usefixtures("cleandir") @pytest.mark.parametrize("num_jobs", [1, 2]) -def test_ddp_with_hydra_multirunjob(num_jobs): +def test_ddp_with_hydra_multirunjob(tmpdir, num_jobs): # Save script locally - with open("temp.py", "w") as fn: + path = tmpdir / "temp.py" + with open(path, "w") as fn: fn.write(script) # create fake multirun params based on `num_jobs` fake_param = "+foo=" + ",".join(str(i) for i in range(num_jobs)) # Run CLI - run_process([sys.executable, "temp.py", "+devices=2", '+strategy="ddp"', fake_param, "--multirun"]) + run_process([sys.executable, str(path), "+devices=2", '+strategy="ddp"', fake_param, "--multirun"]) # Make sure config.yaml was created for each job configs = sorted(Path.cwd().glob("**/.pl_ddp_hydra_*/config.yaml")) @@ -133,12 +135,14 @@ def test_ddp_with_hydra_multirunjob(num_jobs): @pytest.mark.skipif(not _HYDRA_WITH_RERUN, reason="Hydra with `rerun` not Available") @pytest.mark.usefixtures("cleandir") @pytest.mark.parametrize("num_jobs", [1, 2]) -def test_ddp_with_hydra_multirunjob_rerun(num_jobs): +def test_ddp_with_hydra_multirunjob_rerun(tmpdir, num_jobs): # Save script locally - with open("temp.py", "w") as fn: + path = tmpdir / "temp.py" + with open(path, "w") as fn: fn.write(script) - with open("config.yaml", "w") as fn: + config_path = tmpdir / "config.yaml" + with open(config_path, "w") as fn: fn.write(yaml_file) # create fake multirun params based on `num_jobs` @@ -148,7 +152,7 @@ def test_ddp_with_hydra_multirunjob_rerun(num_jobs): run_process( [ sys.executable, - "temp.py", + str(path), "-cp", ".", "-cn", From 312061206d587f25ef0ee8c0d5ca5ec3b589bf82 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20Mochol=C3=AD?= Date: Thu, 22 Sep 2022 02:36:42 +0200 Subject: [PATCH 28/34] Revert "fix tests" This reverts commit 3f2ab472c87db77ea06fb1028e81ae495b5d4d55. --- .../strategies/test_ddp_hydra_support.py | 28 ++++++++----------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/tests/tests_pytorch/strategies/test_ddp_hydra_support.py b/tests/tests_pytorch/strategies/test_ddp_hydra_support.py index bbfb82880ed92..d46333e2e817e 100644 --- a/tests/tests_pytorch/strategies/test_ddp_hydra_support.py +++ b/tests/tests_pytorch/strategies/test_ddp_hydra_support.py @@ -9,7 +9,7 @@ from pytorch_lightning.strategies.launchers.subprocess_script import _HYDRA_AVAILABLE from tests_pytorch.helpers.runif import RunIf -_HYDRA_WITH_RERUN = RequirementCache("hydra-core >= 1.2") +_HYDRA_WITH_RERUN = RequirementCache("hydra-core>=1.2") if _HYDRA_AVAILABLE: from hydra.test_utils.test_utils import run_process @@ -35,8 +35,8 @@ def cleandir(tmp_path): import torch from pytorch_lightning import Trainer -from pytorch_lightning.demos.boring_classes import BoringModel +from tests.tests_pytorch.helpers import BoringModel class BoringModelGPU(BoringModel): def on_train_start(self) -> None: @@ -65,15 +65,14 @@ def task_fn(cfg): @pytest.mark.skipif(not _HYDRA_AVAILABLE, reason="Hydra not Available") @pytest.mark.usefixtures("cleandir") @pytest.mark.parametrize("subdir", [None, "dksa", ".hello"]) -def test_ddp_with_hydra_runjob(tmpdir, subdir): +def test_ddp_with_hydra_runjob(subdir): # Save script locally - path = tmpdir / "temp.py" - with open(path, "w") as fn: + with open("temp.py", "w") as fn: fn.write(script) # Run CLI devices = 2 - cmd = [sys.executable, str(path), f"+devices={devices}", '+strategy="ddp"'] + cmd = [sys.executable, "temp.py", f"+devices={devices}", '+strategy="ddp"'] if subdir is not None: cmd += [f"hydra.output_subdir={subdir}"] run_process(cmd) @@ -96,17 +95,16 @@ def test_ddp_with_hydra_runjob(tmpdir, subdir): @pytest.mark.skipif(not _HYDRA_AVAILABLE, reason="Hydra not Available") @pytest.mark.usefixtures("cleandir") @pytest.mark.parametrize("num_jobs", [1, 2]) -def test_ddp_with_hydra_multirunjob(tmpdir, num_jobs): +def test_ddp_with_hydra_multirunjob(num_jobs): # Save script locally - path = tmpdir / "temp.py" - with open(path, "w") as fn: + with open("temp.py", "w") as fn: fn.write(script) # create fake multirun params based on `num_jobs` fake_param = "+foo=" + ",".join(str(i) for i in range(num_jobs)) # Run CLI - run_process([sys.executable, str(path), "+devices=2", '+strategy="ddp"', fake_param, "--multirun"]) + run_process([sys.executable, "temp.py", "+devices=2", '+strategy="ddp"', fake_param, "--multirun"]) # Make sure config.yaml was created for each job configs = sorted(Path.cwd().glob("**/.pl_ddp_hydra_*/config.yaml")) @@ -135,14 +133,12 @@ def test_ddp_with_hydra_multirunjob(tmpdir, num_jobs): @pytest.mark.skipif(not _HYDRA_WITH_RERUN, reason="Hydra with `rerun` not Available") @pytest.mark.usefixtures("cleandir") @pytest.mark.parametrize("num_jobs", [1, 2]) -def test_ddp_with_hydra_multirunjob_rerun(tmpdir, num_jobs): +def test_ddp_with_hydra_multirunjob_rerun(num_jobs): # Save script locally - path = tmpdir / "temp.py" - with open(path, "w") as fn: + with open("temp.py", "w") as fn: fn.write(script) - config_path = tmpdir / "config.yaml" - with open(config_path, "w") as fn: + with open("config.yaml", "w") as fn: fn.write(yaml_file) # create fake multirun params based on `num_jobs` @@ -152,7 +148,7 @@ def test_ddp_with_hydra_multirunjob_rerun(tmpdir, num_jobs): run_process( [ sys.executable, - str(path), + "temp.py", "-cp", ".", "-cn", From 17e82cd3632007141b08a60bcf15d1e3103240b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20Mochol=C3=AD?= Date: Thu, 22 Sep 2022 02:39:13 +0200 Subject: [PATCH 29/34] Move file --- .../test_subprocess_script.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename tests/tests_pytorch/strategies/{test_ddp_hydra_support.py => launchers/test_subprocess_script.py} (100%) diff --git a/tests/tests_pytorch/strategies/test_ddp_hydra_support.py b/tests/tests_pytorch/strategies/launchers/test_subprocess_script.py similarity index 100% rename from tests/tests_pytorch/strategies/test_ddp_hydra_support.py rename to tests/tests_pytorch/strategies/launchers/test_subprocess_script.py From ed78629bb857426be93e777ef9e913a4836f33d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20Mochol=C3=AD?= Date: Thu, 22 Sep 2022 02:46:17 +0200 Subject: [PATCH 30/34] Minor test signature changes --- .../strategies/launchers/test_subprocess_script.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/tests/tests_pytorch/strategies/launchers/test_subprocess_script.py b/tests/tests_pytorch/strategies/launchers/test_subprocess_script.py index d46333e2e817e..d632624e28395 100644 --- a/tests/tests_pytorch/strategies/launchers/test_subprocess_script.py +++ b/tests/tests_pytorch/strategies/launchers/test_subprocess_script.py @@ -42,7 +42,6 @@ class BoringModelGPU(BoringModel): def on_train_start(self) -> None: # make sure that the model is on GPU when training assert self.device == torch.device(f"cuda:{self.trainer.strategy.local_rank}") - self.start_cuda_memory = torch.cuda.memory_allocated() @hydra.main(config_path=None, version_base="1.1") def task_fn(cfg): @@ -62,10 +61,9 @@ def task_fn(cfg): @RunIf(min_cuda_gpus=2, skip_windows=True, standalone=True) -@pytest.mark.skipif(not _HYDRA_AVAILABLE, reason="Hydra not Available") -@pytest.mark.usefixtures("cleandir") +@pytest.mark.skipif(not _HYDRA_AVAILABLE, reason=str(_HYDRA_AVAILABLE)) @pytest.mark.parametrize("subdir", [None, "dksa", ".hello"]) -def test_ddp_with_hydra_runjob(subdir): +def test_ddp_with_hydra_runjob(cleandir, subdir): # Save script locally with open("temp.py", "w") as fn: fn.write(script) @@ -92,10 +90,9 @@ def test_ddp_with_hydra_runjob(subdir): @RunIf(min_cuda_gpus=2, skip_windows=True, standalone=True) -@pytest.mark.skipif(not _HYDRA_AVAILABLE, reason="Hydra not Available") -@pytest.mark.usefixtures("cleandir") +@pytest.mark.skipif(not _HYDRA_AVAILABLE, reason=str(_HYDRA_AVAILABLE)) @pytest.mark.parametrize("num_jobs", [1, 2]) -def test_ddp_with_hydra_multirunjob(num_jobs): +def test_ddp_with_hydra_multirunjob(cleandir, num_jobs): # Save script locally with open("temp.py", "w") as fn: fn.write(script) @@ -130,8 +127,7 @@ def test_ddp_with_hydra_multirunjob(num_jobs): @RunIf(min_cuda_gpus=2, skip_windows=True, standalone=True) -@pytest.mark.skipif(not _HYDRA_WITH_RERUN, reason="Hydra with `rerun` not Available") -@pytest.mark.usefixtures("cleandir") +@pytest.mark.skipif(not _HYDRA_WITH_RERUN, reason=str(_HYDRA_WITH_RERUN)) @pytest.mark.parametrize("num_jobs", [1, 2]) def test_ddp_with_hydra_multirunjob_rerun(num_jobs): # Save script locally From 5165f36b15f7d393c7b76abe3c1509b9b820a1b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20Mochol=C3=AD?= Date: Thu, 22 Sep 2022 02:51:56 +0200 Subject: [PATCH 31/34] Tests require hydra>=1.0.7 --- .../strategies/launchers/test_subprocess_script.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/tests_pytorch/strategies/launchers/test_subprocess_script.py b/tests/tests_pytorch/strategies/launchers/test_subprocess_script.py index d632624e28395..a7461ec9a67e1 100644 --- a/tests/tests_pytorch/strategies/launchers/test_subprocess_script.py +++ b/tests/tests_pytorch/strategies/launchers/test_subprocess_script.py @@ -10,10 +10,12 @@ from tests_pytorch.helpers.runif import RunIf _HYDRA_WITH_RERUN = RequirementCache("hydra-core>=1.2") +_HYDRA_WITH_RUN_PROCESS = RequirementCache("hydra-core>=1.0.7") if _HYDRA_AVAILABLE: - from hydra.test_utils.test_utils import run_process from omegaconf import OmegaConf +if _HYDRA_WITH_RUN_PROCESS: + from hydra.test_utils.test_utils import run_process # fixture to run hydra jobs in a clean temporary directory @@ -61,7 +63,7 @@ def task_fn(cfg): @RunIf(min_cuda_gpus=2, skip_windows=True, standalone=True) -@pytest.mark.skipif(not _HYDRA_AVAILABLE, reason=str(_HYDRA_AVAILABLE)) +@pytest.mark.skipif(not _HYDRA_WITH_RUN_PROCESS, reason=str(_HYDRA_WITH_RUN_PROCESS)) @pytest.mark.parametrize("subdir", [None, "dksa", ".hello"]) def test_ddp_with_hydra_runjob(cleandir, subdir): # Save script locally @@ -90,7 +92,7 @@ def test_ddp_with_hydra_runjob(cleandir, subdir): @RunIf(min_cuda_gpus=2, skip_windows=True, standalone=True) -@pytest.mark.skipif(not _HYDRA_AVAILABLE, reason=str(_HYDRA_AVAILABLE)) +@pytest.mark.skipif(not _HYDRA_WITH_RUN_PROCESS, reason=str(_HYDRA_WITH_RUN_PROCESS)) @pytest.mark.parametrize("num_jobs", [1, 2]) def test_ddp_with_hydra_multirunjob(cleandir, num_jobs): # Save script locally From e3ce4a720d89d2a9ddda95f65c0569b530c01680 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20Mochol=C3=AD?= Date: Thu, 22 Sep 2022 02:57:59 +0200 Subject: [PATCH 32/34] oopsie --- .../strategies/launchers/test_subprocess_script.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/tests_pytorch/strategies/launchers/test_subprocess_script.py b/tests/tests_pytorch/strategies/launchers/test_subprocess_script.py index a7461ec9a67e1..0af30ac6712e9 100644 --- a/tests/tests_pytorch/strategies/launchers/test_subprocess_script.py +++ b/tests/tests_pytorch/strategies/launchers/test_subprocess_script.py @@ -37,13 +37,13 @@ def cleandir(tmp_path): import torch from pytorch_lightning import Trainer - -from tests.tests_pytorch.helpers import BoringModel +from pytorch_lightning.demos.boring_classes import BoringModel class BoringModelGPU(BoringModel): def on_train_start(self) -> None: # make sure that the model is on GPU when training assert self.device == torch.device(f"cuda:{self.trainer.strategy.local_rank}") + self.start_cuda_memory = torch.cuda.memory_allocated() @hydra.main(config_path=None, version_base="1.1") def task_fn(cfg): From a827b2b21c2d7c308684f9b868bef168da8c854e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20Mochol=C3=AD?= Date: Thu, 22 Sep 2022 15:48:49 +0200 Subject: [PATCH 33/34] Update tests/tests_pytorch/strategies/launchers/test_subprocess_script.py --- .../tests_pytorch/strategies/launchers/test_subprocess_script.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/tests_pytorch/strategies/launchers/test_subprocess_script.py b/tests/tests_pytorch/strategies/launchers/test_subprocess_script.py index 0af30ac6712e9..81460f342248f 100644 --- a/tests/tests_pytorch/strategies/launchers/test_subprocess_script.py +++ b/tests/tests_pytorch/strategies/launchers/test_subprocess_script.py @@ -43,7 +43,6 @@ class BoringModelGPU(BoringModel): def on_train_start(self) -> None: # make sure that the model is on GPU when training assert self.device == torch.device(f"cuda:{self.trainer.strategy.local_rank}") - self.start_cuda_memory = torch.cuda.memory_allocated() @hydra.main(config_path=None, version_base="1.1") def task_fn(cfg): From 4b6e46d993e7d39887718cfa8ba43b7f8ef632b2 Mon Sep 17 00:00:00 2001 From: Justin Goodwin Date: Thu, 22 Sep 2022 10:53:16 -0400 Subject: [PATCH 34/34] fixes failing test --- .../strategies/launchers/test_subprocess_script.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/tests_pytorch/strategies/launchers/test_subprocess_script.py b/tests/tests_pytorch/strategies/launchers/test_subprocess_script.py index 0af30ac6712e9..d7cce93d5500d 100644 --- a/tests/tests_pytorch/strategies/launchers/test_subprocess_script.py +++ b/tests/tests_pytorch/strategies/launchers/test_subprocess_script.py @@ -131,7 +131,7 @@ def test_ddp_with_hydra_multirunjob(cleandir, num_jobs): @RunIf(min_cuda_gpus=2, skip_windows=True, standalone=True) @pytest.mark.skipif(not _HYDRA_WITH_RERUN, reason=str(_HYDRA_WITH_RERUN)) @pytest.mark.parametrize("num_jobs", [1, 2]) -def test_ddp_with_hydra_multirunjob_rerun(num_jobs): +def test_ddp_with_hydra_multirunjob_rerun(cleandir, num_jobs): # Save script locally with open("temp.py", "w") as fn: fn.write(script)