Merged
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -22,6 +22,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
### Fixed


- Fixed DeepSpeed with IterableDatasets ([#7362](https://github.com/PyTorchLightning/pytorch-lightning/pull/7362))


## [1.3.0] - 2021-05-06

27 changes: 24 additions & 3 deletions pytorch_lightning/plugins/training_type/deepspeed.py
@@ -88,6 +88,7 @@ def __init__(
allgather_bucket_size: int = 2e8,
reduce_bucket_size: int = 2e8,
zero_allow_untested_optimizer: bool = True,
logging_batch_size_per_gpu: Union[str, int] = "auto",
config: Optional[Union[Path, str, dict]] = None,
logging_level: int = logging.WARN,
num_nodes: int = 1,
@@ -148,6 +149,13 @@ def __init__(
zero_allow_untested_optimizer: Allow untested optimizers to be used with ZeRO. Currently only Adam is a
DeepSpeed supported optimizer when using ZeRO (default: True)

logging_batch_size_per_gpu: Batch size used by DeepSpeed to compute samples-per-second
throughput for logging (only displayed if logging_level=logging.INFO).
If set to "auto", the plugin tries to infer this from
the train DataLoader's BatchSampler, else defaults to 1.
To obtain accurate logs when using a dataset that does not support batch samplers,
set this to the actual per-GPU batch size.

config: Pass in a deepspeed formatted config dict,
or path to a deepspeed config: https://www.deepspeed.ai/docs/config-json.
All defaults will be ignored if a config is passed in. (Default: ``None``)
@@ -182,6 +190,7 @@ def __init__(
when using ZeRO Stage 3. This allows a single weight file to contain the entire model,
rather than individual sharded weight files.
Disable to save sharded states individually. (Default: True)

"""
if not _DEEPSPEED_AVAILABLE:
raise MisconfigurationException(
@@ -197,6 +206,7 @@
self.config = self._create_default_config(
zero_optimization,
zero_allow_untested_optimizer,
logging_batch_size_per_gpu,
partition_activations=partition_activations,
cpu_checkpointing=cpu_checkpointing,
contiguous_memory_optimization=contiguous_memory_optimization,
@@ -409,14 +419,22 @@ def _format_batch_size_and_grad_accum_config(self):
" as this will be set via accumulate_grad_batches=x argument passed via the Lightning Trainer."
)
if "train_micro_batch_size_per_gpu" not in self.config:
# train_micro_batch_size_per_gpu is used for throughput logging purposes
# by default we use the batch size of the loader which may be incorrect if a batch sampler is passed
batch_size = self.lightning_module.train_dataloader().batch_sampler.batch_size
batch_size = self._auto_select_batch_size()
self.config["train_micro_batch_size_per_gpu"] = batch_size
self.config["gradient_accumulation_steps"] = self.lightning_module.trainer.accumulate_grad_batches
if "gradient_clipping" not in self.config:
self.config["gradient_clipping"] = self.lightning_module.trainer.gradient_clip_val

def _auto_select_batch_size(self):
# train_micro_batch_size_per_gpu is used for throughput logging purposes
# by default we try to use the batch size of the loader
batch_size = 1
if hasattr(self.lightning_module, 'train_dataloader'):
train_dataloader = self.lightning_module.train_dataloader()
if hasattr(train_dataloader, 'batch_sampler'):
batch_size = train_dataloader.batch_sampler.batch_size
Comment on lines +432 to +435

Contributor: If anyone can suggest anything cleaner please do :)

Collaborator: probably not much, as train_dataloader() is callable, not just an attribute...

Contributor: This could break if the user provides several dataloaders to the CombinedLoader.

Collaborator: @leezu ^^

return batch_size

def _format_precision_config(self):
amp_type = self.lightning_module.trainer.accelerator_connector.amp_type
amp_level = self.lightning_module.trainer.accelerator_connector.amp_level
@@ -446,6 +464,7 @@ def _create_default_config(
self,
zero_optimization: bool,
zero_allow_untested_optimizer: bool,
logging_batch_size_per_gpu: Union[str, int],
partition_activations: bool,
cpu_checkpointing: bool,
contiguous_memory_optimization: bool,
@@ -466,6 +485,8 @@
"zero_optimization": zero_kwargs,
**cfg
}
if logging_batch_size_per_gpu != 'auto':
cfg = {"train_micro_batch_size_per_gpu": logging_batch_size_per_gpu, **cfg}
return cfg

def _filepath_to_dir(self, filepath: str) -> str:
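For orientation, here is a minimal usage sketch of the new `logging_batch_size_per_gpu` argument, modeled on the test added further down in this PR. The model, dataset sizes, and batch size are illustrative, the helpers are imported from this repo's test utilities, and it assumes DeepSpeed plus at least one GPU are available:

```python
# Illustrative only. Per the docstring above, "auto" falls back to 1 when the train
# DataLoader's batch sampler cannot be inspected, so the per-GPU batch size is passed
# explicitly here to keep DeepSpeed's throughput logging accurate.
from torch.utils.data import DataLoader

from pytorch_lightning import Trainer
from pytorch_lightning.plugins import DeepSpeedPlugin
from tests.helpers.boring_model import BoringModel, RandomIterableDataset


class IterableModel(BoringModel):

    def train_dataloader(self):
        return DataLoader(RandomIterableDataset(32, 64), batch_size=8)


trainer = Trainer(
    gpus=1,
    plugins=DeepSpeedPlugin(logging_batch_size_per_gpu=8),
)
trainer.fit(IterableModel())
```

Passing an explicit value simply pre-populates `train_micro_batch_size_per_gpu` in the generated DeepSpeed config, as the `_create_default_config` change above shows.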
41 changes: 40 additions & 1 deletion tests/plugins/test_deepspeed_plugin.py
@@ -7,14 +7,15 @@
import torch.nn.functional as F
from torch import nn, Tensor
from torch.optim import Optimizer
from torch.utils.data import DataLoader

from pytorch_lightning import LightningModule, seed_everything, Trainer
from pytorch_lightning.callbacks import Callback, ModelCheckpoint
from pytorch_lightning.metrics import Accuracy
from pytorch_lightning.plugins import DeepSpeedPlugin, DeepSpeedPrecisionPlugin
from pytorch_lightning.plugins.training_type.deepspeed import LightningDeepSpeedModule
from pytorch_lightning.utilities.exceptions import MisconfigurationException
from tests.helpers.boring_model import BoringModel
from tests.helpers.boring_model import BoringModel, RandomDataset, RandomIterableDataset
from tests.helpers.datamodules import ClassifDataModule
from tests.helpers.runif import RunIf

@@ -234,6 +235,44 @@ def backward(self, loss: Tensor, optimizer: Optimizer, optimizer_idx: int, *args
trainer.fit(model)


@RunIf(min_gpus=1, deepspeed=True, special=True)
@pytest.mark.parametrize(['dataset_cls', 'value'], [(RandomDataset, "auto"), (RandomDataset, 10),
(RandomIterableDataset, "auto"), (RandomIterableDataset, 10)])
def test_deepspeed_auto_batch_size_config_select(tmpdir, dataset_cls, value):
"""Test to ensure that the batch size is correctly set as expected for deepspeed logging purposes."""

class TestModel(BoringModel):

def train_dataloader(self):
return DataLoader(dataset_cls(32, 64))

class AssertCallback(Callback):

def on_train_start(self, trainer, pl_module) -> None:
assert isinstance(trainer.accelerator.training_type_plugin, DeepSpeedPlugin)
config = trainer.accelerator.training_type_plugin.config

# int value overrides auto mode
expected_value = value if isinstance(value, int) else 1
if dataset_cls == RandomDataset:
expected_value = pl_module.train_dataloader().batch_size if value == "auto" else value

assert config['train_micro_batch_size_per_gpu'] == expected_value
raise SystemExit

ck = AssertCallback()
model = TestModel()
trainer = Trainer(
default_root_dir=tmpdir,
fast_dev_run=True,
callbacks=ck,
gpus=1,
plugins=DeepSpeedPlugin(logging_batch_size_per_gpu=value, zero_optimization=False),
)
with pytest.raises(SystemExit):
trainer.fit(model)


@RunIf(min_gpus=1, deepspeed=True, special=True)
def test_deepspeed_run_configure_optimizers(tmpdir):
"""
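The review thread above flags two caveats with the "auto" fallback: `train_dataloader()` is a callable rather than a plain attribute, and a user may hand several dataloaders to a `CombinedLoader`, which may not expose a single batch sampler. Below is a hypothetical, more defensive variant of that fallback; it is not part of the merged patch, and `infer_logging_batch_size` is an illustrative name:

```python
# Hypothetical sketch only; the merged change uses _auto_select_batch_size above.
def infer_logging_batch_size(train_dataloader, default: int = 1) -> int:
    """Best-effort per-GPU batch size, used solely for DeepSpeed throughput logging."""
    batch_sampler = getattr(train_dataloader, "batch_sampler", None)
    if batch_sampler is not None and hasattr(batch_sampler, "batch_size"):
        return batch_sampler.batch_size
    # No inspectable batch sampler (e.g. the CombinedLoader case raised in the review):
    # fall back to the documented default of 1 unless the user sets
    # logging_batch_size_per_gpu explicitly.
    return default
```

Since, as the in-code comment notes, `train_micro_batch_size_per_gpu` is only used here for throughput logging, a conservative fallback is harmless; users who need accurate samples-per-second figures can always pass `logging_batch_size_per_gpu` themselves.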