From d156d8ad8a5f014faf53ed5e8dd3d81f7138a797 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Sat, 19 Dec 2020 21:05:44 +0000 Subject: [PATCH 01/38] hacking out --- pytorch_lightning/trainer/trainer.py | 2 ++ pytorch_lightning/trainer/training_loop.py | 4 +++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/pytorch_lightning/trainer/trainer.py b/pytorch_lightning/trainer/trainer.py index 5a837956bc4ce..4ed282c077b1d 100644 --- a/pytorch_lightning/trainer/trainer.py +++ b/pytorch_lightning/trainer/trainer.py @@ -594,6 +594,8 @@ def run_evaluation(self, test_mode: bool = False, max_batches=None): if batch is None: continue + batch.to("cuda") + # stop short when running on limited batches if batch_idx >= dl_max_batches: break diff --git a/pytorch_lightning/trainer/training_loop.py b/pytorch_lightning/trainer/training_loop.py index 68a0f4781c9a9..59f0bf4a453fc 100644 --- a/pytorch_lightning/trainer/training_loop.py +++ b/pytorch_lightning/trainer/training_loop.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import os from contextlib import contextmanager from copy import copy, deepcopy @@ -551,6 +551,8 @@ def run_training_epoch(self): should_check_val = False for batch_idx, (batch, is_last_batch) in train_dataloader: + batch.to("cuda") + self.trainer.batch_idx = batch_idx # ------------------------------------ From 658dfcd1284429206d123ab3a9cb30dd731cfed7 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Sat, 19 Dec 2020 22:29:48 +0000 Subject: [PATCH 02/38] update --- pytorch_lightning/accelerators/ddp2_accelerator.py | 2 ++ pytorch_lightning/accelerators/ddp_accelerator.py | 2 ++ pytorch_lightning/accelerators/ddp_hpc_accelerator.py | 2 ++ pytorch_lightning/accelerators/ddp_spawn_accelerator.py | 2 ++ pytorch_lightning/plugins/ddp_plugin.py | 2 +- pytorch_lightning/trainer/trainer.py | 2 -- pytorch_lightning/trainer/training_loop.py | 2 -- 7 files changed, 9 insertions(+), 5 deletions(-) diff --git a/pytorch_lightning/accelerators/ddp2_accelerator.py b/pytorch_lightning/accelerators/ddp2_accelerator.py index 2e864029f8767..d217b941f913b 100644 --- a/pytorch_lightning/accelerators/ddp2_accelerator.py +++ b/pytorch_lightning/accelerators/ddp2_accelerator.py @@ -113,6 +113,8 @@ def init_device(self, process_idx): def model_to_device(self, model): model.cuda(self.trainer.root_gpu) + for param in model.parameters(): + param = param.cuda(self.trainer.root_gpu) def get_device_ids(self): device_ids = self.trainer.data_parallel_device_ids diff --git a/pytorch_lightning/accelerators/ddp_accelerator.py b/pytorch_lightning/accelerators/ddp_accelerator.py index da9eb2d3ea937..c99deab26503c 100644 --- a/pytorch_lightning/accelerators/ddp_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_accelerator.py @@ -197,6 +197,8 @@ def init_device(self, process_idx): def model_to_device(self, model): model.cuda(self.trainer.root_gpu) + for param in model.parameters(): + param = param.cuda(self.trainer.root_gpu) def get_device_ids(self): device_ids = [self.trainer.root_gpu] diff --git a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py index b257884e34aef..2efc979839e81 100644 --- a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py @@ -74,6 +74,8 @@ def init_device(self, process_idx): def model_to_device(self, model): 
model.cuda(self.trainer.root_gpu) + for param in model.parameters(): + param = param.cuda(self.trainer.root_gpu) def get_device_ids(self): device_ids = [self.trainer.root_gpu] diff --git a/pytorch_lightning/accelerators/ddp_spawn_accelerator.py b/pytorch_lightning/accelerators/ddp_spawn_accelerator.py index a49e17fc0b31d..580a695278af8 100644 --- a/pytorch_lightning/accelerators/ddp_spawn_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_spawn_accelerator.py @@ -202,6 +202,8 @@ def init_device(self, process_idx, is_master): def model_to_device(self, model): model.cuda(self.trainer.root_gpu) + for param in model.parameters(): + param = param.cuda(self.trainer.root_gpu) def get_device_ids(self): device_ids = [self.trainer.root_gpu] diff --git a/pytorch_lightning/plugins/ddp_plugin.py b/pytorch_lightning/plugins/ddp_plugin.py index 281074cb37813..00aab32d1f9de 100644 --- a/pytorch_lightning/plugins/ddp_plugin.py +++ b/pytorch_lightning/plugins/ddp_plugin.py @@ -107,7 +107,7 @@ def on_before_forward(self, model, *args): model: Model to train. Returns: args moved to correct device if needed. """ - return args + return model.transfer_batch_to_device(args, model.trainer.root_gpu) def optimizer_state(self, optimizer: Optimizer) -> dict: return optimizer.state_dict() diff --git a/pytorch_lightning/trainer/trainer.py b/pytorch_lightning/trainer/trainer.py index 4ed282c077b1d..5a837956bc4ce 100644 --- a/pytorch_lightning/trainer/trainer.py +++ b/pytorch_lightning/trainer/trainer.py @@ -594,8 +594,6 @@ def run_evaluation(self, test_mode: bool = False, max_batches=None): if batch is None: continue - batch.to("cuda") - # stop short when running on limited batches if batch_idx >= dl_max_batches: break diff --git a/pytorch_lightning/trainer/training_loop.py b/pytorch_lightning/trainer/training_loop.py index 59f0bf4a453fc..046371b9f6970 100644 --- a/pytorch_lightning/trainer/training_loop.py +++ b/pytorch_lightning/trainer/training_loop.py @@ -551,8 +551,6 @@ def run_training_epoch(self): should_check_val = False for batch_idx, (batch, is_last_batch) in train_dataloader: - batch.to("cuda") - self.trainer.batch_idx = batch_idx # ------------------------------------ From 526e9e64b4e5baa700c2dd0ff3884e60445d104f Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Sun, 20 Dec 2020 18:41:35 +0000 Subject: [PATCH 03/38] remove useless on_before_forward --- pytorch_lightning/accelerators/accelerator.py | 3 +++ .../accelerators/ddp2_accelerator.py | 4 +--- .../accelerators/ddp_accelerator.py | 4 +--- .../accelerators/ddp_cpu_spawn_accelerator.py | 2 +- .../accelerators/ddp_hpc_accelerator.py | 4 +--- .../accelerators/ddp_spawn_accelerator.py | 4 +--- pytorch_lightning/plugins/ddp_plugin.py | 18 ------------------ pytorch_lightning/plugins/sharded_plugin.py | 3 --- pytorch_lightning/utilities/apply_func.py | 3 ++- 9 files changed, 10 insertions(+), 35 deletions(-) diff --git a/pytorch_lightning/accelerators/accelerator.py b/pytorch_lightning/accelerators/accelerator.py index 77f30219ba8c0..85706556dc751 100644 --- a/pytorch_lightning/accelerators/accelerator.py +++ b/pytorch_lightning/accelerators/accelerator.py @@ -24,6 +24,8 @@ from pytorch_lightning.plugins.rpc_plugin import RPCPlugin from pytorch_lightning.utilities.apply_func import move_data_to_device from pytorch_lightning.utilities.parsing import AttributeDict +from pytorch_lightning.utilities.model_utils import is_overridden + if torch.distributed.is_available(): from torch.distributed import ReduceOp @@ -73,6 +75,7 @@ def batch_to_device(self, batch: Any, 
device: torch.device): model = self.trainer.get_model() if model is not None: return model.transfer_batch_to_device(batch, device) + print("HERE") return move_data_to_device(batch, device) def training_step_end(self, output): diff --git a/pytorch_lightning/accelerators/ddp2_accelerator.py b/pytorch_lightning/accelerators/ddp2_accelerator.py index d217b941f913b..487c19297f3d2 100644 --- a/pytorch_lightning/accelerators/ddp2_accelerator.py +++ b/pytorch_lightning/accelerators/ddp2_accelerator.py @@ -72,7 +72,7 @@ def test_step(self, args): return self._step(args) def _step(self, args): - args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), *args) + args = self.batch_to_device(args, self.trainer.get_model().device) if self.trainer.amp_backend == AMPType.NATIVE: with torch.cuda.amp.autocast(): output = self.trainer.model(*args) @@ -113,8 +113,6 @@ def init_device(self, process_idx): def model_to_device(self, model): model.cuda(self.trainer.root_gpu) - for param in model.parameters(): - param = param.cuda(self.trainer.root_gpu) def get_device_ids(self): device_ids = self.trainer.data_parallel_device_ids diff --git a/pytorch_lightning/accelerators/ddp_accelerator.py b/pytorch_lightning/accelerators/ddp_accelerator.py index c99deab26503c..9250704240064 100644 --- a/pytorch_lightning/accelerators/ddp_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_accelerator.py @@ -164,7 +164,7 @@ def test_step(self, args): return self._step(args) def _step(self, args): - args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), *args) + args = self.batch_to_device(args, self.trainer.get_model().device) if self.trainer.amp_backend == AMPType.NATIVE: with torch.cuda.amp.autocast(): output = self.trainer.model(*args) @@ -197,8 +197,6 @@ def init_device(self, process_idx): def model_to_device(self, model): model.cuda(self.trainer.root_gpu) - for param in model.parameters(): - param = param.cuda(self.trainer.root_gpu) def get_device_ids(self): device_ids = [self.trainer.root_gpu] diff --git a/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py b/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py index 91a6dee484f30..11aa616e20e4e 100644 --- a/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py @@ -185,7 +185,7 @@ def test_step(self, args): return self._step(args) def _step(self, args): - args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), *args) + args = self.batch_to_device(args, self.trainer.get_model().device) if self.trainer.amp_backend == AMPType.NATIVE: with torch.cuda.amp.autocast(): output = self.trainer.model(*args) diff --git a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py index 2efc979839e81..2857ae34e11f9 100644 --- a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py @@ -74,8 +74,6 @@ def init_device(self, process_idx): def model_to_device(self, model): model.cuda(self.trainer.root_gpu) - for param in model.parameters(): - param = param.cuda(self.trainer.root_gpu) def get_device_ids(self): device_ids = [self.trainer.root_gpu] @@ -91,7 +89,7 @@ def test_step(self, args): return self._step(args) def _step(self, args): - args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), *args) + args = self.batch_to_device(args, self.trainer.get_model().device) if self.trainer.amp_backend == AMPType.NATIVE: with torch.cuda.amp.autocast(): output = 
self.trainer.model(*args) diff --git a/pytorch_lightning/accelerators/ddp_spawn_accelerator.py b/pytorch_lightning/accelerators/ddp_spawn_accelerator.py index 580a695278af8..8b827b0e6e39d 100644 --- a/pytorch_lightning/accelerators/ddp_spawn_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_spawn_accelerator.py @@ -202,8 +202,6 @@ def init_device(self, process_idx, is_master): def model_to_device(self, model): model.cuda(self.trainer.root_gpu) - for param in model.parameters(): - param = param.cuda(self.trainer.root_gpu) def get_device_ids(self): device_ids = [self.trainer.root_gpu] @@ -219,7 +217,7 @@ def test_step(self, args): return self._step(args) def _step(self, args): - args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), *args) + args = self.batch_to_device(args, self.trainer.get_model().device) if self.trainer.amp_backend == AMPType.NATIVE: with torch.cuda.amp.autocast(): output = self.trainer.model(*args) diff --git a/pytorch_lightning/plugins/ddp_plugin.py b/pytorch_lightning/plugins/ddp_plugin.py index 00aab32d1f9de..c13310d55a682 100644 --- a/pytorch_lightning/plugins/ddp_plugin.py +++ b/pytorch_lightning/plugins/ddp_plugin.py @@ -91,24 +91,6 @@ def init_ddp_connection( torch_backend, rank=global_rank, world_size=world_size ) - def on_before_forward(self, model: LightningModule, *args): - """ - Override to handle custom input to device logic. For DDP, no logic is required as this is handled internally - within the DDP wrapper. - - Example:: - - def on_before_forward(self, model, *args): - batch, batch_idx = args - return batch.to(model.device) - - Args: - args: Inputs to the model. - model: Model to train. - Returns: args moved to correct device if needed. - """ - return model.transfer_batch_to_device(args, model.trainer.root_gpu) - def optimizer_state(self, optimizer: Optimizer) -> dict: return optimizer.state_dict() diff --git a/pytorch_lightning/plugins/sharded_plugin.py b/pytorch_lightning/plugins/sharded_plugin.py index 937538561ccdd..32fc7984e90f9 100644 --- a/pytorch_lightning/plugins/sharded_plugin.py +++ b/pytorch_lightning/plugins/sharded_plugin.py @@ -42,9 +42,6 @@ def optimizer_state(self, optimizer: 'OSS') -> Optional[dict]: optimizer.consolidate_state_dict() return self._optim_state_dict(optimizer) - def on_before_forward(self, model: LightningModule, *args): - return model.transfer_batch_to_device(args, model.trainer.root_gpu) - def _check_fairscale(self): if not FAIRSCALE_AVAILABLE: raise MisconfigurationException( diff --git a/pytorch_lightning/utilities/apply_func.py b/pytorch_lightning/utilities/apply_func.py index 775c22dbbfa0a..5dd86df474068 100644 --- a/pytorch_lightning/utilities/apply_func.py +++ b/pytorch_lightning/utilities/apply_func.py @@ -108,6 +108,8 @@ def move_data_to_device(batch: Any, device: torch.device): """ def batch_to(data): + kwargs = dict(non_blocking=True) if isinstance(data, torch.Tensor) else {} + # try to move torchtext data first if TORCHTEXT_AVAILABLE and isinstance(data, Batch): @@ -120,7 +122,6 @@ def batch_to(data): setattr(device_data, field, device_field) return device_data - kwargs = dict(non_blocking=True) if isinstance(data, torch.Tensor) else {} return data.to(device, **kwargs) dtype = (TransferableDataType, Batch) if TORCHTEXT_AVAILABLE else TransferableDataType From 7396760637f60ef4f9bc343be79c9df513f21898 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Sun, 20 Dec 2020 18:45:04 +0000 Subject: [PATCH 04/38] update --- pytorch_lightning/accelerators/accelerator.py | 1 - 
pytorch_lightning/utilities/apply_func.py | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/pytorch_lightning/accelerators/accelerator.py b/pytorch_lightning/accelerators/accelerator.py index 85706556dc751..f247d9bdfa212 100644 --- a/pytorch_lightning/accelerators/accelerator.py +++ b/pytorch_lightning/accelerators/accelerator.py @@ -75,7 +75,6 @@ def batch_to_device(self, batch: Any, device: torch.device): model = self.trainer.get_model() if model is not None: return model.transfer_batch_to_device(batch, device) - print("HERE") return move_data_to_device(batch, device) def training_step_end(self, output): diff --git a/pytorch_lightning/utilities/apply_func.py b/pytorch_lightning/utilities/apply_func.py index 5dd86df474068..775c22dbbfa0a 100644 --- a/pytorch_lightning/utilities/apply_func.py +++ b/pytorch_lightning/utilities/apply_func.py @@ -108,8 +108,6 @@ def move_data_to_device(batch: Any, device: torch.device): """ def batch_to(data): - kwargs = dict(non_blocking=True) if isinstance(data, torch.Tensor) else {} - # try to move torchtext data first if TORCHTEXT_AVAILABLE and isinstance(data, Batch): @@ -122,6 +120,7 @@ def batch_to(data): setattr(device_data, field, device_field) return device_data + kwargs = dict(non_blocking=True) if isinstance(data, torch.Tensor) else {} return data.to(device, **kwargs) dtype = (TransferableDataType, Batch) if TORCHTEXT_AVAILABLE else TransferableDataType From dba7875bf880fe98051ae26d4abadfaad5e9e13b Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Sun, 20 Dec 2020 18:47:11 +0000 Subject: [PATCH 05/38] remove overriden --- pytorch_lightning/accelerators/accelerator.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pytorch_lightning/accelerators/accelerator.py b/pytorch_lightning/accelerators/accelerator.py index f247d9bdfa212..77f30219ba8c0 100644 --- a/pytorch_lightning/accelerators/accelerator.py +++ b/pytorch_lightning/accelerators/accelerator.py @@ -24,8 +24,6 @@ from pytorch_lightning.plugins.rpc_plugin import RPCPlugin from pytorch_lightning.utilities.apply_func import move_data_to_device from pytorch_lightning.utilities.parsing import AttributeDict -from pytorch_lightning.utilities.model_utils import is_overridden - if torch.distributed.is_available(): from torch.distributed import ReduceOp From 895dc72471375252048ee6a88e9a5ba877cf23d1 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Sun, 20 Dec 2020 18:49:52 +0000 Subject: [PATCH 06/38] iremove os --- pytorch_lightning/trainer/training_loop.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pytorch_lightning/trainer/training_loop.py b/pytorch_lightning/trainer/training_loop.py index 046371b9f6970..e38eeeee4b489 100644 --- a/pytorch_lightning/trainer/training_loop.py +++ b/pytorch_lightning/trainer/training_loop.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-import os from contextlib import contextmanager from copy import copy, deepcopy From ed0eb42bfd80e6d77bd352b176a663fff9aeabfe Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 21 Dec 2020 08:46:41 +0000 Subject: [PATCH 07/38] use on_before_forward --- .../accelerators/ddp2_accelerator.py | 2 +- .../accelerators/ddp_accelerator.py | 2 +- .../accelerators/ddp_cpu_spawn_accelerator.py | 2 +- .../accelerators/ddp_hpc_accelerator.py | 2 +- .../accelerators/ddp_spawn_accelerator.py | 2 +- pytorch_lightning/plugins/ddp_plugin.py | 21 ++++++++++++++++++- 6 files changed, 25 insertions(+), 6 deletions(-) diff --git a/pytorch_lightning/accelerators/ddp2_accelerator.py b/pytorch_lightning/accelerators/ddp2_accelerator.py index 487c19297f3d2..2e864029f8767 100644 --- a/pytorch_lightning/accelerators/ddp2_accelerator.py +++ b/pytorch_lightning/accelerators/ddp2_accelerator.py @@ -72,7 +72,7 @@ def test_step(self, args): return self._step(args) def _step(self, args): - args = self.batch_to_device(args, self.trainer.get_model().device) + args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), *args) if self.trainer.amp_backend == AMPType.NATIVE: with torch.cuda.amp.autocast(): output = self.trainer.model(*args) diff --git a/pytorch_lightning/accelerators/ddp_accelerator.py b/pytorch_lightning/accelerators/ddp_accelerator.py index 9250704240064..da9eb2d3ea937 100644 --- a/pytorch_lightning/accelerators/ddp_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_accelerator.py @@ -164,7 +164,7 @@ def test_step(self, args): return self._step(args) def _step(self, args): - args = self.batch_to_device(args, self.trainer.get_model().device) + args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), *args) if self.trainer.amp_backend == AMPType.NATIVE: with torch.cuda.amp.autocast(): output = self.trainer.model(*args) diff --git a/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py b/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py index 11aa616e20e4e..91a6dee484f30 100644 --- a/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py @@ -185,7 +185,7 @@ def test_step(self, args): return self._step(args) def _step(self, args): - args = self.batch_to_device(args, self.trainer.get_model().device) + args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), *args) if self.trainer.amp_backend == AMPType.NATIVE: with torch.cuda.amp.autocast(): output = self.trainer.model(*args) diff --git a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py index 2857ae34e11f9..b257884e34aef 100644 --- a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py @@ -89,7 +89,7 @@ def test_step(self, args): return self._step(args) def _step(self, args): - args = self.batch_to_device(args, self.trainer.get_model().device) + args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), *args) if self.trainer.amp_backend == AMPType.NATIVE: with torch.cuda.amp.autocast(): output = self.trainer.model(*args) diff --git a/pytorch_lightning/accelerators/ddp_spawn_accelerator.py b/pytorch_lightning/accelerators/ddp_spawn_accelerator.py index 8b827b0e6e39d..a49e17fc0b31d 100644 --- a/pytorch_lightning/accelerators/ddp_spawn_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_spawn_accelerator.py @@ -217,7 +217,7 @@ def test_step(self, args): return self._step(args) def _step(self, args): - args = 
self.batch_to_device(args, self.trainer.get_model().device) + args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), *args) if self.trainer.amp_backend == AMPType.NATIVE: with torch.cuda.amp.autocast(): output = self.trainer.model(*args) diff --git a/pytorch_lightning/plugins/ddp_plugin.py b/pytorch_lightning/plugins/ddp_plugin.py index c13310d55a682..2cec08d783019 100644 --- a/pytorch_lightning/plugins/ddp_plugin.py +++ b/pytorch_lightning/plugins/ddp_plugin.py @@ -2,6 +2,7 @@ from contextlib import contextmanager from typing import Any, Dict, List, Optional, Union +import torch import torch.distributed as torch_distrib from torch.optim import Optimizer @@ -91,6 +92,24 @@ def init_ddp_connection( torch_backend, rank=global_rank, world_size=world_size ) + def on_before_forward(self, model: LightningModule, *batch): + """ + Override to handle custom input to device logic. For DDP, no logic is required as this is handled internally + within the DDP wrapper. + + Example:: + + def on_before_forward(self, model, *batch): + batch, batch_idx = batch + return batch.to(model.device) + + Args: + batch: Inputs to the model. + model: Model to train. + Returns: batch moved to correct device if needed. + """ + return model.transfer_batch_to_device(batch, model.device) + def optimizer_state(self, optimizer: Optimizer) -> dict: return optimizer.state_dict() @@ -147,4 +166,4 @@ def data_parallel_group(self): Useful for when additional parallel groups have been created, to select certain processes. Returns: The ProcessGroup this process exists in. """ - return torch_distrib.group.WORLD + return torch_distrib.group.WORLD \ No newline at end of file From 31587f976c28b55de32e18405dff089a4d022cf1 Mon Sep 17 00:00:00 2001 From: tchaton Date: Wed, 23 Dec 2020 09:11:14 +0100 Subject: [PATCH 08/38] resolve flake8 --- pytorch_lightning/plugins/ddp_plugin.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pytorch_lightning/plugins/ddp_plugin.py b/pytorch_lightning/plugins/ddp_plugin.py index 2cec08d783019..da35ecb94f3cb 100644 --- a/pytorch_lightning/plugins/ddp_plugin.py +++ b/pytorch_lightning/plugins/ddp_plugin.py @@ -1,5 +1,5 @@ -import os from contextlib import contextmanager +import os from typing import Any, Dict, List, Optional, Union import torch @@ -166,4 +166,4 @@ def data_parallel_group(self): Useful for when additional parallel groups have been created, to select certain processes. Returns: The ProcessGroup this process exists in. """ - return torch_distrib.group.WORLD \ No newline at end of file + return torch_distrib.group.WORLD From d81a7d0cd668c81c2e535de8d09cc8e397ed6027 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 4 Jan 2021 12:12:07 +0000 Subject: [PATCH 09/38] add test --- tests/models/test_hooks.py | 48 ++++++++++++++++++++++++++++++++++++-- tests/special_tests.sh | 1 + 2 files changed, 47 insertions(+), 2 deletions(-) diff --git a/tests/models/test_hooks.py b/tests/models/test_hooks.py index f3af5b745a380..6743a1e7c7038 100644 --- a/tests/models/test_hooks.py +++ b/tests/models/test_hooks.py @@ -12,14 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. 
import inspect - +import os import pytest import torch from unittest.mock import MagicMock from pytorch_lightning import Trainer from pytorch_lightning.accelerators.gpu_accelerator import GPUAccelerator -from tests.base import EvalModelTemplate, BoringModel +from tests.base import EvalModelTemplate, BoringModel, RandomDataset @pytest.mark.parametrize('max_steps', [1, 2, 3]) @@ -124,6 +124,50 @@ def transfer_batch_to_device(self, data, device): assert batch_gpu.samples.device == batch_gpu.targets.device == expected +@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="test requires multi-GPU machine") +@pytest.mark.skipif(not os.getenv("PL_RUNNING_SPECIAL_TESTS", '0') == '1', + reason="test should be run outside of pytest") +def test_transfer_batch_hook_ddp(tmpdir): + """ + Test custom data are properly moved to the right device using ddp + """ + + class CustomBatch: + + def __init__(self, data): + self.samples = data[0] + + def to(self, device, **kwargs): + self.samples = self.samples.to(device, **kwargs) + return self + + def collate_fn(batch): + return CustomBatch(batch) + + class TestModel(BoringModel): + def training_step(self, batch, batch_idx): + assert batch.samples.device == self.device + assert isinstance(batch_idx, int) + + def train_dataloader(self): + return torch.utils.data.DataLoader(RandomDataset(32, 64), collate_fn=collate_fn) + + + model = TestModel() + model.validation_step = None + model.training_epoch_end = None + trainer = Trainer( + default_root_dir=tmpdir, + limit_train_batches=1, + limit_val_batches=1, + max_epochs=1, + weights_summary=None, + accelerator="ddp", + gpus=2, + ) + trainer.fit(model) + + @pytest.mark.parametrize( 'max_epochs,batch_idx_', [(2, 5), (3, 8), (4, 12)] diff --git a/tests/special_tests.sh b/tests/special_tests.sh index 950e3776bbc7f..a76f7ff877376 100644 --- a/tests/special_tests.sh +++ b/tests/special_tests.sh @@ -20,3 +20,4 @@ python ${DEFAULTS} tests/plugins/test_ddp_sequential_plugin.py::test_ddp_sequent python ${DEFAULTS} tests/plugins/test_ddp_sequential_plugin.py::test_ddp_sequential_plugin_ddp_rpc_manual_amp python ${DEFAULTS} tests/plugins/test_ddp_sequential_plugin.py::test_ddp_sequential_plugin_ddp_rpc_automatic python ${DEFAULTS} tests/trainer/logging_tests/test_train_loop_logging_1_0.py::test_logging_sync_dist_true_ddp +python ${DEFAULTS} tests/models/test_hooks.py::test_transfer_batch_hook_ddp From ddd7a1cbaa5d254de91f8f1785b29d951b7af454 Mon Sep 17 00:00:00 2001 From: tchaton Date: Mon, 4 Jan 2021 13:14:10 +0100 Subject: [PATCH 10/38] update --- tests/models/test_hooks.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/models/test_hooks.py b/tests/models/test_hooks.py index 6743a1e7c7038..c993b3b3e50b2 100644 --- a/tests/models/test_hooks.py +++ b/tests/models/test_hooks.py @@ -13,13 +13,14 @@ # limitations under the License. 
import inspect import os +from unittest.mock import MagicMock + import pytest import torch -from unittest.mock import MagicMock from pytorch_lightning import Trainer from pytorch_lightning.accelerators.gpu_accelerator import GPUAccelerator -from tests.base import EvalModelTemplate, BoringModel, RandomDataset +from tests.base import BoringModel, EvalModelTemplate, RandomDataset @pytest.mark.parametrize('max_steps', [1, 2, 3]) @@ -129,7 +130,7 @@ def transfer_batch_to_device(self, data, device): reason="test should be run outside of pytest") def test_transfer_batch_hook_ddp(tmpdir): """ - Test custom data are properly moved to the right device using ddp + Test custom data are properly moved to the right device using ddp """ class CustomBatch: @@ -152,7 +153,6 @@ def training_step(self, batch, batch_idx): def train_dataloader(self): return torch.utils.data.DataLoader(RandomDataset(32, 64), collate_fn=collate_fn) - model = TestModel() model.validation_step = None model.training_epoch_end = None From f7e30be8849ec3a7a3ede8ba451b4c6289e224eb Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 4 Jan 2021 13:40:48 +0000 Subject: [PATCH 11/38] add single_process_per_device --- pytorch_lightning/accelerators/accelerator.py | 4 ++++ pytorch_lightning/accelerators/ddp2_accelerator.py | 6 +++++- pytorch_lightning/accelerators/ddp_accelerator.py | 6 +++++- pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py | 4 ++++ pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py | 6 +++++- pytorch_lightning/accelerators/ddp_hpc_accelerator.py | 6 +++++- pytorch_lightning/accelerators/ddp_spawn_accelerator.py | 6 +++++- pytorch_lightning/accelerators/dp_accelerator.py | 3 +++ pytorch_lightning/plugins/ddp_plugin.py | 6 ++++-- 9 files changed, 40 insertions(+), 7 deletions(-) diff --git a/pytorch_lightning/accelerators/accelerator.py b/pytorch_lightning/accelerators/accelerator.py index 77f30219ba8c0..b387d7b0f29f7 100644 --- a/pytorch_lightning/accelerators/accelerator.py +++ b/pytorch_lightning/accelerators/accelerator.py @@ -237,6 +237,10 @@ def __setstate__(self, d): def on_save(self, checkpoint): return checkpoint + @property + def single_process_per_device(self): + raise NotImplementedError + @property def rpc_enabled(self): return self.ddp_plugin is not None and isinstance(self.ddp_plugin, RPCPlugin) diff --git a/pytorch_lightning/accelerators/ddp2_accelerator.py b/pytorch_lightning/accelerators/ddp2_accelerator.py index 2e864029f8767..6adebf14ef544 100644 --- a/pytorch_lightning/accelerators/ddp2_accelerator.py +++ b/pytorch_lightning/accelerators/ddp2_accelerator.py @@ -72,7 +72,7 @@ def test_step(self, args): return self._step(args) def _step(self, args): - args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), *args) + args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), self.single_process_per_device, *args) if self.trainer.amp_backend == AMPType.NATIVE: with torch.cuda.amp.autocast(): output = self.trainer.model(*args) @@ -269,3 +269,7 @@ def distributed_sampler_kwargs(self): @property def require_distributed_sampler(self): return True + + @property + def single_process_per_device(self): + return False diff --git a/pytorch_lightning/accelerators/ddp_accelerator.py b/pytorch_lightning/accelerators/ddp_accelerator.py index da9eb2d3ea937..8cef7ee3c6ece 100644 --- a/pytorch_lightning/accelerators/ddp_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_accelerator.py @@ -164,7 +164,7 @@ def test_step(self, args): return self._step(args) def _step(self, args): - args = 
self.ddp_plugin.on_before_forward(self.trainer.get_model(), *args) + args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), self.single_process_per_device, *args) if self.trainer.amp_backend == AMPType.NATIVE: with torch.cuda.amp.autocast(): output = self.trainer.model(*args) @@ -373,3 +373,7 @@ def distributed_sampler_kwargs(self): @property def require_distributed_sampler(self): return True + + @property + def single_process_per_device(self): + return True diff --git a/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py b/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py index b9a71ed271744..e849510470e21 100644 --- a/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py @@ -51,3 +51,7 @@ def get_device_ids(self): def init_device(self, process_idx): pass + + @property + def single_process_per_device(self): + return True \ No newline at end of file diff --git a/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py b/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py index 91a6dee484f30..ef2a2e6d21617 100644 --- a/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py @@ -185,7 +185,7 @@ def test_step(self, args): return self._step(args) def _step(self, args): - args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), *args) + args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), self.single_process_per_device, *args) if self.trainer.amp_backend == AMPType.NATIVE: with torch.cuda.amp.autocast(): output = self.trainer.model(*args) @@ -297,3 +297,7 @@ def distributed_sampler_kwargs(self): @property def require_distributed_sampler(self): return True + + @property + def single_process_per_device(self): + return True diff --git a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py index b257884e34aef..547fcac6abd64 100644 --- a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py @@ -89,7 +89,7 @@ def test_step(self, args): return self._step(args) def _step(self, args): - args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), *args) + args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), self.single_process_per_device, *args) if self.trainer.amp_backend == AMPType.NATIVE: with torch.cuda.amp.autocast(): output = self.trainer.model(*args) @@ -261,3 +261,7 @@ def distributed_sampler_kwargs(self): @property def require_distributed_sampler(self): return True + + @property + def single_process_per_device(self): + return True diff --git a/pytorch_lightning/accelerators/ddp_spawn_accelerator.py b/pytorch_lightning/accelerators/ddp_spawn_accelerator.py index a49e17fc0b31d..a3451c277e1d8 100644 --- a/pytorch_lightning/accelerators/ddp_spawn_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_spawn_accelerator.py @@ -217,7 +217,7 @@ def test_step(self, args): return self._step(args) def _step(self, args): - args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), *args) + args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), self.single_process_per_device, *args) if self.trainer.amp_backend == AMPType.NATIVE: with torch.cuda.amp.autocast(): output = self.trainer.model(*args) @@ -329,3 +329,7 @@ def distributed_sampler_kwargs(self): @property def require_distributed_sampler(self): return True + + @property + def 
single_process_per_device(self): + return True diff --git a/pytorch_lightning/accelerators/dp_accelerator.py b/pytorch_lightning/accelerators/dp_accelerator.py index 834a920b505d9..ee2568c72d98d 100644 --- a/pytorch_lightning/accelerators/dp_accelerator.py +++ b/pytorch_lightning/accelerators/dp_accelerator.py @@ -188,3 +188,6 @@ def get_reference_model(self, model) -> LightningModule: @property def require_distributed_sampler(self): return False + + def single_process_per_device(self): + return False diff --git a/pytorch_lightning/plugins/ddp_plugin.py b/pytorch_lightning/plugins/ddp_plugin.py index da35ecb94f3cb..ae50f2a334ded 100644 --- a/pytorch_lightning/plugins/ddp_plugin.py +++ b/pytorch_lightning/plugins/ddp_plugin.py @@ -92,7 +92,7 @@ def init_ddp_connection( torch_backend, rank=global_rank, world_size=world_size ) - def on_before_forward(self, model: LightningModule, *batch): + def on_before_forward(self, model: LightningModule, single_process_per_device, *batch): """ Override to handle custom input to device logic. For DDP, no logic is required as this is handled internally within the DDP wrapper. @@ -108,7 +108,9 @@ def on_before_forward(self, model, *batch): model: Model to train. Returns: batch moved to correct device if needed. """ - return model.transfer_batch_to_device(batch, model.device) + if single_process_per_device: + return model.transfer_batch_to_device(batch, model.device) + return batch def optimizer_state(self, optimizer: Optimizer) -> dict: return optimizer.state_dict() From a7e4a410a89299275f0e1e53ea573b8650c8d4fd Mon Sep 17 00:00:00 2001 From: tchaton Date: Mon, 4 Jan 2021 14:56:24 +0100 Subject: [PATCH 12/38] resolve flake8 --- pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py b/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py index e849510470e21..2a6ea830b5d6d 100644 --- a/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py @@ -54,4 +54,4 @@ def init_device(self, process_idx): @property def single_process_per_device(self): - return True \ No newline at end of file + return True From 223ab0ebf44a573944cde87322eea8df60750056 Mon Sep 17 00:00:00 2001 From: tchaton Date: Mon, 4 Jan 2021 15:00:45 +0100 Subject: [PATCH 13/38] update --- pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py | 2 +- pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py | 4 ++-- pytorch_lightning/plugins/ddp_plugin.py | 6 ++++-- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py b/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py index 2a6ea830b5d6d..7457587583dea 100644 --- a/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py @@ -54,4 +54,4 @@ def init_device(self, process_idx): @property def single_process_per_device(self): - return True + return False diff --git a/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py b/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py index ef2a2e6d21617..f92dafa0a4100 100644 --- a/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py @@ -26,7 +26,7 @@ from pytorch_lightning.distributed.dist import LightningDistributed from pytorch_lightning.plugins.ddp_plugin import DDPPlugin from 
pytorch_lightning.plugins.rpc_plugin import RPCPlugin -from pytorch_lightning.utilities import HYDRA_AVAILABLE, AMPType +from pytorch_lightning.utilities import AMPType, HYDRA_AVAILABLE from pytorch_lightning.utilities.distributed import ( all_gather_ddp_if_available, find_free_network_port, @@ -300,4 +300,4 @@ def require_distributed_sampler(self): @property def single_process_per_device(self): - return True + return False diff --git a/pytorch_lightning/plugins/ddp_plugin.py b/pytorch_lightning/plugins/ddp_plugin.py index ae50f2a334ded..2499f45f27e13 100644 --- a/pytorch_lightning/plugins/ddp_plugin.py +++ b/pytorch_lightning/plugins/ddp_plugin.py @@ -1,6 +1,6 @@ -from contextlib import contextmanager import os -from typing import Any, Dict, List, Optional, Union +from contextlib import contextmanager +from typing import Any, Dict, List, Union import torch import torch.distributed as torch_distrib @@ -105,6 +105,8 @@ def on_before_forward(self, model, *batch): Args: batch: Inputs to the model. + single_process_per_device (bool): Wheter the accelerator requieres to move each batch data to a single gpu. + This should be skipped on 'cpu' and 'dp', 'ddp2' modes. model: Model to train. Returns: batch moved to correct device if needed. """ From fd1b63e3392a8905e2f0710af897dffc3a38fb85 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 4 Jan 2021 14:28:12 +0000 Subject: [PATCH 14/38] resolve --- pytorch_lightning/accelerators/accelerator.py | 4 ---- pytorch_lightning/accelerators/ddp2_accelerator.py | 8 ++------ pytorch_lightning/accelerators/ddp_accelerator.py | 6 +----- .../accelerators/ddp_cpu_hpc_accelerator.py | 3 --- .../accelerators/ddp_cpu_spawn_accelerator.py | 6 +----- pytorch_lightning/accelerators/ddp_hpc_accelerator.py | 6 +----- pytorch_lightning/accelerators/ddp_spawn_accelerator.py | 6 +----- pytorch_lightning/accelerators/dp_accelerator.py | 3 --- pytorch_lightning/plugins/ddp_plugin.py | 9 ++++----- 9 files changed, 10 insertions(+), 41 deletions(-) diff --git a/pytorch_lightning/accelerators/accelerator.py b/pytorch_lightning/accelerators/accelerator.py index b387d7b0f29f7..77f30219ba8c0 100644 --- a/pytorch_lightning/accelerators/accelerator.py +++ b/pytorch_lightning/accelerators/accelerator.py @@ -237,10 +237,6 @@ def __setstate__(self, d): def on_save(self, checkpoint): return checkpoint - @property - def single_process_per_device(self): - raise NotImplementedError - @property def rpc_enabled(self): return self.ddp_plugin is not None and isinstance(self.ddp_plugin, RPCPlugin) diff --git a/pytorch_lightning/accelerators/ddp2_accelerator.py b/pytorch_lightning/accelerators/ddp2_accelerator.py index 6adebf14ef544..27641d9418479 100644 --- a/pytorch_lightning/accelerators/ddp2_accelerator.py +++ b/pytorch_lightning/accelerators/ddp2_accelerator.py @@ -72,7 +72,7 @@ def test_step(self, args): return self._step(args) def _step(self, args): - args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), self.single_process_per_device, *args) + args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), *args) if self.trainer.amp_backend == AMPType.NATIVE: with torch.cuda.amp.autocast(): output = self.trainer.model(*args) @@ -268,8 +268,4 @@ def distributed_sampler_kwargs(self): @property def require_distributed_sampler(self): - return True - - @property - def single_process_per_device(self): - return False + return True \ No newline at end of file diff --git a/pytorch_lightning/accelerators/ddp_accelerator.py b/pytorch_lightning/accelerators/ddp_accelerator.py index 
8cef7ee3c6ece..4299b4d223058 100644 --- a/pytorch_lightning/accelerators/ddp_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_accelerator.py @@ -164,7 +164,7 @@ def test_step(self, args): return self._step(args) def _step(self, args): - args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), self.single_process_per_device, *args) + args = self.ddp_plugin.on_before_forward(self.trainer.model, *args) if self.trainer.amp_backend == AMPType.NATIVE: with torch.cuda.amp.autocast(): output = self.trainer.model(*args) @@ -373,7 +373,3 @@ def distributed_sampler_kwargs(self): @property def require_distributed_sampler(self): return True - - @property - def single_process_per_device(self): - return True diff --git a/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py b/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py index 7457587583dea..443f428ec12eb 100644 --- a/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py @@ -52,6 +52,3 @@ def get_device_ids(self): def init_device(self, process_idx): pass - @property - def single_process_per_device(self): - return False diff --git a/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py b/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py index f92dafa0a4100..de7b93511507c 100644 --- a/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py @@ -185,7 +185,7 @@ def test_step(self, args): return self._step(args) def _step(self, args): - args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), self.single_process_per_device, *args) + args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), *args) if self.trainer.amp_backend == AMPType.NATIVE: with torch.cuda.amp.autocast(): output = self.trainer.model(*args) @@ -297,7 +297,3 @@ def distributed_sampler_kwargs(self): @property def require_distributed_sampler(self): return True - - @property - def single_process_per_device(self): - return False diff --git a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py index 547fcac6abd64..b257884e34aef 100644 --- a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py @@ -89,7 +89,7 @@ def test_step(self, args): return self._step(args) def _step(self, args): - args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), self.single_process_per_device, *args) + args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), *args) if self.trainer.amp_backend == AMPType.NATIVE: with torch.cuda.amp.autocast(): output = self.trainer.model(*args) @@ -261,7 +261,3 @@ def distributed_sampler_kwargs(self): @property def require_distributed_sampler(self): return True - - @property - def single_process_per_device(self): - return True diff --git a/pytorch_lightning/accelerators/ddp_spawn_accelerator.py b/pytorch_lightning/accelerators/ddp_spawn_accelerator.py index a3451c277e1d8..a49e17fc0b31d 100644 --- a/pytorch_lightning/accelerators/ddp_spawn_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_spawn_accelerator.py @@ -217,7 +217,7 @@ def test_step(self, args): return self._step(args) def _step(self, args): - args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), self.single_process_per_device, *args) + args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), *args) if self.trainer.amp_backend == AMPType.NATIVE: with 
torch.cuda.amp.autocast(): output = self.trainer.model(*args) @@ -329,7 +329,3 @@ def distributed_sampler_kwargs(self): @property def require_distributed_sampler(self): return True - - @property - def single_process_per_device(self): - return True diff --git a/pytorch_lightning/accelerators/dp_accelerator.py b/pytorch_lightning/accelerators/dp_accelerator.py index ee2568c72d98d..834a920b505d9 100644 --- a/pytorch_lightning/accelerators/dp_accelerator.py +++ b/pytorch_lightning/accelerators/dp_accelerator.py @@ -188,6 +188,3 @@ def get_reference_model(self, model) -> LightningModule: @property def require_distributed_sampler(self): return False - - def single_process_per_device(self): - return False diff --git a/pytorch_lightning/plugins/ddp_plugin.py b/pytorch_lightning/plugins/ddp_plugin.py index 2499f45f27e13..589034a42190f 100644 --- a/pytorch_lightning/plugins/ddp_plugin.py +++ b/pytorch_lightning/plugins/ddp_plugin.py @@ -92,7 +92,7 @@ def init_ddp_connection( torch_backend, rank=global_rank, world_size=world_size ) - def on_before_forward(self, model: LightningModule, single_process_per_device, *batch): + def on_before_forward(self, model: LightningDistributedDataParallel, *batch): """ Override to handle custom input to device logic. For DDP, no logic is required as this is handled internally within the DDP wrapper. @@ -105,13 +105,12 @@ def on_before_forward(self, model, *batch): Args: batch: Inputs to the model. - single_process_per_device (bool): Wheter the accelerator requieres to move each batch data to a single gpu. - This should be skipped on 'cpu' and 'dp', 'ddp2' modes. model: Model to train. Returns: batch moved to correct device if needed. """ - if single_process_per_device: - return model.transfer_batch_to_device(batch, model.device) + if model.device_ids is not None and len(model.device_ids) == 1: + if isinstance(model, LightningDistributedDataParallel): + batch = model.module.transfer_batch_to_device(batch, model.module.device) return batch def optimizer_state(self, optimizer: Optimizer) -> dict: From 5d60b49b9682d7bfe3081e606585b888c8913ca5 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 4 Jan 2021 15:27:07 +0000 Subject: [PATCH 15/38] update --- pytorch_lightning/accelerators/ddp2_accelerator.py | 2 +- pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py | 2 +- pytorch_lightning/accelerators/ddp_hpc_accelerator.py | 2 +- pytorch_lightning/accelerators/ddp_spawn_accelerator.py | 2 +- pytorch_lightning/overrides/data_parallel.py | 5 +++++ pytorch_lightning/plugins/ddp_plugin.py | 6 +++--- 6 files changed, 12 insertions(+), 7 deletions(-) diff --git a/pytorch_lightning/accelerators/ddp2_accelerator.py b/pytorch_lightning/accelerators/ddp2_accelerator.py index 27641d9418479..4a1235f9b72fd 100644 --- a/pytorch_lightning/accelerators/ddp2_accelerator.py +++ b/pytorch_lightning/accelerators/ddp2_accelerator.py @@ -72,7 +72,7 @@ def test_step(self, args): return self._step(args) def _step(self, args): - args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), *args) + args = self.ddp_plugin.on_before_forward(self.trainer.model, *args) if self.trainer.amp_backend == AMPType.NATIVE: with torch.cuda.amp.autocast(): output = self.trainer.model(*args) diff --git a/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py b/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py index de7b93511507c..aa89f8b60c9ed 100644 --- a/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py @@ 
-185,7 +185,7 @@ def test_step(self, args): return self._step(args) def _step(self, args): - args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), *args) + args = self.ddp_plugin.on_before_forward(self.trainer.model, *args) if self.trainer.amp_backend == AMPType.NATIVE: with torch.cuda.amp.autocast(): output = self.trainer.model(*args) diff --git a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py index b257884e34aef..dc282cb844d10 100644 --- a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py @@ -89,7 +89,7 @@ def test_step(self, args): return self._step(args) def _step(self, args): - args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), *args) + args = self.ddp_plugin.on_before_forward(self.trainer.model, *args) if self.trainer.amp_backend == AMPType.NATIVE: with torch.cuda.amp.autocast(): output = self.trainer.model(*args) diff --git a/pytorch_lightning/accelerators/ddp_spawn_accelerator.py b/pytorch_lightning/accelerators/ddp_spawn_accelerator.py index a49e17fc0b31d..d8c50e1a59fa9 100644 --- a/pytorch_lightning/accelerators/ddp_spawn_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_spawn_accelerator.py @@ -217,7 +217,7 @@ def test_step(self, args): return self._step(args) def _step(self, args): - args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), *args) + args = self.ddp_plugin.on_before_forward(self.trainer.model, *args) if self.trainer.amp_backend == AMPType.NATIVE: with torch.cuda.amp.autocast(): output = self.trainer.model(*args) diff --git a/pytorch_lightning/overrides/data_parallel.py b/pytorch_lightning/overrides/data_parallel.py index 393138fff9248..0a4d41893990b 100644 --- a/pytorch_lightning/overrides/data_parallel.py +++ b/pytorch_lightning/overrides/data_parallel.py @@ -220,6 +220,11 @@ def reducer_prepare_for_backwards(self, output): def reducer_reset_hooks(self): self._reducer_prepared_for_backwards = False + @property + def running_single_process_per_device(self): + device_ids = getattr(self, "device_ids", None) + return device_ids is not None and len(device_ids) == 1 + def warn_missing_output(fx_called): if fx_called == 'training_step': diff --git a/pytorch_lightning/plugins/ddp_plugin.py b/pytorch_lightning/plugins/ddp_plugin.py index 589034a42190f..bac002b685c1c 100644 --- a/pytorch_lightning/plugins/ddp_plugin.py +++ b/pytorch_lightning/plugins/ddp_plugin.py @@ -108,9 +108,9 @@ def on_before_forward(self, model, *batch): model: Model to train. Returns: batch moved to correct device if needed. 
""" - if model.device_ids is not None and len(model.device_ids) == 1: - if isinstance(model, LightningDistributedDataParallel): - batch = model.module.transfer_batch_to_device(batch, model.module.device) + if model.running_single_process_per_device: + model = self.get_model_from_plugin(model) + batch = model.transfer_batch_to_device(batch, model.device) return batch def optimizer_state(self, optimizer: Optimizer) -> dict: From 7cc7760fb885e2c8442a2d6cfb1b41fe9fca560a Mon Sep 17 00:00:00 2001 From: tchaton Date: Mon, 4 Jan 2021 16:29:27 +0100 Subject: [PATCH 16/38] update --- pytorch_lightning/accelerators/ddp2_accelerator.py | 4 ++-- pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/pytorch_lightning/accelerators/ddp2_accelerator.py b/pytorch_lightning/accelerators/ddp2_accelerator.py index 4a1235f9b72fd..8a4e2bf944c58 100644 --- a/pytorch_lightning/accelerators/ddp2_accelerator.py +++ b/pytorch_lightning/accelerators/ddp2_accelerator.py @@ -26,7 +26,7 @@ from pytorch_lightning.distributed.dist import LightningDistributed from pytorch_lightning.plugins.ddp_plugin import DDPPlugin from pytorch_lightning.plugins.rpc_plugin import RPCPlugin -from pytorch_lightning.utilities import HYDRA_AVAILABLE, AMPType +from pytorch_lightning.utilities import AMPType, HYDRA_AVAILABLE from pytorch_lightning.utilities.distributed import all_gather_ddp_if_available, rank_zero_only, sync_ddp_if_available if HYDRA_AVAILABLE: @@ -268,4 +268,4 @@ def distributed_sampler_kwargs(self): @property def require_distributed_sampler(self): - return True \ No newline at end of file + return True diff --git a/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py b/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py index 443f428ec12eb..b9a71ed271744 100644 --- a/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_cpu_hpc_accelerator.py @@ -51,4 +51,3 @@ def get_device_ids(self): def init_device(self, process_idx): pass - From f5c28438bc0c2cbb72cf3d6079e814f365d9fabf Mon Sep 17 00:00:00 2001 From: tchaton Date: Mon, 4 Jan 2021 16:31:55 +0100 Subject: [PATCH 17/38] update --- pytorch_lightning/overrides/data_parallel.py | 2 +- pytorch_lightning/plugins/ddp_plugin.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pytorch_lightning/overrides/data_parallel.py b/pytorch_lightning/overrides/data_parallel.py index 0a4d41893990b..6e5f438b9f324 100644 --- a/pytorch_lightning/overrides/data_parallel.py +++ b/pytorch_lightning/overrides/data_parallel.py @@ -221,7 +221,7 @@ def reducer_reset_hooks(self): self._reducer_prepared_for_backwards = False @property - def running_single_process_per_device(self): + def running_single_process_per_device(self) -> bool: device_ids = getattr(self, "device_ids", None) return device_ids is not None and len(device_ids) == 1 diff --git a/pytorch_lightning/plugins/ddp_plugin.py b/pytorch_lightning/plugins/ddp_plugin.py index bac002b685c1c..759b67ddd0cb5 100644 --- a/pytorch_lightning/plugins/ddp_plugin.py +++ b/pytorch_lightning/plugins/ddp_plugin.py @@ -108,7 +108,7 @@ def on_before_forward(self, model, *batch): model: Model to train. Returns: batch moved to correct device if needed. 
""" - if model.running_single_process_per_device: + if isinstance(model, LightningDistributedDataParallel) and model.running_single_process_per_device: model = self.get_model_from_plugin(model) batch = model.transfer_batch_to_device(batch, model.device) return batch From 70cde1d323d38242f4f70a8e4e1fd0e61774e516 Mon Sep 17 00:00:00 2001 From: tchaton Date: Mon, 4 Jan 2021 16:35:15 +0100 Subject: [PATCH 18/38] add comment --- pytorch_lightning/plugins/ddp_plugin.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pytorch_lightning/plugins/ddp_plugin.py b/pytorch_lightning/plugins/ddp_plugin.py index 759b67ddd0cb5..23003d874c9fc 100644 --- a/pytorch_lightning/plugins/ddp_plugin.py +++ b/pytorch_lightning/plugins/ddp_plugin.py @@ -109,6 +109,8 @@ def on_before_forward(self, model, *batch): Returns: batch moved to correct device if needed. """ if isinstance(model, LightningDistributedDataParallel) and model.running_single_process_per_device: + # # when using single process/single device move all objects to appropriate device as no scatter is necessary + # works with custom batch object if they implement `.to` function model = self.get_model_from_plugin(model) batch = model.transfer_batch_to_device(batch, model.device) return batch From 0c7690db47fb00882098fd4fd4f9f393498fa172 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 4 Jan 2021 16:43:53 +0000 Subject: [PATCH 19/38] resolve bug with sharded --- pytorch_lightning/plugins/ddp_plugin.py | 13 +++++++++---- pytorch_lightning/plugins/sharded_plugin.py | 4 +++- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/pytorch_lightning/plugins/ddp_plugin.py b/pytorch_lightning/plugins/ddp_plugin.py index 759b67ddd0cb5..16ea1671dfe4a 100644 --- a/pytorch_lightning/plugins/ddp_plugin.py +++ b/pytorch_lightning/plugins/ddp_plugin.py @@ -48,7 +48,7 @@ def configure_ddp( def configure_ddp(self, model, device_ids): model = LightningDistributedDataParallel( - model, device_ids=device_ids, find_unused_parameters=True + model, device_ids=device_ids, find_unused_parameters=False ) return model @@ -60,9 +60,9 @@ def configure_ddp(self, model, device_ids): the model wrapped in LightningDistributedDataParallel """ - # if unset, default `find_unused_parameters` `True` + # if unset, default `find_unused_parameters` `False` self._ddp_kwargs["find_unused_parameters"] = self._ddp_kwargs.get( - "find_unused_parameters", True + "find_unused_parameters", False ) model = LightningDistributedDataParallel( model, @@ -92,6 +92,11 @@ def init_ddp_connection( torch_backend, rank=global_rank, world_size=world_size ) + @staticmethod + def is_running_single_process_per_device(model) -> bool: + device_ids = getattr(model, "device_ids", None) + return device_ids is not None and len(device_ids) == 1 + def on_before_forward(self, model: LightningDistributedDataParallel, *batch): """ Override to handle custom input to device logic. For DDP, no logic is required as this is handled internally @@ -108,7 +113,7 @@ def on_before_forward(self, model, *batch): model: Model to train. Returns: batch moved to correct device if needed. 
""" - if isinstance(model, LightningDistributedDataParallel) and model.running_single_process_per_device: + if self.is_running_single_process_per_device(model): model = self.get_model_from_plugin(model) batch = model.transfer_batch_to_device(batch, model.device) return batch diff --git a/pytorch_lightning/plugins/sharded_plugin.py b/pytorch_lightning/plugins/sharded_plugin.py index 32fc7984e90f9..ccf8d6dda2919 100644 --- a/pytorch_lightning/plugins/sharded_plugin.py +++ b/pytorch_lightning/plugins/sharded_plugin.py @@ -36,7 +36,9 @@ def configure_ddp( self, model: LightningModule, device_ids: List[int] ): self._wrap_optimizers(model) - return LightningShardedDataParallel(model, sharded_optimizer=model.trainer.optimizers) + model = LightningShardedDataParallel(model, sharded_optimizer=model.trainer.optimizers) + model.device_ids = device_ids + return model def optimizer_state(self, optimizer: 'OSS') -> Optional[dict]: optimizer.consolidate_state_dict() From 2506d3cf742b79145bf7e88d21854d4b0e37d0b7 Mon Sep 17 00:00:00 2001 From: tchaton Date: Mon, 4 Jan 2021 17:45:49 +0100 Subject: [PATCH 20/38] update --- pytorch_lightning/plugins/ddp_plugin.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pytorch_lightning/plugins/ddp_plugin.py b/pytorch_lightning/plugins/ddp_plugin.py index abb6a43eb0dbc..16ea1671dfe4a 100644 --- a/pytorch_lightning/plugins/ddp_plugin.py +++ b/pytorch_lightning/plugins/ddp_plugin.py @@ -113,13 +113,7 @@ def on_before_forward(self, model, *batch): model: Model to train. Returns: batch moved to correct device if needed. """ -<<<<<<< HEAD if self.is_running_single_process_per_device(model): -======= - if isinstance(model, LightningDistributedDataParallel) and model.running_single_process_per_device: - # # when using single process/single device move all objects to appropriate device as no scatter is necessary - # works with custom batch object if they implement `.to` function ->>>>>>> 70cde1d323d38242f4f70a8e4e1fd0e61774e516 model = self.get_model_from_plugin(model) batch = model.transfer_batch_to_device(batch, model.device) return batch From cdede9a9685768cdc75906632e4621af6a273f81 Mon Sep 17 00:00:00 2001 From: tchaton Date: Mon, 4 Jan 2021 17:51:14 +0100 Subject: [PATCH 21/38] remove property --- pytorch_lightning/overrides/data_parallel.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/pytorch_lightning/overrides/data_parallel.py b/pytorch_lightning/overrides/data_parallel.py index 6e5f438b9f324..393138fff9248 100644 --- a/pytorch_lightning/overrides/data_parallel.py +++ b/pytorch_lightning/overrides/data_parallel.py @@ -220,11 +220,6 @@ def reducer_prepare_for_backwards(self, output): def reducer_reset_hooks(self): self._reducer_prepared_for_backwards = False - @property - def running_single_process_per_device(self) -> bool: - device_ids = getattr(self, "device_ids", None) - return device_ids is not None and len(device_ids) == 1 - def warn_missing_output(fx_called): if fx_called == 'training_step': From f38232bb209e8cbd78c42ce51e11a4a6e4ccddd5 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 4 Jan 2021 18:21:57 +0000 Subject: [PATCH 22/38] update --- pytorch_lightning/plugins/ddp_plugin.py | 12 +++--------- pytorch_lightning/plugins/sharded_plugin.py | 1 - tests/models/test_sync_batchnorm.py | 3 +++ 3 files changed, 6 insertions(+), 10 deletions(-) diff --git a/pytorch_lightning/plugins/ddp_plugin.py b/pytorch_lightning/plugins/ddp_plugin.py index abb6a43eb0dbc..e98fdae4ca0c8 100644 --- a/pytorch_lightning/plugins/ddp_plugin.py +++ 
b/pytorch_lightning/plugins/ddp_plugin.py @@ -69,6 +69,7 @@ def configure_ddp(self, model, device_ids): device_ids=device_ids, **self._ddp_kwargs, ) + self.device_ids = device_ids return model def init_ddp_connection( @@ -93,9 +94,8 @@ def init_ddp_connection( ) @staticmethod - def is_running_single_process_per_device(model) -> bool: - device_ids = getattr(model, "device_ids", None) - return device_ids is not None and len(device_ids) == 1 + def is_running_single_process_per_device(self) -> bool: + return self.device_ids is not None and len(self.device_ids) == 1 def on_before_forward(self, model: LightningDistributedDataParallel, *batch): """ @@ -113,13 +113,7 @@ def on_before_forward(self, model, *batch): model: Model to train. Returns: batch moved to correct device if needed. """ -<<<<<<< HEAD if self.is_running_single_process_per_device(model): -======= - if isinstance(model, LightningDistributedDataParallel) and model.running_single_process_per_device: - # # when using single process/single device move all objects to appropriate device as no scatter is necessary - # works with custom batch object if they implement `.to` function ->>>>>>> 70cde1d323d38242f4f70a8e4e1fd0e61774e516 model = self.get_model_from_plugin(model) batch = model.transfer_batch_to_device(batch, model.device) return batch diff --git a/pytorch_lightning/plugins/sharded_plugin.py b/pytorch_lightning/plugins/sharded_plugin.py index ccf8d6dda2919..70dd8adea4cee 100644 --- a/pytorch_lightning/plugins/sharded_plugin.py +++ b/pytorch_lightning/plugins/sharded_plugin.py @@ -37,7 +37,6 @@ def configure_ddp( ): self._wrap_optimizers(model) model = LightningShardedDataParallel(model, sharded_optimizer=model.trainer.optimizers) - model.device_ids = device_ids return model def optimizer_state(self, optimizer: 'OSS') -> Optional[dict]: diff --git a/tests/models/test_sync_batchnorm.py b/tests/models/test_sync_batchnorm.py index fd771c98635ab..6be349a177a5a 100644 --- a/tests/models/test_sync_batchnorm.py +++ b/tests/models/test_sync_batchnorm.py @@ -19,10 +19,12 @@ from pytorch_lightning import Trainer, seed_everything, LightningModule from pytorch_lightning.core.step_result import TrainResult from pytorch_lightning.utilities import FLOAT16_EPSILON +from pytorch_lightning.plugins.ddp_plugin import DDPPlugin from tests.base.datamodules import MNISTDataModule from tests.base.develop_utils import set_random_master_port + class SyncBNModule(LightningModule): def __init__(self, gpu_count=1, **kwargs): super().__init__() @@ -108,6 +110,7 @@ def test_sync_batchnorm_ddp(tmpdir): sync_batchnorm=True, num_sanity_val_steps=0, replace_sampler_ddp=False, + plugins=[DDPPlugin(find_unused_parameters=True)] ) result = trainer.fit(model, dm) From 55274555064d44a48234bb29874e7d8380678edb Mon Sep 17 00:00:00 2001 From: tchaton Date: Mon, 4 Jan 2021 19:24:55 +0100 Subject: [PATCH 23/38] resolve test --- tests/models/test_sync_batchnorm.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/models/test_sync_batchnorm.py b/tests/models/test_sync_batchnorm.py index 6be349a177a5a..dd54c6b5d654e 100644 --- a/tests/models/test_sync_batchnorm.py +++ b/tests/models/test_sync_batchnorm.py @@ -16,15 +16,14 @@ import torch.nn as nn import torch.nn.functional as F -from pytorch_lightning import Trainer, seed_everything, LightningModule +from pytorch_lightning import LightningModule, seed_everything, Trainer from pytorch_lightning.core.step_result import TrainResult -from pytorch_lightning.utilities import FLOAT16_EPSILON from 
pytorch_lightning.plugins.ddp_plugin import DDPPlugin +from pytorch_lightning.utilities import FLOAT16_EPSILON from tests.base.datamodules import MNISTDataModule from tests.base.develop_utils import set_random_master_port - class SyncBNModule(LightningModule): def __init__(self, gpu_count=1, **kwargs): super().__init__() From bf94fa302cb86e60770d1a7c211ddcd3257a4a14 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 4 Jan 2021 18:45:06 +0000 Subject: [PATCH 24/38] resolve bug --- pytorch_lightning/plugins/ddp_plugin.py | 4 ++-- pytorch_lightning/plugins/sharded_plugin.py | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pytorch_lightning/plugins/ddp_plugin.py b/pytorch_lightning/plugins/ddp_plugin.py index e98fdae4ca0c8..6edaf04bc1bd8 100644 --- a/pytorch_lightning/plugins/ddp_plugin.py +++ b/pytorch_lightning/plugins/ddp_plugin.py @@ -93,7 +93,7 @@ def init_ddp_connection( torch_backend, rank=global_rank, world_size=world_size ) - @staticmethod + @property def is_running_single_process_per_device(self) -> bool: return self.device_ids is not None and len(self.device_ids) == 1 @@ -113,7 +113,7 @@ def on_before_forward(self, model, *batch): model: Model to train. Returns: batch moved to correct device if needed. """ - if self.is_running_single_process_per_device(model): + if self.is_running_single_process_per_device: model = self.get_model_from_plugin(model) batch = model.transfer_batch_to_device(batch, model.device) return batch diff --git a/pytorch_lightning/plugins/sharded_plugin.py b/pytorch_lightning/plugins/sharded_plugin.py index 70dd8adea4cee..c9951c49f53fb 100644 --- a/pytorch_lightning/plugins/sharded_plugin.py +++ b/pytorch_lightning/plugins/sharded_plugin.py @@ -35,6 +35,7 @@ def __init__(self, **kwargs): def configure_ddp( self, model: LightningModule, device_ids: List[int] ): + self.device_ids = device_ids self._wrap_optimizers(model) model = LightningShardedDataParallel(model, sharded_optimizer=model.trainer.optimizers) return model From b23d5ba94abbfa695051dde7fb3538c821856ebc Mon Sep 17 00:00:00 2001 From: tchaton Date: Mon, 4 Jan 2021 21:03:17 +0100 Subject: [PATCH 25/38] update on comments --- pytorch_lightning/accelerators/ddp2_accelerator.py | 2 +- pytorch_lightning/accelerators/ddp_accelerator.py | 4 ++-- pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py | 2 +- pytorch_lightning/accelerators/ddp_hpc_accelerator.py | 4 ++-- pytorch_lightning/accelerators/ddp_spawn_accelerator.py | 4 ++-- pytorch_lightning/plugins/ddp_plugin.py | 5 +++-- 6 files changed, 11 insertions(+), 10 deletions(-) diff --git a/pytorch_lightning/accelerators/ddp2_accelerator.py b/pytorch_lightning/accelerators/ddp2_accelerator.py index 8a4e2bf944c58..fe4133cb52624 100644 --- a/pytorch_lightning/accelerators/ddp2_accelerator.py +++ b/pytorch_lightning/accelerators/ddp2_accelerator.py @@ -72,7 +72,7 @@ def test_step(self, args): return self._step(args) def _step(self, args): - args = self.ddp_plugin.on_before_forward(self.trainer.model, *args) + args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), *args) if self.trainer.amp_backend == AMPType.NATIVE: with torch.cuda.amp.autocast(): output = self.trainer.model(*args) diff --git a/pytorch_lightning/accelerators/ddp_accelerator.py b/pytorch_lightning/accelerators/ddp_accelerator.py index 4299b4d223058..a0e7f7b113b8b 100644 --- a/pytorch_lightning/accelerators/ddp_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_accelerator.py @@ -30,7 +30,7 @@ from pytorch_lightning.distributed.dist import 
LightningDistributed from pytorch_lightning.plugins.ddp_plugin import DDPPlugin from pytorch_lightning.plugins.rpc_plugin import RPCPlugin -from pytorch_lightning.utilities import HYDRA_AVAILABLE, AMPType +from pytorch_lightning.utilities import AMPType, HYDRA_AVAILABLE from pytorch_lightning.utilities.distributed import ( all_gather_ddp_if_available, find_free_network_port, @@ -164,7 +164,7 @@ def test_step(self, args): return self._step(args) def _step(self, args): - args = self.ddp_plugin.on_before_forward(self.trainer.model, *args) + args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), *args) if self.trainer.amp_backend == AMPType.NATIVE: with torch.cuda.amp.autocast(): output = self.trainer.model(*args) diff --git a/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py b/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py index aa89f8b60c9ed..de7b93511507c 100644 --- a/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py @@ -185,7 +185,7 @@ def test_step(self, args): return self._step(args) def _step(self, args): - args = self.ddp_plugin.on_before_forward(self.trainer.model, *args) + args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), *args) if self.trainer.amp_backend == AMPType.NATIVE: with torch.cuda.amp.autocast(): output = self.trainer.model(*args) diff --git a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py index dc282cb844d10..d03e899cba5d6 100644 --- a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py @@ -26,7 +26,7 @@ from pytorch_lightning.distributed.dist import LightningDistributed from pytorch_lightning.plugins.ddp_plugin import DDPPlugin from pytorch_lightning.plugins.rpc_plugin import RPCPlugin -from pytorch_lightning.utilities import HYDRA_AVAILABLE, AMPType +from pytorch_lightning.utilities import AMPType, HYDRA_AVAILABLE from pytorch_lightning.utilities.distributed import all_gather_ddp_if_available, rank_zero_only, sync_ddp_if_available if HYDRA_AVAILABLE: @@ -89,7 +89,7 @@ def test_step(self, args): return self._step(args) def _step(self, args): - args = self.ddp_plugin.on_before_forward(self.trainer.model, *args) + args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), *args) if self.trainer.amp_backend == AMPType.NATIVE: with torch.cuda.amp.autocast(): output = self.trainer.model(*args) diff --git a/pytorch_lightning/accelerators/ddp_spawn_accelerator.py b/pytorch_lightning/accelerators/ddp_spawn_accelerator.py index d8c50e1a59fa9..3ba5268eaf056 100644 --- a/pytorch_lightning/accelerators/ddp_spawn_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_spawn_accelerator.py @@ -27,7 +27,7 @@ from pytorch_lightning.distributed import LightningDistributed from pytorch_lightning.plugins.ddp_plugin import DDPPlugin from pytorch_lightning.plugins.rpc_plugin import RPCPlugin -from pytorch_lightning.utilities import HYDRA_AVAILABLE, AMPType +from pytorch_lightning.utilities import AMPType, HYDRA_AVAILABLE from pytorch_lightning.utilities.cloud_io import atomic_save from pytorch_lightning.utilities.cloud_io import load as pl_load from pytorch_lightning.utilities.distributed import ( @@ -217,7 +217,7 @@ def test_step(self, args): return self._step(args) def _step(self, args): - args = self.ddp_plugin.on_before_forward(self.trainer.model, *args) + args = self.ddp_plugin.on_before_forward(self.trainer.get_model(), *args) if 
self.trainer.amp_backend == AMPType.NATIVE: with torch.cuda.amp.autocast(): output = self.trainer.model(*args) diff --git a/pytorch_lightning/plugins/ddp_plugin.py b/pytorch_lightning/plugins/ddp_plugin.py index 6edaf04bc1bd8..7643e6c76e236 100644 --- a/pytorch_lightning/plugins/ddp_plugin.py +++ b/pytorch_lightning/plugins/ddp_plugin.py @@ -95,9 +95,11 @@ def init_ddp_connection( @property def is_running_single_process_per_device(self) -> bool: + # objects do not need to be scattered in single process per device, move objects upfront to device + # used in `self.on_before_forward` return self.device_ids is not None and len(self.device_ids) == 1 - def on_before_forward(self, model: LightningDistributedDataParallel, *batch): + def on_before_forward(self, model: LightningModule, *batch): """ Override to handle custom input to device logic. For DDP, no logic is required as this is handled internally within the DDP wrapper. @@ -114,7 +116,6 @@ def on_before_forward(self, model, *batch): Returns: batch moved to correct device if needed. """ if self.is_running_single_process_per_device: - model = self.get_model_from_plugin(model) batch = model.transfer_batch_to_device(batch, model.device) return batch From c33746120272e5a5cb5ffb37fc4a85290ae1a7b7 Mon Sep 17 00:00:00 2001 From: tchaton Date: Mon, 4 Jan 2021 21:36:35 +0100 Subject: [PATCH 26/38] update doc --- pytorch_lightning/core/hooks.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pytorch_lightning/core/hooks.py b/pytorch_lightning/core/hooks.py index f24a4ce8beb8a..086ce244146e1 100644 --- a/pytorch_lightning/core/hooks.py +++ b/pytorch_lightning/core/hooks.py @@ -17,10 +17,11 @@ from typing import Any, Dict, List, Optional, Union import torch -from pytorch_lightning.utilities import move_data_to_device, rank_zero_warn from torch.optim.optimizer import Optimizer from torch.utils.data import DataLoader +from pytorch_lightning.utilities import move_data_to_device, rank_zero_warn + class ModelHooks: """Hooks to be used in LightningModule.""" @@ -539,9 +540,9 @@ def transfer_batch_to_device(self, batch, device) any other device than the one passed in as argument (unless you know what you are doing). Note: - This hook only runs on single GPU training (no data-parallel). If you need multi-GPU support - for your custom batch objects, you need to define your custom - :class:`~torch.nn.parallel.DistributedDataParallel` or + This hook only runs on single GPU training and DDP. + If you need multi-GPU support for your custom batch objects in `dp` or `ddp2`, + you need to define your custom :class:`~torch.nn.parallel.DistributedDataParallel` or :class:`~pytorch_lightning.overrides.data_parallel.LightningDistributedDataParallel` and override :meth:`~pytorch_lightning.core.lightning.LightningModule.configure_ddp`. From 678e0b986c2d0851cd7682f22ff07c9ce6e927eb Mon Sep 17 00:00:00 2001 From: chaton Date: Tue, 5 Jan 2021 07:55:42 +0100 Subject: [PATCH 27/38] Update pytorch_lightning/core/hooks.py Co-authored-by: Rohit Gupta --- pytorch_lightning/core/hooks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pytorch_lightning/core/hooks.py b/pytorch_lightning/core/hooks.py index 086ce244146e1..4a1eeb4e9f608 100644 --- a/pytorch_lightning/core/hooks.py +++ b/pytorch_lightning/core/hooks.py @@ -541,7 +541,7 @@ def transfer_batch_to_device(self, batch, device) Note: This hook only runs on single GPU training and DDP. 
- If you need multi-GPU support for your custom batch objects in `dp` or `ddp2`, + If you need multi-GPU support for your custom batch objects in ``dp`` or ``ddp2``, you need to define your custom :class:`~torch.nn.parallel.DistributedDataParallel` or :class:`~pytorch_lightning.overrides.data_parallel.LightningDistributedDataParallel` and override :meth:`~pytorch_lightning.core.lightning.LightningModule.configure_ddp`. From 5d0bcf151834c8e1c759a9c78f675892bdb43b1a Mon Sep 17 00:00:00 2001 From: tchaton Date: Tue, 5 Jan 2021 07:57:21 +0100 Subject: [PATCH 28/38] update on comments --- pytorch_lightning/plugins/ddp_plugin.py | 12 ++++++------ tests/models/test_hooks.py | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/pytorch_lightning/plugins/ddp_plugin.py b/pytorch_lightning/plugins/ddp_plugin.py index 7643e6c76e236..989fbf82353c7 100644 --- a/pytorch_lightning/plugins/ddp_plugin.py +++ b/pytorch_lightning/plugins/ddp_plugin.py @@ -99,16 +99,16 @@ def is_running_single_process_per_device(self) -> bool: # used in `self.on_before_forward` return self.device_ids is not None and len(self.device_ids) == 1 - def on_before_forward(self, model: LightningModule, *batch): + def on_before_forward(self, model: LightningModule, *args): """ Override to handle custom input to device logic. For DDP, no logic is required as this is handled internally within the DDP wrapper. Example:: - def on_before_forward(self, model, *batch): - batch, batch_idx = batch - return batch.to(model.device) + def on_before_forward(self, model, *args): + batch, batch_idx = args + return batch.to(model.device), batch_idx Args: batch: Inputs to the model. @@ -116,8 +116,8 @@ def on_before_forward(self, model, *batch): Returns: batch moved to correct device if needed. """ if self.is_running_single_process_per_device: - batch = model.transfer_batch_to_device(batch, model.device) - return batch + args = model.transfer_batch_to_device(args, model.device) + return args def optimizer_state(self, optimizer: Optimizer) -> dict: return optimizer.state_dict() diff --git a/tests/models/test_hooks.py b/tests/models/test_hooks.py index c993b3b3e50b2..8a5d2f667bc32 100644 --- a/tests/models/test_hooks.py +++ b/tests/models/test_hooks.py @@ -158,8 +158,8 @@ def train_dataloader(self): model.training_epoch_end = None trainer = Trainer( default_root_dir=tmpdir, - limit_train_batches=1, - limit_val_batches=1, + limit_train_batches=2, + limit_val_batches=0, max_epochs=1, weights_summary=None, accelerator="ddp", From a6678612360b0ff0fb838bf280b54812b646e6e1 Mon Sep 17 00:00:00 2001 From: chaton Date: Tue, 5 Jan 2021 11:02:05 +0100 Subject: [PATCH 29/38] Update pytorch_lightning/plugins/ddp_plugin.py Co-authored-by: Rohit Gupta --- pytorch_lightning/plugins/ddp_plugin.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pytorch_lightning/plugins/ddp_plugin.py b/pytorch_lightning/plugins/ddp_plugin.py index 989fbf82353c7..eb433af89ecad 100644 --- a/pytorch_lightning/plugins/ddp_plugin.py +++ b/pytorch_lightning/plugins/ddp_plugin.py @@ -113,7 +113,7 @@ def on_before_forward(self, model, *args): Args: batch: Inputs to the model. model: Model to train. - Returns: batch moved to correct device if needed. + Returns: args moved to correct device if needed. 
""" if self.is_running_single_process_per_device: args = model.transfer_batch_to_device(args, model.device) From c75466fdc7eb3cc7ffab023886f3867f456fa750 Mon Sep 17 00:00:00 2001 From: chaton Date: Tue, 5 Jan 2021 11:02:18 +0100 Subject: [PATCH 30/38] Update pytorch_lightning/plugins/ddp_plugin.py Co-authored-by: Rohit Gupta --- pytorch_lightning/plugins/ddp_plugin.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pytorch_lightning/plugins/ddp_plugin.py b/pytorch_lightning/plugins/ddp_plugin.py index eb433af89ecad..860c00bff9755 100644 --- a/pytorch_lightning/plugins/ddp_plugin.py +++ b/pytorch_lightning/plugins/ddp_plugin.py @@ -111,8 +111,8 @@ def on_before_forward(self, model, *args): return batch.to(model.device), batch_idx Args: - batch: Inputs to the model. model: Model to train. + args: Inputs to the model. Returns: args moved to correct device if needed. """ if self.is_running_single_process_per_device: From d780c1636974ec98022e3de2a2cd6c8292fc3d2c Mon Sep 17 00:00:00 2001 From: tchaton Date: Tue, 5 Jan 2021 13:55:38 +0100 Subject: [PATCH 31/38] resolve pep8 --- tests/checkpointing/test_model_checkpoint.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/checkpointing/test_model_checkpoint.py b/tests/checkpointing/test_model_checkpoint.py index 7dbdee3d8a915..8d4a859a88784 100644 --- a/tests/checkpointing/test_model_checkpoint.py +++ b/tests/checkpointing/test_model_checkpoint.py @@ -11,20 +11,20 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from argparse import Namespace import os -from pathlib import Path import pickle import platform import re +from argparse import Namespace +from pathlib import Path from unittest import mock from unittest.mock import Mock import cloudpickle -from omegaconf import Container, OmegaConf import pytest import torch import yaml +from omegaconf import Container, OmegaConf import pytorch_lightning as pl import tests.base.develop_utils as tutils @@ -34,7 +34,6 @@ from pytorch_lightning.utilities.cloud_io import load as pl_load from pytorch_lightning.utilities.exceptions import MisconfigurationException from tests.base import BoringModel -import tests.base.develop_utils as tutils class LogInTwoMethods(BoringModel): From ff96053477b8587da9d563c0fe7399ef91ea3b00 Mon Sep 17 00:00:00 2001 From: tchaton Date: Tue, 5 Jan 2021 13:57:25 +0100 Subject: [PATCH 32/38] add device_ids to pipe --- pytorch_lightning/plugins/ddp_sequential_plugin.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pytorch_lightning/plugins/ddp_sequential_plugin.py b/pytorch_lightning/plugins/ddp_sequential_plugin.py index cb8740742db73..4c000a0b07c62 100644 --- a/pytorch_lightning/plugins/ddp_sequential_plugin.py +++ b/pytorch_lightning/plugins/ddp_sequential_plugin.py @@ -19,8 +19,8 @@ from torch import nn from torch.nn.parallel import DistributedDataParallel -from pytorch_lightning import LightningModule from pytorch_lightning import _logger as log +from pytorch_lightning import LightningModule from pytorch_lightning.overrides.data_parallel import LightningDistributedDataParallel from pytorch_lightning.plugins.rpc_plugin import RPCPlugin from pytorch_lightning.utilities import FAIRSCALE_PIPE_AVAILABLE, rank_zero_only @@ -270,6 +270,7 @@ def configure_ddp( ddp_plugin = RPCPlugin(process_group=mpu.get_data_parallel_group()).configure_ddp(model, device_ids) # Plugin handle backwards 
across processes. Currently not supported for DDP + pipe parallel ddp_plugin.PREPARE_FOR_BACKWARDS = False + self.device_ids = device_ids return ddp_plugin @rank_zero_only From b16b6118b2d58fabe4d1483e887183f56c1e0e29 Mon Sep 17 00:00:00 2001 From: tchaton Date: Wed, 6 Jan 2021 08:59:03 +0100 Subject: [PATCH 33/38] update on comments --- .../accelerators/ddp2_accelerator.py | 3 +++ pytorch_lightning/accelerators/ddp_accelerator.py | 3 +++ .../accelerators/ddp_cpu_spawn_accelerator.py | 3 +++ .../accelerators/ddp_hpc_accelerator.py | 3 +++ .../accelerators/ddp_spawn_accelerator.py | 3 +++ pytorch_lightning/plugins/ddp_plugin.py | 15 +-------------- .../plugins/ddp_sequential_plugin.py | 1 - pytorch_lightning/plugins/sharded_plugin.py | 5 ++--- 8 files changed, 18 insertions(+), 18 deletions(-) diff --git a/pytorch_lightning/accelerators/ddp2_accelerator.py b/pytorch_lightning/accelerators/ddp2_accelerator.py index fe4133cb52624..be5de9eb58b9a 100644 --- a/pytorch_lightning/accelerators/ddp2_accelerator.py +++ b/pytorch_lightning/accelerators/ddp2_accelerator.py @@ -200,6 +200,9 @@ def ddp_train(self, process_idx, mp_queue, model): # allow user to configure ddp model = self.configure_ddp(model, device_ids) + # attach the device_ids to the plugin + model.device_ids = device_ids + # set up training routine self.trainer.train_loop.setup_training(model) diff --git a/pytorch_lightning/accelerators/ddp_accelerator.py b/pytorch_lightning/accelerators/ddp_accelerator.py index a0e7f7b113b8b..4b1d035b3a16a 100644 --- a/pytorch_lightning/accelerators/ddp_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_accelerator.py @@ -299,6 +299,9 @@ def ddp_train(self, process_idx, model): # allow user to configure ddp model = self.configure_ddp(model, device_ids) + # attach the device_ids to the plugin + model.device_ids = device_ids + # set up training routine self.barrier('ddp_setup') self.trainer.train_loop.setup_training(model) diff --git a/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py b/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py index de7b93511507c..bca1d898948c4 100644 --- a/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py @@ -160,6 +160,9 @@ def ddp_train(self, process_idx, mp_queue, model): # allow user to configure ddp model = self.configure_ddp(model, device_ids) + # attach the device_ids to the plugin + model.device_ids = device_ids + # set up training routine self.trainer.train_loop.setup_training(model) diff --git a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py index d03e899cba5d6..c1545904903f8 100644 --- a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py @@ -191,6 +191,9 @@ def ddp_train(self, process_idx, model): # allow user to configure ddp model = self.configure_ddp(model, device_ids) + # attach the device_ids to the plugin + model.device_ids = device_ids + # set up training routine self.trainer.train_loop.setup_training(model) diff --git a/pytorch_lightning/accelerators/ddp_spawn_accelerator.py b/pytorch_lightning/accelerators/ddp_spawn_accelerator.py index 3ba5268eaf056..fd964f65f2808 100644 --- a/pytorch_lightning/accelerators/ddp_spawn_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_spawn_accelerator.py @@ -175,6 +175,9 @@ def ddp_train(self, process_idx, mp_queue, model, is_master=False, proc_offset=0 # allow user to configure ddp model = 
self.configure_ddp(model, device_ids) + # attach the device_ids to the plugin + model.device_ids = device_ids + # set up training routine self.trainer.train_loop.setup_training(model) diff --git a/pytorch_lightning/plugins/ddp_plugin.py b/pytorch_lightning/plugins/ddp_plugin.py index 860c00bff9755..69f8810880ee7 100644 --- a/pytorch_lightning/plugins/ddp_plugin.py +++ b/pytorch_lightning/plugins/ddp_plugin.py @@ -69,7 +69,6 @@ def configure_ddp(self, model, device_ids): device_ids=device_ids, **self._ddp_kwargs, ) - self.device_ids = device_ids return model def init_ddp_connection( @@ -101,19 +100,7 @@ def is_running_single_process_per_device(self) -> bool: def on_before_forward(self, model: LightningModule, *args): """ - Override to handle custom input to device logic. For DDP, no logic is required as this is handled internally - within the DDP wrapper. - - Example:: - - def on_before_forward(self, model, *args): - batch, batch_idx = args - return batch.to(model.device), batch_idx - - Args: - model: Model to train. - args: Inputs to the model. - Returns: args moved to correct device if needed. + Override to handle custom edge case. """ if self.is_running_single_process_per_device: args = model.transfer_batch_to_device(args, model.device) diff --git a/pytorch_lightning/plugins/ddp_sequential_plugin.py b/pytorch_lightning/plugins/ddp_sequential_plugin.py index 4c000a0b07c62..4d2835c518b2d 100644 --- a/pytorch_lightning/plugins/ddp_sequential_plugin.py +++ b/pytorch_lightning/plugins/ddp_sequential_plugin.py @@ -270,7 +270,6 @@ def configure_ddp( ddp_plugin = RPCPlugin(process_group=mpu.get_data_parallel_group()).configure_ddp(model, device_ids) # Plugin handle backwards across processes. Currently not supported for DDP + pipe parallel ddp_plugin.PREPARE_FOR_BACKWARDS = False - self.device_ids = device_ids return ddp_plugin @rank_zero_only diff --git a/pytorch_lightning/plugins/sharded_plugin.py b/pytorch_lightning/plugins/sharded_plugin.py index c9951c49f53fb..f3d6149443d22 100644 --- a/pytorch_lightning/plugins/sharded_plugin.py +++ b/pytorch_lightning/plugins/sharded_plugin.py @@ -11,13 +11,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-from typing import List, Optional, Union, Any +from typing import Any, List, Optional, Union from pytorch_lightning.core.lightning import LightningModule from pytorch_lightning.core.optimizer import is_lightning_optimizer from pytorch_lightning.plugins.ddp_plugin import DDPPlugin from pytorch_lightning.plugins.sharded_native_amp_plugin import ShardedNativeAMPPlugin -from pytorch_lightning.utilities import FAIRSCALE_AVAILABLE, AMPType, rank_zero_only +from pytorch_lightning.utilities import AMPType, FAIRSCALE_AVAILABLE, rank_zero_only from pytorch_lightning.utilities.exceptions import MisconfigurationException if FAIRSCALE_AVAILABLE: @@ -35,7 +35,6 @@ def __init__(self, **kwargs): def configure_ddp( self, model: LightningModule, device_ids: List[int] ): - self.device_ids = device_ids self._wrap_optimizers(model) model = LightningShardedDataParallel(model, sharded_optimizer=model.trainer.optimizers) return model From 6ed7361552cf924d1f79c5b0a79a39da3585cdf7 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Wed, 6 Jan 2021 08:10:47 +0000 Subject: [PATCH 34/38] update --- pytorch_lightning/accelerators/ddp2_accelerator.py | 1 + pytorch_lightning/accelerators/ddp_accelerator.py | 4 +--- pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py | 1 + pytorch_lightning/accelerators/ddp_hpc_accelerator.py | 1 + pytorch_lightning/accelerators/ddp_spawn_accelerator.py | 1 + 5 files changed, 5 insertions(+), 3 deletions(-) diff --git a/pytorch_lightning/accelerators/ddp2_accelerator.py b/pytorch_lightning/accelerators/ddp2_accelerator.py index be5de9eb58b9a..c66acdb812608 100644 --- a/pytorch_lightning/accelerators/ddp2_accelerator.py +++ b/pytorch_lightning/accelerators/ddp2_accelerator.py @@ -216,6 +216,7 @@ def ddp_train(self, process_idx, mp_queue, model): def configure_ddp( self, model: LightningModule, device_ids: List[int] ) -> DistributedDataParallel: + self.ddp_plugin.device_ids = device_ids model = self.ddp_plugin.configure_ddp(model, device_ids) return model diff --git a/pytorch_lightning/accelerators/ddp_accelerator.py b/pytorch_lightning/accelerators/ddp_accelerator.py index 4b1d035b3a16a..1f1f1f42f52ff 100644 --- a/pytorch_lightning/accelerators/ddp_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_accelerator.py @@ -299,9 +299,6 @@ def ddp_train(self, process_idx, model): # allow user to configure ddp model = self.configure_ddp(model, device_ids) - # attach the device_ids to the plugin - model.device_ids = device_ids - # set up training routine self.barrier('ddp_setup') self.trainer.train_loop.setup_training(model) @@ -317,6 +314,7 @@ def ddp_train(self, process_idx, model): def configure_ddp( self, model: LightningModule, device_ids: List[int] ) -> DistributedDataParallel: + self.ddp_plugin.device_ids = device_ids model = self.ddp_plugin.configure_ddp(model, device_ids) return model diff --git a/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py b/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py index bca1d898948c4..e71d944a56369 100644 --- a/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py @@ -244,6 +244,7 @@ def transfer_distrib_spawn_state_on_fit_end(self, model, mp_queue, results): def configure_ddp( self, model: LightningModule, device_ids: List[int] ) -> DistributedDataParallel: + self.ddp_plugin.device_ids = device_ids model = self.ddp_plugin.configure_ddp(model, device_ids) return model diff --git a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py 
b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py index c1545904903f8..54b1e606e7cd1 100644 --- a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py @@ -208,6 +208,7 @@ def ddp_train(self, process_idx, model): def configure_ddp( self, model: LightningModule, device_ids: List[int] ) -> DistributedDataParallel: + self.ddp_plugin.device_ids = device_ids model = self.ddp_plugin.configure_ddp(model, device_ids) return model diff --git a/pytorch_lightning/accelerators/ddp_spawn_accelerator.py b/pytorch_lightning/accelerators/ddp_spawn_accelerator.py index fd964f65f2808..2a7a5a45624bd 100644 --- a/pytorch_lightning/accelerators/ddp_spawn_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_spawn_accelerator.py @@ -276,6 +276,7 @@ def transfer_distrib_spawn_state_on_fit_end(self, model, mp_queue, results): def configure_ddp( self, model: LightningModule, device_ids: List[int] ) -> DistributedDataParallel: + self.ddp_plugin.device_ids = device_ids model = self.ddp_plugin.configure_ddp(model, device_ids) return model From e71d8cd2e65ebbd63f33bd02cd3228490fa0d8a0 Mon Sep 17 00:00:00 2001 From: tchaton Date: Wed, 6 Jan 2021 09:14:20 +0100 Subject: [PATCH 35/38] resolve --- pytorch_lightning/accelerators/ddp2_accelerator.py | 3 --- pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py | 3 --- pytorch_lightning/accelerators/ddp_hpc_accelerator.py | 3 --- pytorch_lightning/accelerators/ddp_spawn_accelerator.py | 3 --- pytorch_lightning/plugins/ddp_plugin.py | 2 +- pytorch_lightning/plugins/sharded_plugin.py | 3 +-- 6 files changed, 2 insertions(+), 15 deletions(-) diff --git a/pytorch_lightning/accelerators/ddp2_accelerator.py b/pytorch_lightning/accelerators/ddp2_accelerator.py index c66acdb812608..46d944a35cb62 100644 --- a/pytorch_lightning/accelerators/ddp2_accelerator.py +++ b/pytorch_lightning/accelerators/ddp2_accelerator.py @@ -200,9 +200,6 @@ def ddp_train(self, process_idx, mp_queue, model): # allow user to configure ddp model = self.configure_ddp(model, device_ids) - # attach the device_ids to the plugin - model.device_ids = device_ids - # set up training routine self.trainer.train_loop.setup_training(model) diff --git a/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py b/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py index e71d944a56369..cc178dc14b49d 100644 --- a/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_cpu_spawn_accelerator.py @@ -160,9 +160,6 @@ def ddp_train(self, process_idx, mp_queue, model): # allow user to configure ddp model = self.configure_ddp(model, device_ids) - # attach the device_ids to the plugin - model.device_ids = device_ids - # set up training routine self.trainer.train_loop.setup_training(model) diff --git a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py index 54b1e606e7cd1..c2915b9d570bb 100644 --- a/pytorch_lightning/accelerators/ddp_hpc_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_hpc_accelerator.py @@ -191,9 +191,6 @@ def ddp_train(self, process_idx, model): # allow user to configure ddp model = self.configure_ddp(model, device_ids) - # attach the device_ids to the plugin - model.device_ids = device_ids - # set up training routine self.trainer.train_loop.setup_training(model) diff --git a/pytorch_lightning/accelerators/ddp_spawn_accelerator.py b/pytorch_lightning/accelerators/ddp_spawn_accelerator.py index 2a7a5a45624bd..f35b42342d88a 
100644 --- a/pytorch_lightning/accelerators/ddp_spawn_accelerator.py +++ b/pytorch_lightning/accelerators/ddp_spawn_accelerator.py @@ -175,9 +175,6 @@ def ddp_train(self, process_idx, mp_queue, model, is_master=False, proc_offset=0 # allow user to configure ddp model = self.configure_ddp(model, device_ids) - # attach the device_ids to the plugin - model.device_ids = device_ids - # set up training routine self.trainer.train_loop.setup_training(model) diff --git a/pytorch_lightning/plugins/ddp_plugin.py b/pytorch_lightning/plugins/ddp_plugin.py index 69f8810880ee7..b2e972b8e1b88 100644 --- a/pytorch_lightning/plugins/ddp_plugin.py +++ b/pytorch_lightning/plugins/ddp_plugin.py @@ -95,7 +95,7 @@ def init_ddp_connection( @property def is_running_single_process_per_device(self) -> bool: # objects do not need to be scattered in single process per device, move objects upfront to device - # used in `self.on_before_forward` + # This property is used in ``self.on_before_forward`` function. return self.device_ids is not None and len(self.device_ids) == 1 def on_before_forward(self, model: LightningModule, *args): diff --git a/pytorch_lightning/plugins/sharded_plugin.py b/pytorch_lightning/plugins/sharded_plugin.py index f3d6149443d22..b87a2c2a389ef 100644 --- a/pytorch_lightning/plugins/sharded_plugin.py +++ b/pytorch_lightning/plugins/sharded_plugin.py @@ -36,8 +36,7 @@ def configure_ddp( self, model: LightningModule, device_ids: List[int] ): self._wrap_optimizers(model) - model = LightningShardedDataParallel(model, sharded_optimizer=model.trainer.optimizers) - return model + return LightningShardedDataParallel(model, sharded_optimizer=model.trainer.optimizers) def optimizer_state(self, optimizer: 'OSS') -> Optional[dict]: optimizer.consolidate_state_dict() From 525d26d029eaab897aa97860252f09920a81c6ef Mon Sep 17 00:00:00 2001 From: tchaton Date: Wed, 6 Jan 2021 09:17:03 +0100 Subject: [PATCH 36/38] update --- pytorch_lightning/plugins/ddp_plugin.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pytorch_lightning/plugins/ddp_plugin.py b/pytorch_lightning/plugins/ddp_plugin.py index b2e972b8e1b88..f87a7ae6f0e69 100644 --- a/pytorch_lightning/plugins/ddp_plugin.py +++ b/pytorch_lightning/plugins/ddp_plugin.py @@ -101,6 +101,11 @@ def is_running_single_process_per_device(self) -> bool: def on_before_forward(self, model: LightningModule, *args): """ Override to handle custom edge case. + + Args: + args: Inputs to the model. + model: Model to train. + Returns: args moved to correct device if needed. """ if self.is_running_single_process_per_device: args = model.transfer_batch_to_device(args, model.device) @@ -114,6 +119,7 @@ def on_after_setup_optimizers(self, trainer): Called after optimizers have been set-up. This is useful for doing any configuration options in RPC, or state sharding. """ + pass def get_model_from_plugin( self, From 34def10e30b31ad2e208e7fb6a87fc53318a1de6 Mon Sep 17 00:00:00 2001 From: tchaton Date: Wed, 6 Jan 2021 11:26:12 +0100 Subject: [PATCH 37/38] update --- pytorch_lightning/plugins/ddp_plugin.py | 1 - pytorch_lightning/utilities/apply_func.py | 8 +++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/pytorch_lightning/plugins/ddp_plugin.py b/pytorch_lightning/plugins/ddp_plugin.py index f87a7ae6f0e69..6d5ad1e9e2119 100644 --- a/pytorch_lightning/plugins/ddp_plugin.py +++ b/pytorch_lightning/plugins/ddp_plugin.py @@ -119,7 +119,6 @@ def on_after_setup_optimizers(self, trainer): Called after optimizers have been set-up. 
This is useful for doing any configuration options in RPC, or state sharding. """ - pass def get_model_from_plugin( self, diff --git a/pytorch_lightning/utilities/apply_func.py b/pytorch_lightning/utilities/apply_func.py index 775c22dbbfa0a..76ac0a6c595aa 100644 --- a/pytorch_lightning/utilities/apply_func.py +++ b/pytorch_lightning/utilities/apply_func.py @@ -49,12 +49,14 @@ def apply_to_collection(data: Any, dtype: Union[type, tuple], function: Callable return function(data, *args, **kwargs) # Recursively apply to collection items - elif isinstance(data, Mapping): + if isinstance(data, Mapping): return elem_type({k: apply_to_collection(v, dtype, function, *args, **kwargs) for k, v in data.items()}) - elif isinstance(data, tuple) and hasattr(data, '_fields'): # named tuple + + if isinstance(data, tuple) and hasattr(data, '_fields'): # named tuple return elem_type(*(apply_to_collection(d, dtype, function, *args, **kwargs) for d in data)) - elif isinstance(data, Sequence) and not isinstance(data, str): + + if isinstance(data, Sequence) and not isinstance(data, str): return elem_type([apply_to_collection(d, dtype, function, *args, **kwargs) for d in data]) # data is neither of dtype, nor a collection From f6794402b59c74cec75d8e7b1c251b52b991e4b6 Mon Sep 17 00:00:00 2001 From: tchaton Date: Wed, 6 Jan 2021 15:49:08 +0100 Subject: [PATCH 38/38] update --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ca2165c8f77e..be7585ab1fc24 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,9 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). ### Fixed +- Fixed `transfer_batch_to_device` for DDP with `len(devices_ids) == 1` ([#5195](https://github.com/PyTorchLightning/pytorch-lightning/pull/5195)) + + ## [1.1.3] - 2021-01-05
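The net behaviour the patch series above converges on is easier to see outside of the diffs. Below is a minimal, self-contained Python sketch of that final logic only, under stated assumptions: ToyModule and ToyDDPPlugin are illustrative stand-ins rather than the real LightningModule and DDPPlugin classes, and transfer_batch_to_device here is a reduced version of the Lightning hook. The sketch shows the single-process-per-device case (exactly one entry in device_ids) moving the whole batch to the module's device before forward, while every other case is left untouched for the DDP scatter path.

from typing import Any, List, Optional

import torch


class ToyModule:
    """Stand-in for a LightningModule: knows its device and how to move a batch."""

    def __init__(self, device: torch.device):
        self.device = device

    def transfer_batch_to_device(self, batch: Any, device: torch.device) -> Any:
        # Reduced version of the hook: handles tensors and nested tuples/lists only.
        if isinstance(batch, torch.Tensor):
            return batch.to(device)
        if isinstance(batch, (list, tuple)):
            return type(batch)(self.transfer_batch_to_device(b, device) for b in batch)
        return batch


class ToyDDPPlugin:
    """Mirrors the on_before_forward behaviour the patches above end up with."""

    def __init__(self) -> None:
        # In the series, the accelerator assigns this during configure_ddp.
        self.device_ids: Optional[List[int]] = None

    @property
    def is_running_single_process_per_device(self) -> bool:
        # With one device per process there is no scatter step, so the batch
        # has to be moved to the device up front.
        return self.device_ids is not None and len(self.device_ids) == 1

    def on_before_forward(self, model: ToyModule, *args: Any) -> Any:
        if self.is_running_single_process_per_device:
            args = model.transfer_batch_to_device(args, model.device)
        return args


if __name__ == "__main__":
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    plugin = ToyDDPPlugin()
    plugin.device_ids = [0]  # what configure_ddp would set for single-device DDP
    model = ToyModule(device)

    batch = (torch.randn(4, 3), torch.arange(4))
    moved = plugin.on_before_forward(model, *batch)
    print([t.device for t in moved])

Under these assumptions, device_ids=[0] exercises the path the series fixes, while device_ids=None or multiple ids leave the batch where it is and defer placement to the DDP scatter logic, matching the property added to the plugin.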