From 86a73091bbb432adf0029f1ec3aa049a65274e75 Mon Sep 17 00:00:00 2001
From: thomas chaton
Date: Mon, 19 Apr 2021 15:52:48 +0000
Subject: [PATCH 1/5] add test

---
 pytorch_lightning/trainer/data_loading.py |  6 +++-
 tests/trainer/test_supporters.py          | 39 ++++++++++++++++++++++-
 2 files changed, 43 insertions(+), 2 deletions(-)

diff --git a/pytorch_lightning/trainer/data_loading.py b/pytorch_lightning/trainer/data_loading.py
index a361f6e6203c2..1f382c85a7597 100644
--- a/pytorch_lightning/trainer/data_loading.py
+++ b/pytorch_lightning/trainer/data_loading.py
@@ -108,12 +108,15 @@ def auto_add_worker_init_fn(self, dataloader: DataLoader) -> None:
         dataloader.worker_init_fn = partial(pl_worker_init_function, rank=self.global_rank)
 
     def auto_add_sampler(self, dataloader: DataLoader, shuffle: bool) -> DataLoader:
-        # don't do anything if it's not a dataloader
         is_dataloader = isinstance(dataloader, DataLoader)
         # don't manipulate iterable datasets
         is_iterable_ds = has_iterable_dataset(dataloader)
 
+        if isinstance(dataloader, CombinedLoader):
+            dataloader.loaders = apply_to_collection(dataloader.loaders, DataLoader, self.auto_add_sampler, shuffle)
+            return dataloader
+
         if not is_dataloader or is_iterable_ds:
             return dataloader
 
@@ -339,6 +342,7 @@ def _reset_eval_dataloader(
             rank_zero_warn("One of given dataloaders is None and it will be skipped.")
 
         # add samplers
+        print("HERE", dataloaders[0], dataloaders[0].sampler)
         dataloaders = [self.auto_add_sampler(dl, shuffle=False) for dl in dataloaders if dl is not None]
 
         # add worker_init_fn for correct seeding in worker processes
diff --git a/tests/trainer/test_supporters.py b/tests/trainer/test_supporters.py
index 30b984dc896be..ce120ef9e595c 100644
--- a/tests/trainer/test_supporters.py
+++ b/tests/trainer/test_supporters.py
@@ -15,7 +15,11 @@
 
 import pytest
 import torch
-from torch.utils.data import TensorDataset
+from torch.utils.data import TensorDataset, DataLoader
+from torch.utils.data.dataset import Dataset
+from pytorch_lightning import LightningDataModule, Trainer
+from tests.helpers import BoringModel
+from tests.helpers.runif import RunIf
 
 from pytorch_lightning.trainer.supporters import (
     _nested_calc_num_data,
@@ -237,3 +241,36 @@ def test_nested_calc_num_data(input_data, compute_func, expected_length):
     calculated_length = _nested_calc_num_data(input_data, compute_func)
 
     assert calculated_length == expected_length
+
+
+@RunIf(min_gpus=2, special=True)
+def test_combined_data_loader_validation_test(tmpdir):
+
+    class CustomDataset(Dataset):
+
+        def __init__(self, data):
+            self.data = data
+
+        def __len__(self):
+            return len(self.data)
+
+        def __getitem__(self, index):
+            return self.data[index]
+
+    class CustomDataModule(LightningDataModule):
+
+        def val_dataloader(self) -> CombinedLoader:
+            return CombinedLoader({"a": DataLoader(CustomDataset(range(10)))})
+
+    class CustomModel(BoringModel):
+
+        def validation_step(self, batch, batch_idx):
+            v = batch['a']
+            assert (v + int(self.trainer.global_rank == 1)) % 2 == 0
+
+
+    model = CustomModel()
+    model.validation_epoch_end = None
+    dm = CustomDataModule()
+    trainer = Trainer(max_epochs=1, accelerator="ddp", gpus=2, logger=False)
+    trainer.fit(model, datamodule=dm)
\ No newline at end of file

From 0ad1ba24e494492619d199007a4ab95860df58a6 Mon Sep 17 00:00:00 2001
From: tchaton
Date: Mon, 19 Apr 2021 16:55:33 +0100
Subject: [PATCH 2/5] add changelog

---
 CHANGELOG.md | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index bab60910c7a30..8b5f689242d84 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -301,6 +301,9 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
 
 - Fixed metric objects passed directly to `self.log` not being reset correctly ([#7055](https://github.com/PyTorchLightning/pytorch-lightning/pull/7055))
 
+- Fixed `CombinedLoader` in distributed settings for validation / testing ([#7102](https://github.com/PyTorchLightning/pytorch-lightning/pull/7102))
+
+
 ## [1.2.7] - 2021-04-06
 
 ### Fixed

From 116b0087ed1fa6ce4636f0d9c5d23136b4cc4cc6 Mon Sep 17 00:00:00 2001
From: tchaton
Date: Mon, 19 Apr 2021 17:41:56 +0100
Subject: [PATCH 3/5] resolve flake8

---
 tests/trainer/test_supporters.py | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)

diff --git a/tests/trainer/test_supporters.py b/tests/trainer/test_supporters.py
index ce120ef9e595c..5e2ba7347c4a4 100644
--- a/tests/trainer/test_supporters.py
+++ b/tests/trainer/test_supporters.py
@@ -15,12 +15,10 @@
 
 import pytest
 import torch
-from torch.utils.data import TensorDataset, DataLoader
+from torch.utils.data import DataLoader, TensorDataset
 from torch.utils.data.dataset import Dataset
-from pytorch_lightning import LightningDataModule, Trainer
-from tests.helpers import BoringModel
-from tests.helpers.runif import RunIf
 
+from pytorch_lightning import LightningDataModule, Trainer
 from pytorch_lightning.trainer.supporters import (
     _nested_calc_num_data,
     CombinedDataset,
@@ -30,6 +28,8 @@
     TensorRunningAccum,
 )
 from pytorch_lightning.utilities.exceptions import MisconfigurationException
+from tests.helpers import BoringModel
+from tests.helpers.runif import RunIf
 
 
 def test_tensor_running_accum_reset():
@@ -245,6 +245,10 @@ def test_nested_calc_num_data(input_data, compute_func, expected_length):
 
 @RunIf(min_gpus=2, special=True)
 def test_combined_data_loader_validation_test(tmpdir):
+    """
+    This test makes sure the distributed sampler has been properly injected into the dataloaders
+    when using CombinedLoader
+    """
 
     class CustomDataset(Dataset):
 
@@ -267,10 +271,9 @@ class CustomModel(BoringModel):
         def validation_step(self, batch, batch_idx):
             v = batch['a']
             assert (v + int(self.trainer.global_rank == 1)) % 2 == 0
-
 
     model = CustomModel()
     model.validation_epoch_end = None
     dm = CustomDataModule()
     trainer = Trainer(max_epochs=1, accelerator="ddp", gpus=2, logger=False)
-    trainer.fit(model, datamodule=dm)
\ No newline at end of file
+    trainer.fit(model, datamodule=dm)

From 28115f3c4159d33525467e4b39b93b0dd68ad9ae Mon Sep 17 00:00:00 2001
From: tchaton
Date: Mon, 19 Apr 2021 17:51:43 +0100
Subject: [PATCH 4/5] remove print

---
 pytorch_lightning/trainer/data_loading.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/pytorch_lightning/trainer/data_loading.py b/pytorch_lightning/trainer/data_loading.py
index 1f382c85a7597..51ed858fa9b22 100644
--- a/pytorch_lightning/trainer/data_loading.py
+++ b/pytorch_lightning/trainer/data_loading.py
@@ -342,7 +342,6 @@ def _reset_eval_dataloader(
             rank_zero_warn("One of given dataloaders is None and it will be skipped.")
 
         # add samplers
-        print("HERE", dataloaders[0], dataloaders[0].sampler)
         dataloaders = [self.auto_add_sampler(dl, shuffle=False) for dl in dataloaders if dl is not None]
 
         # add worker_init_fn for correct seeding in worker processes

From a83fee266f4a8996ebbb276580f301b9a36806b1 Mon Sep 17 00:00:00 2001
From: tchaton
Date: Tue, 20 Apr 2021 08:52:22 +0100
Subject: [PATCH 5/5] update

---
 pytorch_lightning/trainer/supporters.py |  4 +-
 tests/trainer/test_supporters.py        | 52 +++++++++++++++----------
 2 files changed, 34 insertions(+), 22 deletions(-)

diff --git a/pytorch_lightning/trainer/supporters.py b/pytorch_lightning/trainer/supporters.py
index f884306dc09c8..3cb0b0cb1f11a 100644
--- a/pytorch_lightning/trainer/supporters.py
+++ b/pytorch_lightning/trainer/supporters.py
@@ -19,6 +19,8 @@
 import torch
 from torch import Tensor
 from torch.utils.data import Dataset
+from torch.utils.data.dataloader import DataLoader
+from torch.utils.data.dataset import IterableDataset
 
 from pytorch_lightning.utilities.apply_func import apply_to_collection
 from pytorch_lightning.utilities.cloud_io import get_filesystem
@@ -352,7 +354,7 @@ def __init__(self, loaders: Any, mode: str = 'min_size'):
 
     @property
     def sampler(self) -> Union[Iterable, Sequence, Mapping]:
         """Return a collections of samplers extracting from loaders."""
-        return apply_to_collection(self.loaders, Iterable, getattr, 'sampler', None, wrong_dtype=(Sequence, Mapping))
+        return apply_to_collection(self.loaders, (DataLoader, IterableDataset), getattr, 'sampler', None)
 
     def _wrap_loaders_max_size_cycle(self) -> Any:
         """
diff --git a/tests/trainer/test_supporters.py b/tests/trainer/test_supporters.py
index 5e2ba7347c4a4..6da2436b5eafc 100644
--- a/tests/trainer/test_supporters.py
+++ b/tests/trainer/test_supporters.py
@@ -11,14 +11,18 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import os
 from collections import Sequence
+from unittest import mock
 
 import pytest
 import torch
 from torch.utils.data import DataLoader, TensorDataset
 from torch.utils.data.dataset import Dataset
+from torch.utils.data.distributed import DistributedSampler
+from torch.utils.data.sampler import Sampler
 
-from pytorch_lightning import LightningDataModule, Trainer
+from pytorch_lightning import Trainer
 from pytorch_lightning.trainer.supporters import (
     _nested_calc_num_data,
     CombinedDataset,
@@ -27,9 +31,8 @@
     CycleIterator,
     TensorRunningAccum,
 )
+from pytorch_lightning.utilities.apply_func import apply_to_collection
 from pytorch_lightning.utilities.exceptions import MisconfigurationException
-from tests.helpers import BoringModel
-from tests.helpers.runif import RunIf
 
 
 def test_tensor_running_accum_reset():
@@ -243,8 +246,10 @@ def test_nested_calc_num_data(input_data, compute_func, expected_length):
     assert calculated_length == expected_length
 
 
-@RunIf(min_gpus=2, special=True)
-def test_combined_data_loader_validation_test(tmpdir):
+@mock.patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": "0,1", "PL_TRAINER_GPUS": "2"})
+@mock.patch('torch.cuda.device_count', return_value=2)
+@mock.patch('torch.cuda.is_available', return_value=True)
+def test_combined_data_loader_validation_test(cuda_available_mock, device_count_mock, tmpdir):
     """
     This test makes sure the distributed sampler has been properly injected into the dataloaders
     when using CombinedLoader
     """
 
     class CustomDataset(Dataset):
 
@@ -261,19 +266,24 @@ def __len__(self):
         def __getitem__(self, index):
             return self.data[index]
 
-    class CustomDataModule(LightningDataModule):
-
-        def val_dataloader(self) -> CombinedLoader:
-            return CombinedLoader({"a": DataLoader(CustomDataset(range(10)))})
-
-    class CustomModel(BoringModel):
-
-        def validation_step(self, batch, batch_idx):
-            v = batch['a']
-            assert (v + int(self.trainer.global_rank == 1)) % 2 == 0
-
-    model = CustomModel()
-    model.validation_epoch_end = None
-    dm = CustomDataModule()
-    trainer = Trainer(max_epochs=1, accelerator="ddp", gpus=2, logger=False)
-    trainer.fit(model, datamodule=dm)
+    dataloader = CombinedLoader({
+        "a": DataLoader(CustomDataset(range(10))),
+        "b": {
+            "c": DataLoader(CustomDataset(range(10))),
+            "d": DataLoader(CustomDataset(range(10)))
+        },
+        "e": [DataLoader(CustomDataset(range(10))),
+              DataLoader(CustomDataset(range(10)))]
+    })
+
+    trainer = Trainer(replace_sampler_ddp=True, accelerator="ddp", gpus=2)
+    dataloader = trainer.auto_add_sampler(dataloader, shuffle=True)
+    _count = 0
+
+    def _assert_distributed_sampler(v):
+        nonlocal _count
+        _count += 1
+        assert isinstance(v, DistributedSampler)
+
+    apply_to_collection(dataloader.sampler, Sampler, _assert_distributed_sampler)
+    assert _count == 5
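
Illustration of the mechanism (a standalone sketch, not part of the patches above): PATCH 1 makes `auto_add_sampler` recurse into the collection held by `CombinedLoader` via `apply_to_collection`, rebuilding every nested `DataLoader` with a `DistributedSampler`, and PATCH 5's test then counts the injected samplers. The sketch below approximates that traversal with plain recursion so it runs without Lightning; `inject_distributed_sampler` and its explicit `num_replicas`/`rank` arguments are names invented here for illustration and exist in neither Lightning nor PyTorch.

import torch
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data.distributed import DistributedSampler


def inject_distributed_sampler(loaders, num_replicas, rank):
    # Rebuild every DataLoader found in a nested dict/list/tuple with a
    # DistributedSampler; anything else is returned unchanged.
    if isinstance(loaders, DataLoader):
        sampler = DistributedSampler(loaders.dataset, num_replicas=num_replicas, rank=rank, shuffle=False)
        return DataLoader(loaders.dataset, batch_size=loaders.batch_size, sampler=sampler)
    if isinstance(loaders, dict):
        return {key: inject_distributed_sampler(value, num_replicas, rank) for key, value in loaders.items()}
    if isinstance(loaders, (list, tuple)):
        return type(loaders)(inject_distributed_sampler(value, num_replicas, rank) for value in loaders)
    return loaders


dataset = TensorDataset(torch.arange(10))
loaders = {"a": DataLoader(dataset), "b": [DataLoader(dataset), DataLoader(dataset)]}
loaders = inject_distributed_sampler(loaders, num_replicas=2, rank=0)

# Every loader now yields only this rank's shard: rank 0 sees indices 0, 2, 4, 6, 8
# and rank 1 would see 1, 3, 5, 7, 9 -- the even/odd split asserted in PATCH 1's
# validation_step.
assert isinstance(loaders["a"].sampler, DistributedSampler)
assert all(isinstance(dl.sampler, DistributedSampler) for dl in loaders["b"])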