From 22fe14a5ea9db8f1b00876c4ab335a81ee48a075 Mon Sep 17 00:00:00 2001
From: Cyprien-Ricque
Date: Thu, 7 Jul 2022 22:03:34 +0200
Subject: [PATCH 1/5] fix mypy typing in horovod.py

---
 pyproject.toml                               |  1 -
 src/pytorch_lightning/strategies/hivemind.py |  1 +
 src/pytorch_lightning/strategies/horovod.py  | 23 +++++++++++++-------
 src/pytorch_lightning/utilities/types.py     |  1 +
 4 files changed, 17 insertions(+), 9 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index c08d2c99bf3f5..1afbc7825370f 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -67,7 +67,6 @@ module = [
     "pytorch_lightning.strategies.deepspeed",
     "pytorch_lightning.strategies.dp",
     "pytorch_lightning.strategies.fully_sharded",
-    "pytorch_lightning.strategies.horovod",
     "pytorch_lightning.strategies.ipu",
     "pytorch_lightning.strategies.parallel",
     "pytorch_lightning.strategies.sharded",
diff --git a/src/pytorch_lightning/strategies/hivemind.py b/src/pytorch_lightning/strategies/hivemind.py
index 34e2f40b2ec40..d26fe33c85697 100644
--- a/src/pytorch_lightning/strategies/hivemind.py
+++ b/src/pytorch_lightning/strategies/hivemind.py
@@ -315,6 +315,7 @@ def __init__(self, optimizer: "hivemind.Optimizer", scheduler: _LRScheduler) ->
         # implemented custom logic which we would not want to call on destruction of the `HiveMindScheduler`
         self.__dict__ = {k: v for k, v in scheduler.__dict__.items() if k not in ("step", "__del__")}
 
+        self.base_lrs = scheduler.base_lrs
         self.optimizer = optimizer
         self.scheduler = scheduler
         self.current_step = -1
diff --git a/src/pytorch_lightning/strategies/horovod.py b/src/pytorch_lightning/strategies/horovod.py
index 40fdd6f1120cd..9df1667e6cb30 100644
--- a/src/pytorch_lightning/strategies/horovod.py
+++ b/src/pytorch_lightning/strategies/horovod.py
@@ -24,12 +24,14 @@
 from pytorch_lightning.plugins.io.checkpoint_plugin import CheckpointIO
 from pytorch_lightning.plugins.precision import PrecisionPlugin
 from pytorch_lightning.strategies.parallel import ParallelStrategy
+from pytorch_lightning.strategies.strategy import TBroadcast
 from pytorch_lightning.utilities.distributed import distributed_available
 from pytorch_lightning.utilities.distributed import group as dist_group
 from pytorch_lightning.utilities.distributed import ReduceOp
 from pytorch_lightning.utilities.exceptions import MisconfigurationException
 from pytorch_lightning.utilities.imports import _HOROVOD_AVAILABLE
 from pytorch_lightning.utilities.rank_zero import rank_zero_only
+from pytorch_lightning.utilities.types import _LRScheduler
 
 if _HOROVOD_AVAILABLE:
     import horovod.torch as hvd
@@ -70,11 +72,11 @@ def world_size(self) -> int:
         return hvd.size()
 
     @property
-    def root_device(self):
+    def root_device(self) -> torch.device:
         return self.parallel_devices[self.local_rank]
 
     @property
-    def distributed_sampler_kwargs(self):
+    def distributed_sampler_kwargs(self) -> Dict[str, Any]:
         distributed_sampler_kwargs = dict(num_replicas=self.world_size, rank=self.global_rank)
         return distributed_sampler_kwargs
 
@@ -95,7 +97,7 @@ def setup(self, trainer: "pl.Trainer") -> None:
             # no need to setup optimizers
             return
 
-        def _unpack_lightning_optimizer(opt):
+        def _unpack_lightning_optimizer(opt: Optimizer) -> Optimizer:
             return opt._optimizer if isinstance(opt, LightningOptimizer) else opt
 
         optimizers = self.optimizers
@@ -111,8 +113,10 @@ def _unpack_lightning_optimizer(opt):
         lr_scheduler_configs = self.lr_scheduler_configs
         for config in lr_scheduler_configs:
             scheduler = config.scheduler
+            assert isinstance(scheduler, _LRScheduler), "ReduceLROnPlateau does not have attribute 'base_lr'"
             scheduler.base_lrs = [lr * self.world_size for lr in scheduler.base_lrs]
 
+        assert self.lightning_module is not None
         # Horovod: broadcast parameters & optimizer state to ensure consistent initialization
         hvd.broadcast_parameters(self.lightning_module.state_dict(), root_rank=0)
         for optimizer in optimizers:
@@ -129,27 +133,29 @@ def _unpack_lightning_optimizer(opt):
             # Synchronization will be performed explicitly following backward()
             self._exit_stack.enter_context(optimizer.skip_synchronize())
 
-    def barrier(self, *args, **kwargs):
+    def barrier(self, *args: Any, **kwargs: Any) -> None:
         if distributed_available():
             self.join()
 
-    def broadcast(self, obj: object, src: int = 0) -> object:
+    def broadcast(self, obj: TBroadcast, src: int = 0) -> TBroadcast:
         obj = hvd.broadcast_object(obj, src)
         return obj
 
-    def model_to_device(self):
+    def model_to_device(self) -> None:
         if self.root_device.type == "cuda":
             # this can potentially be removed after #8312. Not done due to lack of horovod testing
             torch.cuda.set_device(self.root_device)
+        assert self.model is not None
         self.model.to(self.root_device)
 
-    def join(self):
+    def join(self) -> None:
         if self.root_device.type == "cuda":
             hvd.join(self.local_rank)
         else:
             hvd.join()
 
-    def reduce(self, tensor, group: Optional[Any] = None, reduce_op: Optional[Union[ReduceOp, str]] = "mean"):
+    def reduce(self, tensor: Union[Any, Tensor], group: Optional[Any] = None,
+               reduce_op: Optional[Union[ReduceOp, str]] = "mean") -> Union[Any, Tensor]:
         """Reduces a tensor from several distributed processes to one aggregated tensor.
 
         Args:
@@ -196,6 +202,7 @@ def _wrap_optimizers(
         self, optimizers: List[Optimizer], accumulate_grad_batches: int
     ) -> List["hvd.DistributedOptimizer"]:
         """Wraps optimizers to perform gradient aggregation via allreduce."""
+        assert self.lightning_module is not None
         return [
             hvd.DistributedOptimizer(
                 opt,
diff --git a/src/pytorch_lightning/utilities/types.py b/src/pytorch_lightning/utilities/types.py
index 0b10b5eebc7b1..5010c6ff052d2 100644
--- a/src/pytorch_lightning/utilities/types.py
+++ b/src/pytorch_lightning/utilities/types.py
@@ -65,6 +65,7 @@ def load_state_dict(self, state_dict: Dict[str, Any]) -> None:
 @runtime_checkable
 class _LRScheduler(_Stateful, Protocol):
     optimizer: Optimizer
+    base_lrs: List[float]
 
     def __init__(self, optimizer: Optimizer, *args: Any, **kwargs: Any) -> None:
         ...

From 8cad6a59b371835f3c63e24bc5e6800a27bccf93 Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Thu, 7 Jul 2022 20:09:40 +0000
Subject: [PATCH 2/5] [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
---
 src/pytorch_lightning/strategies/horovod.py | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/src/pytorch_lightning/strategies/horovod.py b/src/pytorch_lightning/strategies/horovod.py
index 9df1667e6cb30..2cc1984afe1da 100644
--- a/src/pytorch_lightning/strategies/horovod.py
+++ b/src/pytorch_lightning/strategies/horovod.py
@@ -154,8 +154,12 @@ def join(self) -> None:
         else:
             hvd.join()
 
-    def reduce(self, tensor: Union[Any, Tensor], group: Optional[Any] = None,
-               reduce_op: Optional[Union[ReduceOp, str]] = "mean") -> Union[Any, Tensor]:
+    def reduce(
+        self,
+        tensor: Union[Any, Tensor],
+        group: Optional[Any] = None,
+        reduce_op: Optional[Union[ReduceOp, str]] = "mean",
+    ) -> Union[Any, Tensor]:
         """Reduces a tensor from several distributed processes to one aggregated tensor.
 
         Args:

From ac94db56791dca09928b0f7d499ce2b840adf815 Mon Sep 17 00:00:00 2001
From: Cyprien-Ricque
Date: Wed, 13 Jul 2022 19:35:52 +0200
Subject: [PATCH 3/5] add base_lrs in class definition

---
 src/pytorch_lightning/strategies/hivemind.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/pytorch_lightning/strategies/hivemind.py b/src/pytorch_lightning/strategies/hivemind.py
index d26fe33c85697..4f4d9ac8aca84 100644
--- a/src/pytorch_lightning/strategies/hivemind.py
+++ b/src/pytorch_lightning/strategies/hivemind.py
@@ -309,13 +309,13 @@ class HiveMindScheduler:
 
     This code ensures that we only step when the HiveMind optimizer reaches the global step.
     """
+    base_lrs: List[float]
 
     def __init__(self, optimizer: "hivemind.Optimizer", scheduler: _LRScheduler) -> None:
         # copy most of the `Scheduler` methods into this instance. `__del__` is skipped in case the scheduler has
         # implemented custom logic which we would not want to call on destruction of the `HiveMindScheduler`
         self.__dict__ = {k: v for k, v in scheduler.__dict__.items() if k not in ("step", "__del__")}
 
-        self.base_lrs = scheduler.base_lrs
         self.optimizer = optimizer
         self.scheduler = scheduler
         self.current_step = -1

From cb183713d4371799a752f0fed74a83763f8a4c76 Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Wed, 13 Jul 2022 17:37:25 +0000
Subject: [PATCH 4/5] [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
---
 src/pytorch_lightning/strategies/hivemind.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/pytorch_lightning/strategies/hivemind.py b/src/pytorch_lightning/strategies/hivemind.py
index 4f4d9ac8aca84..b274856bb6113 100644
--- a/src/pytorch_lightning/strategies/hivemind.py
+++ b/src/pytorch_lightning/strategies/hivemind.py
@@ -309,6 +309,7 @@
 
     This code ensures that we only step when the HiveMind optimizer reaches the global step.
     """
+
     base_lrs: List[float]
 
     def __init__(self, optimizer: "hivemind.Optimizer", scheduler: _LRScheduler) -> None:

From 9a45a2791cc6a6519df69ef451eed5c3f6783ea6 Mon Sep 17 00:00:00 2001
From: Cyprien-Ricque
Date: Wed, 13 Jul 2022 20:41:28 +0200
Subject: [PATCH 5/5] remove inaccurate assert message

---
 src/pytorch_lightning/strategies/horovod.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/pytorch_lightning/strategies/horovod.py b/src/pytorch_lightning/strategies/horovod.py
index 2cc1984afe1da..19075cbbb0b05 100644
--- a/src/pytorch_lightning/strategies/horovod.py
+++ b/src/pytorch_lightning/strategies/horovod.py
@@ -113,7 +113,7 @@ def _unpack_lightning_optimizer(opt: Optimizer) -> Optimizer:
         lr_scheduler_configs = self.lr_scheduler_configs
         for config in lr_scheduler_configs:
             scheduler = config.scheduler
-            assert isinstance(scheduler, _LRScheduler), "ReduceLROnPlateau does not have attribute 'base_lr'"
+            assert isinstance(scheduler, _LRScheduler)
             scheduler.base_lrs = [lr * self.world_size for lr in scheduler.base_lrs]
 
         assert self.lightning_module is not None
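
For context, these patches rely on a runtime-checkable `_LRScheduler` Protocol that now declares `base_lrs`: the `assert isinstance(scheduler, _LRScheduler)` in `HorovodStrategy.setup` narrows the scheduler's type so that mypy accepts the `base_lrs` scaling, and at runtime it rejects schedulers such as `ReduceLROnPlateau` that have no `base_lrs` attribute. A minimal standalone sketch of that pattern, not Lightning's actual class: the Protocol below drops the `_Stateful` base and the `__init__`/`step` members, `WORLD_SIZE` is a placeholder for `hvd.size()`, and an `if` is used for illustration where the strategy asserts.

from typing import List, Protocol, runtime_checkable

import torch
from torch.optim import SGD, Optimizer
from torch.optim.lr_scheduler import LambdaLR, ReduceLROnPlateau


@runtime_checkable
class _LRScheduler(Protocol):
    # Simplified mirror of the Protocol in src/pytorch_lightning/utilities/types.py:
    # declaring `base_lrs` here is what lets mypy type-check the scaling below.
    optimizer: Optimizer
    base_lrs: List[float]


WORLD_SIZE = 4  # placeholder for hvd.size()

model = torch.nn.Linear(2, 2)
optimizer = SGD(model.parameters(), lr=0.1)

for scheduler in (LambdaLR(optimizer, lambda epoch: 1.0), ReduceLROnPlateau(optimizer)):
    # ReduceLROnPlateau defines no `base_lrs`, so the runtime Protocol check is False
    # for it -- the same situation the assert in HorovodStrategy.setup guards against.
    if isinstance(scheduler, _LRScheduler):
        scheduler.base_lrs = [lr * WORLD_SIZE for lr in scheduler.base_lrs]
        print(type(scheduler).__name__, "scaled base_lrs:", scheduler.base_lrs)
    else:
        print(type(scheduler).__name__, "has no base_lrs; left untouched")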