From 0353be6afa6559ded30da43ebc6cd6edb553a0a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Mon, 1 Mar 2021 20:52:49 +0100 Subject: [PATCH 01/11] fix duplicate console logging bug / properly configure logging (#5509) * duplicate logs * remove unused * configure basic logging * missing import * update docs for logging * import order * flake8 * test * fix test * flake8 * import warning * add changelog * isort * stderr Co-authored-by: chaton --- CHANGELOG.md | 3 + docs/source/extensions/logging.rst | 12 +- .../computer_vision_fine_tuning.py | 3 +- pytorch_lightning/__init__.py | 18 +- .../callbacks/model_checkpoint.py | 4 +- pytorch_lightning/core/lightning.py | 3 +- pytorch_lightning/core/saving.py | 3 +- pytorch_lightning/loggers/comet.py | 3 +- pytorch_lightning/loggers/csv_logs.py | 4 +- pytorch_lightning/loggers/mlflow.py | 4 +- pytorch_lightning/loggers/neptune.py | 3 +- pytorch_lightning/loggers/tensorboard.py | 4 +- pytorch_lightning/plugins/ddp_plugin.py | 170 ++++++++ .../plugins/ddp_sequential_plugin.py | 408 ++++++++++++++++++ pytorch_lightning/profiler/profilers.py | 4 +- .../trainer/connectors/slurm_connector.py | 4 +- pytorch_lightning/trainer/trainer.py | 2 +- pytorch_lightning/trainer/training_tricks.py | 3 +- pytorch_lightning/tuner/batch_size_scaling.py | 2 + pytorch_lightning/tuner/lr_finder.py | 4 +- pytorch_lightning/utilities/distributed.py | 3 +- pytorch_lightning/utilities/seed.py | 4 +- .../utilities/upgrade_checkpoint.py | 4 +- tests/__init__.py | 4 + tests/callbacks/test_early_stopping.py | 5 +- tests/checkpointing/test_model_checkpoint.py | 3 +- tests/test_profiler.py | 5 +- 27 files changed, 656 insertions(+), 33 deletions(-) create mode 100644 pytorch_lightning/plugins/ddp_plugin.py create mode 100644 pytorch_lightning/plugins/ddp_sequential_plugin.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 9cf9b731c27fd..35adc1b04aa47 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -289,6 +289,9 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). - Fixed `val_check_interval` with `fast_dev_run` ([#5540](https://github.com/PyTorchLightning/pytorch-lightning/pull/5540)) +- Fixed duplicate logs appearing in console when using the python logging module ([#5509](https://github.com/PyTorchLightning/pytorch-lightning/pull/5509)) + + ## [1.1.4] - 2021-01-12 ### Added diff --git a/docs/source/extensions/logging.rst b/docs/source/extensions/logging.rst index 026040f03a330..92304255ea901 100644 --- a/docs/source/extensions/logging.rst +++ b/docs/source/extensions/logging.rst @@ -259,13 +259,19 @@ Configure console logging ************************* Lightning logs useful information about the training process and user warnings to the console. -You can retrieve the Lightning logger and change it to your liking. For example, increase the logging level -to see fewer messages like so: +You can retrieve the Lightning logger and change it to your liking. For example, adjust the logging level +or redirect output for certain modules to log files: .. code-block:: python import logging - logging.getLogger("lightning").setLevel(logging.ERROR) + + # configure logging at the root level of lightning + logging.getLogger("pytorch_lightning").setLevel(logging.ERROR) + + # configure logging on module level, redirect to file + logger = logging.getLogger("pytorch_lightning.core") + logger.addHandler(logging.FileHandler("core.log")) Read more about custom Python logging `here `_. 
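For reference, a minimal sketch of how the relocated loggers can be configured once this change lands. This is not part of the patch itself; it relies only on the standard library ``logging`` module, and the module paths used below (``pytorch_lightning.trainer``, ``pytorch_lightning.callbacks.model_checkpoint``) mirror files touched in this diff:

.. code-block:: python

    import logging

    # configure the root logger before importing pytorch_lightning; with this patch,
    # Lightning propagates its records to the root logger whenever root handlers exist
    logging.basicConfig(level=logging.WARNING)

    import pytorch_lightning  # noqa: E402

    # silence checkpoint messages only, leaving the rest of the package untouched
    logging.getLogger("pytorch_lightning.callbacks.model_checkpoint").setLevel(logging.ERROR)

    # keep INFO output from the trainer modules and additionally write it to a file
    trainer_logger = logging.getLogger("pytorch_lightning.trainer")
    trainer_logger.setLevel(logging.INFO)
    trainer_logger.addHandler(logging.FileHandler("trainer.log"))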
diff --git a/pl_examples/domain_templates/computer_vision_fine_tuning.py b/pl_examples/domain_templates/computer_vision_fine_tuning.py index 65bf1bde141fa..823efaa53a5e5 100644 --- a/pl_examples/domain_templates/computer_vision_fine_tuning.py +++ b/pl_examples/domain_templates/computer_vision_fine_tuning.py @@ -38,6 +38,7 @@ See: https://pytorch.org/tutorials/beginner/transfer_learning_tutorial.html """ import argparse +import logging import os from pathlib import Path from typing import Union @@ -54,11 +55,11 @@ import pytorch_lightning as pl from pl_examples import cli_lightning_logo -from pytorch_lightning import _logger as log from pytorch_lightning import LightningDataModule from pytorch_lightning.callbacks.finetuning import BaseFinetuning from pytorch_lightning.utilities import rank_zero_info +log = logging.getLogger(__name__) DATA_URL = "https://storage.googleapis.com/mledu-datasets/cats_and_dogs_filtered.zip" # --- Finetuning Callback --- diff --git a/pytorch_lightning/__init__.py b/pytorch_lightning/__init__.py index 43c0837e13934..e52c42b8daf58 100644 --- a/pytorch_lightning/__init__.py +++ b/pytorch_lightning/__init__.py @@ -1,7 +1,8 @@ """Root package info.""" -import logging as python_logging +import logging import os +import sys import time _this_year = time.strftime("%Y") @@ -37,10 +38,15 @@ - https://pytorch-lightning.readthedocs.io/en/latest - https://pytorch-lightning.readthedocs.io/en/stable """ +_root_logger = logging.getLogger() +_logger = logging.getLogger(__name__) +_logger.setLevel(logging.INFO) + +# if root logger has handlers, propagate messages up and let root logger process them +if not _root_logger.hasHandlers(): + _logger.addHandler(logging.StreamHandler()) + _logger.propagate = False -_logger = python_logging.getLogger("lightning") -_logger.addHandler(python_logging.StreamHandler()) -_logger.setLevel(python_logging.INFO) _PACKAGE_ROOT = os.path.dirname(__file__) _PROJECT_ROOT = os.path.dirname(_PACKAGE_ROOT) @@ -53,9 +59,7 @@ except NameError: __LIGHTNING_SETUP__: bool = False -if __LIGHTNING_SETUP__: - import sys # pragma: no-cover - +if __LIGHTNING_SETUP__: # pragma: no-cover sys.stdout.write(f'Partial import of `{__name__}` during the build process.\n') # pragma: no-cover # We are not importing the rest of the lightning during the build process, as it may not be compiled yet else: diff --git a/pytorch_lightning/callbacks/model_checkpoint.py b/pytorch_lightning/callbacks/model_checkpoint.py index 54ad16f7b686f..ec735eb5363a2 100644 --- a/pytorch_lightning/callbacks/model_checkpoint.py +++ b/pytorch_lightning/callbacks/model_checkpoint.py @@ -18,7 +18,7 @@ Automatically save model checkpoints during training. 
""" - +import logging import os import re from copy import deepcopy @@ -29,13 +29,13 @@ import torch import yaml -from pytorch_lightning import _logger as log from pytorch_lightning.callbacks.base import Callback from pytorch_lightning.utilities import rank_zero_info, rank_zero_only, rank_zero_warn from pytorch_lightning.utilities.cloud_io import get_filesystem from pytorch_lightning.utilities.exceptions import MisconfigurationException from pytorch_lightning.utilities.warnings import WarningCache +log = logging.getLogger(__name__) warning_cache = WarningCache() diff --git a/pytorch_lightning/core/lightning.py b/pytorch_lightning/core/lightning.py index 52bcc213692ac..32ba1fe8116cf 100644 --- a/pytorch_lightning/core/lightning.py +++ b/pytorch_lightning/core/lightning.py @@ -16,6 +16,7 @@ import collections import copy import inspect +import logging import os import tempfile import uuid @@ -30,7 +31,6 @@ from torch.nn import Module from torch.optim.optimizer import Optimizer -from pytorch_lightning import _logger as log from pytorch_lightning.core.grads import GradInformation from pytorch_lightning.core.hooks import CheckpointHooks, DataHooks, ModelHooks from pytorch_lightning.core.memory import ModelSummary @@ -45,6 +45,7 @@ if TYPE_CHECKING: from pytorch_lightning.trainer.states import RunningStage +log = logging.getLogger(__name__) class LightningModule( diff --git a/pytorch_lightning/core/saving.py b/pytorch_lightning/core/saving.py index 2b470f43eaf3d..280eca55260a7 100644 --- a/pytorch_lightning/core/saving.py +++ b/pytorch_lightning/core/saving.py @@ -15,6 +15,7 @@ import ast import csv import inspect +import logging import os from argparse import Namespace from copy import deepcopy @@ -25,13 +26,13 @@ import torch import yaml -from pytorch_lightning import _logger as log from pytorch_lightning.utilities import _OMEGACONF_AVAILABLE, AttributeDict, rank_zero_warn from pytorch_lightning.utilities.apply_func import apply_to_collection from pytorch_lightning.utilities.cloud_io import get_filesystem from pytorch_lightning.utilities.cloud_io import load as pl_load from pytorch_lightning.utilities.parsing import parse_class_init_keys +log = logging.getLogger(__name__) PRIMITIVE_TYPES = (bool, int, float, str) ALLOWED_CONFIG_TYPES = (AttributeDict, MutableMapping, Namespace) diff --git a/pytorch_lightning/loggers/comet.py b/pytorch_lightning/loggers/comet.py index 31c768fa5f37b..148e512f5e439 100644 --- a/pytorch_lightning/loggers/comet.py +++ b/pytorch_lightning/loggers/comet.py @@ -16,6 +16,7 @@ ------------ """ +import logging import os from argparse import Namespace from typing import Any, Dict, Optional, Union @@ -23,12 +24,12 @@ import torch from torch import is_tensor -from pytorch_lightning import _logger as log from pytorch_lightning.core.lightning import LightningModule from pytorch_lightning.loggers.base import LightningLoggerBase, rank_zero_experiment from pytorch_lightning.utilities import _module_available, rank_zero_only from pytorch_lightning.utilities.exceptions import MisconfigurationException +log = logging.getLogger(__name__) _COMET_AVAILABLE = _module_available("comet_ml") if _COMET_AVAILABLE: diff --git a/pytorch_lightning/loggers/csv_logs.py b/pytorch_lightning/loggers/csv_logs.py index a78440143167b..4df672fa6e3b5 100644 --- a/pytorch_lightning/loggers/csv_logs.py +++ b/pytorch_lightning/loggers/csv_logs.py @@ -20,17 +20,19 @@ """ import csv import io +import logging import os from argparse import Namespace from typing import Any, Dict, Optional, Union import torch 
-from pytorch_lightning import _logger as log from pytorch_lightning.core.saving import save_hparams_to_yaml from pytorch_lightning.loggers.base import LightningLoggerBase, rank_zero_experiment from pytorch_lightning.utilities.distributed import rank_zero_only, rank_zero_warn +log = logging.getLogger(__name__) + class ExperimentWriter(object): r""" diff --git a/pytorch_lightning/loggers/mlflow.py b/pytorch_lightning/loggers/mlflow.py index 4aa4c67b576ec..8754842bc4a22 100644 --- a/pytorch_lightning/loggers/mlflow.py +++ b/pytorch_lightning/loggers/mlflow.py @@ -15,17 +15,17 @@ MLflow Logger ------------- """ +import logging import re from argparse import Namespace from time import time from typing import Any, Dict, Optional, Union -from pytorch_lightning import _logger as log from pytorch_lightning.loggers.base import LightningLoggerBase, rank_zero_experiment from pytorch_lightning.utilities import _module_available, rank_zero_only, rank_zero_warn +log = logging.getLogger(__name__) LOCAL_FILE_URI_PREFIX = "file:" - _MLFLOW_AVAILABLE = _module_available("mlflow") try: import mlflow diff --git a/pytorch_lightning/loggers/neptune.py b/pytorch_lightning/loggers/neptune.py index d4f24567cab6a..da37d135ff7c9 100644 --- a/pytorch_lightning/loggers/neptune.py +++ b/pytorch_lightning/loggers/neptune.py @@ -16,15 +16,16 @@ -------------- """ from argparse import Namespace +import logging from typing import Any, Dict, Iterable, Optional, Union import torch from torch import is_tensor -from pytorch_lightning import _logger as log from pytorch_lightning.loggers.base import LightningLoggerBase, rank_zero_experiment from pytorch_lightning.utilities import _module_available, rank_zero_only +log = logging.getLogger(__name__) _NEPTUNE_AVAILABLE = _module_available("neptune") if _NEPTUNE_AVAILABLE: diff --git a/pytorch_lightning/loggers/tensorboard.py b/pytorch_lightning/loggers/tensorboard.py index 0485868fa2ef1..72d1731f80ec5 100644 --- a/pytorch_lightning/loggers/tensorboard.py +++ b/pytorch_lightning/loggers/tensorboard.py @@ -16,6 +16,7 @@ ------------------ """ +import logging import os from argparse import Namespace from typing import Any, Dict, Optional, Union @@ -24,13 +25,14 @@ from torch.utils.tensorboard import SummaryWriter from torch.utils.tensorboard.summary import hparams -from pytorch_lightning import _logger as log from pytorch_lightning.core.lightning import LightningModule from pytorch_lightning.core.saving import save_hparams_to_yaml from pytorch_lightning.loggers.base import LightningLoggerBase, rank_zero_experiment from pytorch_lightning.utilities import _OMEGACONF_AVAILABLE, rank_zero_only, rank_zero_warn from pytorch_lightning.utilities.cloud_io import get_filesystem +log = logging.getLogger(__name__) + if _OMEGACONF_AVAILABLE: from omegaconf import Container, OmegaConf diff --git a/pytorch_lightning/plugins/ddp_plugin.py b/pytorch_lightning/plugins/ddp_plugin.py new file mode 100644 index 0000000000000..6d5ad1e9e2119 --- /dev/null +++ b/pytorch_lightning/plugins/ddp_plugin.py @@ -0,0 +1,170 @@ +import os +from contextlib import contextmanager +from typing import Any, Dict, List, Union + +import torch +import torch.distributed as torch_distrib +from torch.optim import Optimizer + +from pytorch_lightning import _logger as log +from pytorch_lightning.core.lightning import LightningModule +from pytorch_lightning.overrides.data_parallel import LightningDistributedDataParallel +from pytorch_lightning.plugins.plugin import LightningPlugin + + +class DDPPlugin(LightningPlugin): + """ + 
Plugin to link a custom ddp implementation to any arbitrary accelerator. + + This plugin forwards all constructor arguments to `LightningDistributedDataParallel`, + which in turn forwards all args to `DistributedDataParallel`. + + Example:: + + class MyDDP(DDPPlugin): + + def configure_ddp(self, model, device_ids): + model = MyDDPWrapper(model, device_ids) + return model + + my_ddp = MyDDP() + trainer = Trainer(accelerator='ddp_x', plugins=[my_ddp]) + """ + + def __init__(self, **kwargs): + self._ddp_kwargs: Dict[str, Any] = kwargs + + def configure_ddp( + self, model: LightningModule, device_ids: List[int] + ) -> LightningDistributedDataParallel: + """ + Pass through all customizations from constructor to `LightningDistributedDataParallel`. + Override to define a custom DDP implementation. + + .. note:: Only requirement is that your DDP implementation subclasses LightningDistributedDataParallel + + + The default implementation is:: + + def configure_ddp(self, model, device_ids): + model = LightningDistributedDataParallel( + model, device_ids=device_ids, find_unused_parameters=False + ) + return model + + Args: + model: the lightningModule + device_ids: the list of devices available + + Returns: + the model wrapped in LightningDistributedDataParallel + + """ + # if unset, default `find_unused_parameters` `False` + self._ddp_kwargs["find_unused_parameters"] = self._ddp_kwargs.get( + "find_unused_parameters", False + ) + model = LightningDistributedDataParallel( + model, + device_ids=device_ids, + **self._ddp_kwargs, + ) + return model + + def init_ddp_connection( + self, + trainer, + cluster_environment, + global_rank: int, + world_size: int, + is_slurm_managing_tasks: bool = True, + ) -> None: + os.environ["MASTER_ADDR"] = str(cluster_environment.master_address()) + os.environ["MASTER_PORT"] = str(cluster_environment.master_port()) + os.environ["WORLD_SIZE"] = str(cluster_environment.world_size()) + torch_backend = "nccl" if trainer.on_gpu else "gloo" + + if not torch_distrib.is_initialized(): + log.info( + f"initializing ddp: GLOBAL_RANK: {global_rank}, MEMBER: {global_rank + 1}/{world_size}" + ) + torch_distrib.init_process_group( + torch_backend, rank=global_rank, world_size=world_size + ) + + @property + def is_running_single_process_per_device(self) -> bool: + # objects do not need to be scattered in single process per device, move objects upfront to device + # This property is used in ``self.on_before_forward`` function. + return self.device_ids is not None and len(self.device_ids) == 1 + + def on_before_forward(self, model: LightningModule, *args): + """ + Override to handle custom edge case. + + Args: + args: Inputs to the model. + model: Model to train. + Returns: args moved to correct device if needed. + """ + if self.is_running_single_process_per_device: + args = model.transfer_batch_to_device(args, model.device) + return args + + def optimizer_state(self, optimizer: Optimizer) -> dict: + return optimizer.state_dict() + + def on_after_setup_optimizers(self, trainer): + """ + Called after optimizers have been set-up. This is useful for doing any configuration options in RPC, or + state sharding. + """ + + def get_model_from_plugin( + self, + model: Union[LightningDistributedDataParallel, LightningModule] + ) -> LightningModule: + """ + Override to modify returning base :class:`LightningModule` + when accessing variable and functions outside of the parallel wrapper. + + Example:: + ref_model = ddp_plugin.get_model_from_plugin(model) + ref_model.training_step(...) 
+ + Args: + model: Model with parallel wrapper. + + Returns: Reference :class:`LightningModule` within parallel wrapper. + + """ + if isinstance(model, LightningDistributedDataParallel): + return model.module + return model + + @contextmanager + def block_backward_sync(self, model: LightningDistributedDataParallel): + """ + Blocks ddp sync gradients behaviour on backwards pass. + This is useful for skipping sync when accumulating gradients, reducing communication overhead + Returns: context manager with sync behaviour off + """ + yield model.no_sync() + + def on_before_manual_backward(self, model: LightningDistributedDataParallel, output: Any): + model.reducer_prepare_for_backwards(output) + + def on_after_manual_backward(self, model: LightningDistributedDataParallel): + model.reducer_reset_hooks() + + def distributed_sampler_kwargs(self, distributed_sampler_kwargs): + return distributed_sampler_kwargs + + @property + def data_parallel_group(self): + """ + Return the group that this process exists in. By default, this is the world size. + Useful for when additional parallel groups have been created, to select certain processes. + Returns: The ProcessGroup this process exists in. + """ + return torch_distrib.group.WORLD diff --git a/pytorch_lightning/plugins/ddp_sequential_plugin.py b/pytorch_lightning/plugins/ddp_sequential_plugin.py new file mode 100644 index 0000000000000..069b1754fbce0 --- /dev/null +++ b/pytorch_lightning/plugins/ddp_sequential_plugin.py @@ -0,0 +1,408 @@ +# Copyright The PyTorch Lightning team. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License +import os +from typing import Any, List, Optional + +import torch +from torch import nn +import torch.distributed as torch_distrib +from torch.nn.parallel import DistributedDataParallel + +from pytorch_lightning import _logger as log +from pytorch_lightning import LightningModule +from pytorch_lightning.overrides.data_parallel import LightningDistributedDataParallel +from pytorch_lightning.plugins.rpc_plugin import RPCPlugin +from pytorch_lightning.utilities import FAIRSCALE_PIPE_AVAILABLE, rank_zero_only +from pytorch_lightning.utilities.exceptions import MisconfigurationException + +if FAIRSCALE_PIPE_AVAILABLE: + from fairscale.nn import PipeRPCWrapper + import fairscale.nn.model_parallel as mpu + from fairscale.nn.pipe import balance as pipe_balance + from fairscale.nn.pipe import rpc as rpc_pipe + from fairscale.nn.pipe.pipeline import PipelineStyle + + +class DDPSequentialPlugin(RPCPlugin): + def __init__( + self, + balance: Optional[List[int]] = None, + microbatches: int = 8, + checkpoint: str = 'except_last', + balance_mode: str = "balance_by_size", + pipelined_backward: Optional[bool] = True, + **kwargs): + """ + Provides sequential model parallelism for :class:`nn.Sequential ` module. + If the module requires lots of memory, Pipe can be used to reduce this by leveraging multiple GPUs. + + Example:: + class MyLightningModule: + def __init__(self): + ... 
+ model.sequential_module = torch.nn.Sequential(my_layers) + + # Split my module across 4 gpus, one layer each + model = MyLightningModule() + plugin = DDPSequentialPlugin(balance=[1, 1, 1, 1]) + trainer = Trainer(accelerator='ddp', gpus=4, plugins=[plugin]) + trainer.fit(model) + + .. _DDPSequentialPlugin: https://arxiv.org/abs/1811.06965 + + Pipeline parallelism comes with with checkpointing to reduce peak + memory required to train while minimizing device under-utilization. + This is turned on by default and can be turned off via the checkpoint argument. + + You should determine the balance when defining the plugin, + or you can pass an example input array via the LightningModule to infer a balance. + The module will be partitioned into multiple devices according to the given balance. You may also rely on + your own heuristics to find your own optimal configuration. + + Args: + balance: The balance of the model, i.e [2, 2] (two layers on each GPU). + If not provided assumes user provides an input example array to find a balance on all GPUs. + + microbatches: Allows for parallelization to reduce device utilization + by splitting the batch into further smaller batches. + + checkpoint: Enables gradient checkpointing. ['always', 'except_last', 'never'] + + balance_mode: Type of balance heuristic to use if balance to be inferred. + + - 'balance_by_size': checks memory usage of each layer and determines balance + + - 'balance_by_time': checks time of each layer and determines balance + + pipelined_backward: if True, call torch.autograd.backward once per microbatch on the + + backward pass (instead of once for the whole batch). This works + around a potential deadlock in pytorch when using tensor parallelism + at the same time. Defaults to `True` if + `get_model_parallel_world_size() > 1` + """ + self._check_pipe_available() + super().__init__(**kwargs) + + self.balance = balance + + self.microbatches = microbatches + self.checkpoint = checkpoint + self.balance_mode = balance_mode + self.pipelined_backward = pipelined_backward + self.main_rpc_process = False # Updated by main process, default for all secondary processes + + def init_ddp_connection( + self, + trainer, + cluster_environment, + global_rank: int, + world_size: int, + is_slurm_managing_tasks: bool = True, + ) -> None: + trainer.prepared_for_backwards = False + self._check_arguments(trainer) + if self._skip_init_connections(trainer): + return + super().init_ddp_connection( + trainer=trainer, + cluster_environment=cluster_environment, + global_rank=global_rank, + world_size=world_size, + is_slurm_managing_tasks=is_slurm_managing_tasks + ) + super().init_rpc_connection( + global_rank=global_rank, + world_size=world_size + ) + model = trainer.get_model() + self.gpus_per_model = self._infer_check_num_gpus(trainer) + self.init_model_parallel_groups(trainer) + self.set_main_rpc_process() + + self._check_sequential_model_exists(model) + if self.main_rpc_process: + if self.balance is None: + self._infer_model_balance(trainer) + self._assert_valid_model_balance(trainer) + + def on_before_manual_backward(self, model: LightningDistributedDataParallel, output: Any): + pass + + def _infer_model_balance(self, trainer): + log.info(f'Inferring model balance using {self.balance_mode} mode') + model = trainer.get_model() + if model.example_input_array is None: + raise MisconfigurationException( + 'Please set example_input_array to your model, so we can infer the right model balance for you') + balance_func = getattr(pipe_balance, self.balance_mode) + 
self.balance = balance_func(self.gpus_per_model, model.sequential_module, model.example_input_array) + self._sync_balance_to_all_parallel_groups() + + log.info(f'The following model balance {self.balance.tolist()} was inferred using {self.balance_mode} mode') + + def _sync_balance_to_all_parallel_groups(self, main_rank=0): + """ + Ensures that we sync the balance to all main processes, so that the balance is the same per replica. + Args: + main_rank: The rank with the balance we'd like to replicate. + """ + self.balance = torch.tensor(self.balance, dtype=torch.int, device='cuda') + # Ensure we sync to all processes within the main data parallel group + # We use the data parallel group as all main processes are found within the same group + torch_distrib.broadcast(self.balance, src=main_rank, group=mpu.get_data_parallel_group()) + self.balance = self.balance.cpu() + + def _check_sequential_model_exists(self, model): + if not hasattr(model, "sequential_module") or not isinstance(model.sequential_module, nn.Sequential): + raise MisconfigurationException( + 'Could not find a PipeLightningModule within the model. ' + 'Did you set your sequential model as the `sequential_module` attribute of your model?') + + def _find_and_init_pipe_module(self, model): + if hasattr(model, "sequential_module") and isinstance(model.sequential_module, LightningPipeModule): + # model has been wrapped already + return + elif hasattr(model, "sequential_module") and isinstance(model.sequential_module, nn.Sequential): + # try to wrap model for the user + model.sequential_module = LightningPipeModule( + model.sequential_module, + balance=self.balance, + microbatches=self.microbatches, + checkpoint=self.checkpoint, + ) + # Update references for workers to access correct lightning functions when calling RPC + model.sequential_module.trainer = model.trainer + model.sequential_module.configure_optimizers = model.configure_optimizers + + # Update references for main process to access correct lightning functions when calling RPC + model.sequential_module.module.model.trainer = model.trainer + model.sequential_module.module.model.configure_optimizers = model.configure_optimizers + + else: + raise MisconfigurationException( + 'Could not find a PipeLightningModule within the model. ' + 'Did you defined set your sequential model as an `sequential_module` attribute of your model ?' + ) + + def _assert_valid_model_balance(self, trainer): + model = trainer.get_model() + if sum(self.balance) != len(model.sequential_module): + raise MisconfigurationException( + f'The provided balance sum: {sum(self.balance)} does not' + f' match your Sequential length: {len(model.sequential_module)}') + + def _skip_init_connections(self, trainer): + """ + Skip initialization if torch is already initialized and we're in testing. + Returns: Whether to skip initialization + + """ + return torch_distrib.is_initialized() and trainer.testing + + def init_model_parallel_groups(self, trainer): + num_model_parallel = 1 # TODO currently no support for vertical model parallel + mpu.initialize_model_parallel( + model_parallel_size_=num_model_parallel, + pipeline_length=self.gpus_per_model + ) + + def _infer_check_num_gpus(self, trainer): + """ + Infer the number of GPUs per model. + + Args: + trainer: The trainer object. 
+ + Returns: The appropriate balance for the model + """ + if isinstance(self.balance, list): + if len(self.balance) != (trainer.world_size / trainer.num_nodes): + raise MisconfigurationException( + "Pipe currently only supports splitting the module onto all available GPUs" + ) + # User has defined a balance for his model + return len(self.balance) + # Assume that the user wants to balance his model on all GPUs + return trainer.world_size + + def on_accelerator_exit_rpc_process(self, trainer) -> None: + if not trainer.testing: + torch_distrib.barrier() # Ensure we await main process initialization + + # Add trainer/configure_optimizers to the pipe model for access in all worker processes + rpc_pipe.PipeModel.trainer = trainer + del rpc_pipe.PipeModel.trainer.model.sequential_module + rpc_pipe.PipeModel.trainer.model.sequential_module = rpc_pipe.PipeModel + rpc_pipe.PipeModel.configure_optimizers = trainer.model.configure_optimizers + super().on_accelerator_exit_rpc_process(trainer) + + def set_main_rpc_process(self): + self.main_rpc_process = torch_distrib.get_rank(group=mpu.get_pipeline_parallel_group()) == 0 + + def on_main_rpc_connection(self, trainer) -> None: + # Create pipe_module + model = trainer.get_model() + self._find_and_init_pipe_module(model) + if not trainer.testing: + torch_distrib.barrier() # Ensure we join main process initialization + model.sequential_module.foreach_worker(register_optimizers, include_self=True) + + def _check_arguments(self, trainer): + if trainer.amp_backend is not None: + raise MisconfigurationException( + 'DDPSequentialPlugin is currently not supported in Automatic Mixed Precision') + + def configure_ddp( + self, + model: LightningModule, device_ids: List[int]) -> DistributedDataParallel: + ddp_plugin = RPCPlugin(process_group=mpu.get_data_parallel_group()).configure_ddp(model, device_ids) + # Plugin handle backwards across processes. 
Currently not supported for DDP + pipe parallel + ddp_plugin.PREPARE_FOR_BACKWARDS = False + return ddp_plugin + + @rank_zero_only + def rpc_save_model( + self, + save_model_fn, + last_filepath, + trainer, + pl_module) -> None: + model = trainer.get_model() + if not hasattr(model.sequential_module, "foreach_worker"): + return + current_layers = pl_module.sequential_module + model.sequential_module.foreach_worker( + save_layers_on_all_rank_zero_workers, + {"gpus_per_model": self.gpus_per_model}, + include_self=True + ) + pl_module.sequential_module = load_sequential_from_saved_layers(self.gpus_per_model) + save_model_fn(last_filepath, trainer, pl_module) + pl_module.sequential_module = current_layers + + def worker_optimizer_step( + self, + model: LightningModule, + opt_idx: int, + *args, + **kwargs) -> None: + model.sequential_module.foreach_worker( + run_optimizer, + {"opt_idx": opt_idx, "args": args, "kwargs": kwargs}, + include_self=False + ) + + def distributed_sampler_kwargs(self, distributed_sampler_kwargs): + return dict( + num_replicas=mpu.get_data_parallel_world_size(), + rank=mpu.get_data_parallel_rank(), + ) + + @property + def data_parallel_group(self): + return mpu.get_data_parallel_group() + + @property + def is_main_rpc_process(self) -> bool: + return self.main_rpc_process + + @property + def return_after_exit_rpc_process(self) -> bool: + return True + + def barrier(self, name: Optional[str] = None) -> None: + if torch_distrib.is_initialized() and self.is_main_rpc_process: + torch_distrib.barrier(group=self.data_parallel_group) + + def _check_pipe_available(self): + if not FAIRSCALE_PIPE_AVAILABLE: + raise MisconfigurationException( + 'PipeRPCPlugin requires FairScale and currently is only supported on PyTorch 1.6.' + ) + + +class LightningPipeModule(nn.Module): + """ + This class wraps Fairscale Pipe and PipeRCPWrapper class. + """ + + def __init__( + self, + module: nn.Sequential, + balance: List[int], + microbatches: int = 8, + checkpoint='never'): + super().__init__() + self.module = module + self.balance = balance + self.microbatches = microbatches + self.checkpoint = checkpoint + self._init_pipe() + + def _init_pipe(self): + device = torch.device("cuda", torch_distrib.get_rank()) + + self.module = PipeRPCWrapper( + module=self.module, + balance=self.balance, + chunks=self.microbatches, + style=PipelineStyle.MultiProcess, + input_device=device, + worker_map=self.get_worker_map(), + checkpoint=self.checkpoint, + ) + + def foreach_worker(self, *args, **kwargs): + self.module.foreach_worker(*args, **kwargs) + + def forward(self, *args, **kwargs): + return self.module(*args, **kwargs) + + def get_worker_map(self): + # TODO, is this correct with multinodes? 
We also assume "worker" is the same as defined in the RPCPlugin + return {rank: f"worker{rank}" for rank in range(torch_distrib.get_world_size())} + + +def register_optimizers(ctx, model): + optimizers, lr_schedulers, optimizer_frequencies = model.trainer.init_optimizers(model) + model.trainer.optimizers = optimizers + model.trainer.lr_schedulers = lr_schedulers + model.trainer.optimizer_frequencies = optimizer_frequencies + + +def run_optimizer(ctx, model): + trainer = model.trainer + opt_idx = ctx["opt_idx"] + optimizer = trainer.optimizers[opt_idx] + optimizer.step(*ctx["args"], **ctx["kwargs"]) + + +def save_layers_on_all_rank_zero_workers(ctx, model): + gpus_per_model = ctx["gpus_per_model"] + rank = torch_distrib.get_rank() + if rank in range(gpus_per_model): + seq = list(model.children())[0] + torch.save(seq, f"seq_{rank}.pt") + + +def load_sequential_from_saved_layers(gpus_per_model): + partial_seqs = [torch.load(f"seq_{rank}.pt", map_location='cpu') for rank in range(gpus_per_model)] + seq = nn.Sequential() + for p_seq in partial_seqs: + for name, child in p_seq.named_children(): + seq.add_module(name, child) + # delete tmp files + [os.remove(f"seq_{rank}.pt") for rank in range(gpus_per_model)] + return seq diff --git a/pytorch_lightning/profiler/profilers.py b/pytorch_lightning/profiler/profilers.py index 1ec9515140e49..3fe796e7e06de 100644 --- a/pytorch_lightning/profiler/profilers.py +++ b/pytorch_lightning/profiler/profilers.py @@ -16,6 +16,7 @@ import cProfile import inspect import io +import logging import os import pstats import time @@ -27,12 +28,13 @@ import numpy as np import torch -from pytorch_lightning import _logger as log from pytorch_lightning.utilities import rank_zero_only from pytorch_lightning.utilities.cloud_io import get_filesystem from pytorch_lightning.utilities.distributed import rank_zero_warn from pytorch_lightning.utilities.exceptions import MisconfigurationException +log = logging.getLogger(__name__) + class BaseProfiler(ABC): """ diff --git a/pytorch_lightning/trainer/connectors/slurm_connector.py b/pytorch_lightning/trainer/connectors/slurm_connector.py index 5086bab25593a..595831d49a992 100644 --- a/pytorch_lightning/trainer/connectors/slurm_connector.py +++ b/pytorch_lightning/trainer/connectors/slurm_connector.py @@ -1,8 +1,10 @@ +import logging import os import signal from subprocess import call -from pytorch_lightning import _logger as log + +log = logging.getLogger(__name__) class SLURMConnector: diff --git a/pytorch_lightning/trainer/trainer.py b/pytorch_lightning/trainer/trainer.py index 68453811da203..ff72732d134c6 100644 --- a/pytorch_lightning/trainer/trainer.py +++ b/pytorch_lightning/trainer/trainer.py @@ -20,7 +20,6 @@ import torch from torch.utils.data import DataLoader -from pytorch_lightning import _logger as log from pytorch_lightning.accelerators import Accelerator from pytorch_lightning.callbacks import Callback from pytorch_lightning.core.datamodule import LightningDataModule @@ -63,6 +62,7 @@ from pytorch_lightning.utilities.memory import recursive_detach from pytorch_lightning.utilities.model_helpers import is_overridden +log = logging.getLogger(__name__) # warnings to ignore in trainer warnings.filterwarnings( 'ignore', message='torch.distributed.reduce_op is deprecated, ' diff --git a/pytorch_lightning/trainer/training_tricks.py b/pytorch_lightning/trainer/training_tricks.py index 6b388f7137ce1..9b3432c21c5f4 100644 --- a/pytorch_lightning/trainer/training_tricks.py +++ b/pytorch_lightning/trainer/training_tricks.py @@ -13,15 
+13,16 @@ # limitations under the License. from abc import ABC +import logging import torch from torch import Tensor -from pytorch_lightning import _logger as log from pytorch_lightning.core.lightning import LightningModule EPSILON = 1e-6 EPSILON_FP16 = 1e-5 +log = logging.getLogger(__name__) class TrainerTrainingTricksMixin(ABC): diff --git a/pytorch_lightning/tuner/batch_size_scaling.py b/pytorch_lightning/tuner/batch_size_scaling.py index c29cffc42607b..a237a9049986b 100644 --- a/pytorch_lightning/tuner/batch_size_scaling.py +++ b/pytorch_lightning/tuner/batch_size_scaling.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License +import logging import os from typing import Optional, Tuple @@ -24,6 +25,7 @@ from pytorch_lightning.utilities.memory import garbage_collection_cuda, is_oom_error from pytorch_lightning.utilities.parsing import lightning_getattr, lightning_hasattr, lightning_setattr +log = logging.getLogger(__name__) def scale_batch_size( trainer, diff --git a/pytorch_lightning/tuner/lr_finder.py b/pytorch_lightning/tuner/lr_finder.py index cf29799a05a5b..3300deb2d002e 100644 --- a/pytorch_lightning/tuner/lr_finder.py +++ b/pytorch_lightning/tuner/lr_finder.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import importlib +import logging import os from functools import wraps from typing import Callable, List, Optional, Sequence, Union @@ -22,7 +23,6 @@ from torch.optim.lr_scheduler import _LRScheduler from torch.utils.data import DataLoader -from pytorch_lightning import _logger as log from pytorch_lightning.callbacks import Callback from pytorch_lightning.core.datamodule import LightningDataModule from pytorch_lightning.core.lightning import LightningModule @@ -39,6 +39,8 @@ else: from tqdm import tqdm +log = logging.getLogger(__name__) + def _determine_lr_attr_name(trainer, model: LightningModule) -> str: if isinstance(trainer.auto_lr_find, str): diff --git a/pytorch_lightning/utilities/distributed.py b/pytorch_lightning/utilities/distributed.py index f283497e5e5a1..f6547f0b05d96 100644 --- a/pytorch_lightning/utilities/distributed.py +++ b/pytorch_lightning/utilities/distributed.py @@ -19,7 +19,8 @@ import torch -from pytorch_lightning import _logger as log +import logging +log = logging.getLogger(__name__) if torch.distributed.is_available(): from torch.distributed import group, ReduceOp diff --git a/pytorch_lightning/utilities/seed.py b/pytorch_lightning/utilities/seed.py index da98e00b71e60..8129075f99f4d 100644 --- a/pytorch_lightning/utilities/seed.py +++ b/pytorch_lightning/utilities/seed.py @@ -13,6 +13,7 @@ # limitations under the License. """Helper functions to help with reproducibility of models. 
""" +import logging import os import random from typing import Optional @@ -20,9 +21,10 @@ import numpy as np import torch -from pytorch_lightning import _logger as log from pytorch_lightning.utilities import rank_zero_warn +log = logging.getLogger(__name__) + def seed_everything(seed: Optional[int] = None) -> int: """ diff --git a/pytorch_lightning/utilities/upgrade_checkpoint.py b/pytorch_lightning/utilities/upgrade_checkpoint.py index 2e767542cd9bd..4896845f10263 100644 --- a/pytorch_lightning/utilities/upgrade_checkpoint.py +++ b/pytorch_lightning/utilities/upgrade_checkpoint.py @@ -12,11 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. import argparse +import logging from shutil import copyfile import torch -from pytorch_lightning import _logger as log from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint KEYS_MAPPING = { @@ -27,6 +27,8 @@ "early_stop_callback_patience": (EarlyStopping, "patience"), } +log = logging.getLogger(__name__) + def upgrade_checkpoint(filepath): checkpoint = torch.load(filepath) diff --git a/tests/__init__.py b/tests/__init__.py index 57feda6280c38..18f6cfcf84c79 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging import os import numpy as np @@ -31,3 +32,6 @@ if not os.path.isdir(_TEMP_PATH): os.mkdir(_TEMP_PATH) + + +logging.basicConfig(level=logging.ERROR) diff --git a/tests/callbacks/test_early_stopping.py b/tests/callbacks/test_early_stopping.py index 9954560beed15..e7e03d1e8b63d 100644 --- a/tests/callbacks/test_early_stopping.py +++ b/tests/callbacks/test_early_stopping.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+import logging import os import pickle import sys @@ -21,7 +22,7 @@ import pytest import torch -from pytorch_lightning import _logger, seed_everything, Trainer +from pytorch_lightning import seed_everything, Trainer from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint from pytorch_lightning.trainer.states import TrainerState from pytorch_lightning.utilities.exceptions import MisconfigurationException @@ -29,6 +30,8 @@ from tests.helpers.datamodules import ClassifDataModule from tests.helpers.simple_models import ClassificationModel +_logger = logging.getLogger(__name__) + class EarlyStoppingTestRestore(EarlyStopping): # this class has to be defined outside the test function, otherwise we get pickle error diff --git a/tests/checkpointing/test_model_checkpoint.py b/tests/checkpointing/test_model_checkpoint.py index a6ec2bc56656f..43ed87f762abd 100644 --- a/tests/checkpointing/test_model_checkpoint.py +++ b/tests/checkpointing/test_model_checkpoint.py @@ -677,7 +677,8 @@ def test_model_checkpoint_save_last_warning(tmpdir, caplog, max_epochs, should_v callbacks=[ModelCheckpoint(monitor='early_stop_on', dirpath=tmpdir, save_top_k=0, save_last=save_last)], max_epochs=max_epochs, ) - trainer.fit(model) + with caplog.at_level(logging.INFO): + trainer.fit(model) assert caplog.messages.count('Saving latest checkpoint...') == save_last diff --git a/tests/test_profiler.py b/tests/test_profiler.py index 701b2a4bfb900..667e153a9edd4 100644 --- a/tests/test_profiler.py +++ b/tests/test_profiler.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import logging import os import time from pathlib import Path @@ -95,7 +95,8 @@ def test_simple_profiler_overhead(simple_profiler, n_iter=5): def test_simple_profiler_describe(caplog, simple_profiler): """Ensure the profiler won't fail when reporting the summary.""" - simple_profiler.describe() + with caplog.at_level(logging.INFO): + simple_profiler.describe() assert "Profiler Report" in caplog.text From 4d4d8829a0c9eb2a0e81fd4fb3fd0a6a97eedc5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Mon, 1 Mar 2021 20:55:42 +0100 Subject: [PATCH 02/11] isort --- pytorch_lightning/__init__.py | 1 - pytorch_lightning/loggers/neptune.py | 2 +- pytorch_lightning/trainer/connectors/slurm_connector.py | 1 - pytorch_lightning/trainer/training_tricks.py | 2 +- pytorch_lightning/tuner/batch_size_scaling.py | 1 + pytorch_lightning/utilities/distributed.py | 2 +- tests/__init__.py | 1 - 7 files changed, 4 insertions(+), 6 deletions(-) diff --git a/pytorch_lightning/__init__.py b/pytorch_lightning/__init__.py index e52c42b8daf58..569078c994ba4 100644 --- a/pytorch_lightning/__init__.py +++ b/pytorch_lightning/__init__.py @@ -47,7 +47,6 @@ _logger.addHandler(logging.StreamHandler()) _logger.propagate = False - _PACKAGE_ROOT = os.path.dirname(__file__) _PROJECT_ROOT = os.path.dirname(_PACKAGE_ROOT) diff --git a/pytorch_lightning/loggers/neptune.py b/pytorch_lightning/loggers/neptune.py index da37d135ff7c9..a9209f1cbdd7b 100644 --- a/pytorch_lightning/loggers/neptune.py +++ b/pytorch_lightning/loggers/neptune.py @@ -15,8 +15,8 @@ Neptune Logger -------------- """ -from argparse import Namespace import logging +from argparse import Namespace from typing import Any, Dict, Iterable, Optional, Union import torch diff --git a/pytorch_lightning/trainer/connectors/slurm_connector.py 
b/pytorch_lightning/trainer/connectors/slurm_connector.py index 595831d49a992..f2bb00abd84bd 100644 --- a/pytorch_lightning/trainer/connectors/slurm_connector.py +++ b/pytorch_lightning/trainer/connectors/slurm_connector.py @@ -3,7 +3,6 @@ import signal from subprocess import call - log = logging.getLogger(__name__) diff --git a/pytorch_lightning/trainer/training_tricks.py b/pytorch_lightning/trainer/training_tricks.py index 9b3432c21c5f4..54731977cbee9 100644 --- a/pytorch_lightning/trainer/training_tricks.py +++ b/pytorch_lightning/trainer/training_tricks.py @@ -12,8 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from abc import ABC import logging +from abc import ABC import torch from torch import Tensor diff --git a/pytorch_lightning/tuner/batch_size_scaling.py b/pytorch_lightning/tuner/batch_size_scaling.py index a237a9049986b..f86f0521971fc 100644 --- a/pytorch_lightning/tuner/batch_size_scaling.py +++ b/pytorch_lightning/tuner/batch_size_scaling.py @@ -27,6 +27,7 @@ log = logging.getLogger(__name__) + def scale_batch_size( trainer, model: LightningModule, diff --git a/pytorch_lightning/utilities/distributed.py b/pytorch_lightning/utilities/distributed.py index f6547f0b05d96..cd3e74e35a8f7 100644 --- a/pytorch_lightning/utilities/distributed.py +++ b/pytorch_lightning/utilities/distributed.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging import os import warnings from functools import wraps @@ -19,7 +20,6 @@ import torch -import logging log = logging.getLogger(__name__) if torch.distributed.is_available(): diff --git a/tests/__init__.py b/tests/__init__.py index 18f6cfcf84c79..433f183896dee 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -33,5 +33,4 @@ if not os.path.isdir(_TEMP_PATH): os.mkdir(_TEMP_PATH) - logging.basicConfig(level=logging.ERROR) From 8d9aa2686074d382f9a31f0a169cbac7a3cc86cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Mon, 1 Mar 2021 20:57:29 +0100 Subject: [PATCH 03/11] missing import --- pytorch_lightning/trainer/trainer.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pytorch_lightning/trainer/trainer.py b/pytorch_lightning/trainer/trainer.py index ff72732d134c6..7a3dde64ddf98 100644 --- a/pytorch_lightning/trainer/trainer.py +++ b/pytorch_lightning/trainer/trainer.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
"""Trainer to automate the training.""" +import logging import warnings from itertools import count from pathlib import Path From 0eec39eeb9e8791104e334a4736d047373c7ea02 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Mon, 1 Mar 2021 20:58:58 +0100 Subject: [PATCH 04/11] duplicate import --- pytorch_lightning/tuner/batch_size_scaling.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pytorch_lightning/tuner/batch_size_scaling.py b/pytorch_lightning/tuner/batch_size_scaling.py index f86f0521971fc..88cea2e6b3edb 100644 --- a/pytorch_lightning/tuner/batch_size_scaling.py +++ b/pytorch_lightning/tuner/batch_size_scaling.py @@ -15,7 +15,6 @@ import os from typing import Optional, Tuple -from pytorch_lightning import _logger as log from pytorch_lightning.core.lightning import LightningModule from pytorch_lightning.loggers.base import DummyLogger from pytorch_lightning.utilities import DeviceType, rank_zero_warn @@ -25,6 +24,7 @@ from pytorch_lightning.utilities.memory import garbage_collection_cuda, is_oom_error from pytorch_lightning.utilities.parsing import lightning_getattr, lightning_hasattr, lightning_setattr + log = logging.getLogger(__name__) From ef46309b3e07d30cc456b4fde309117b2030b03a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Mon, 1 Mar 2021 20:59:29 +0100 Subject: [PATCH 05/11] remove unused plugins --- pytorch_lightning/plugins/ddp_plugin.py | 170 -------- .../plugins/ddp_sequential_plugin.py | 408 ------------------ 2 files changed, 578 deletions(-) delete mode 100644 pytorch_lightning/plugins/ddp_plugin.py delete mode 100644 pytorch_lightning/plugins/ddp_sequential_plugin.py diff --git a/pytorch_lightning/plugins/ddp_plugin.py b/pytorch_lightning/plugins/ddp_plugin.py deleted file mode 100644 index 6d5ad1e9e2119..0000000000000 --- a/pytorch_lightning/plugins/ddp_plugin.py +++ /dev/null @@ -1,170 +0,0 @@ -import os -from contextlib import contextmanager -from typing import Any, Dict, List, Union - -import torch -import torch.distributed as torch_distrib -from torch.optim import Optimizer - -from pytorch_lightning import _logger as log -from pytorch_lightning.core.lightning import LightningModule -from pytorch_lightning.overrides.data_parallel import LightningDistributedDataParallel -from pytorch_lightning.plugins.plugin import LightningPlugin - - -class DDPPlugin(LightningPlugin): - """ - Plugin to link a custom ddp implementation to any arbitrary accelerator. - - This plugin forwards all constructor arguments to `LightningDistributedDataParallel`, - which in turn forwards all args to `DistributedDataParallel`. - - Example:: - - class MyDDP(DDPPlugin): - - def configure_ddp(self, model, device_ids): - model = MyDDPWrapper(model, device_ids) - return model - - my_ddp = MyDDP() - trainer = Trainer(accelerator='ddp_x', plugins=[my_ddp]) - """ - - def __init__(self, **kwargs): - self._ddp_kwargs: Dict[str, Any] = kwargs - - def configure_ddp( - self, model: LightningModule, device_ids: List[int] - ) -> LightningDistributedDataParallel: - """ - Pass through all customizations from constructor to `LightningDistributedDataParallel`. - Override to define a custom DDP implementation. - - .. 
note:: Only requirement is that your DDP implementation subclasses LightningDistributedDataParallel - - - The default implementation is:: - - def configure_ddp(self, model, device_ids): - model = LightningDistributedDataParallel( - model, device_ids=device_ids, find_unused_parameters=False - ) - return model - - Args: - model: the lightningModule - device_ids: the list of devices available - - Returns: - the model wrapped in LightningDistributedDataParallel - - """ - # if unset, default `find_unused_parameters` `False` - self._ddp_kwargs["find_unused_parameters"] = self._ddp_kwargs.get( - "find_unused_parameters", False - ) - model = LightningDistributedDataParallel( - model, - device_ids=device_ids, - **self._ddp_kwargs, - ) - return model - - def init_ddp_connection( - self, - trainer, - cluster_environment, - global_rank: int, - world_size: int, - is_slurm_managing_tasks: bool = True, - ) -> None: - os.environ["MASTER_ADDR"] = str(cluster_environment.master_address()) - os.environ["MASTER_PORT"] = str(cluster_environment.master_port()) - os.environ["WORLD_SIZE"] = str(cluster_environment.world_size()) - torch_backend = "nccl" if trainer.on_gpu else "gloo" - - if not torch_distrib.is_initialized(): - log.info( - f"initializing ddp: GLOBAL_RANK: {global_rank}, MEMBER: {global_rank + 1}/{world_size}" - ) - torch_distrib.init_process_group( - torch_backend, rank=global_rank, world_size=world_size - ) - - @property - def is_running_single_process_per_device(self) -> bool: - # objects do not need to be scattered in single process per device, move objects upfront to device - # This property is used in ``self.on_before_forward`` function. - return self.device_ids is not None and len(self.device_ids) == 1 - - def on_before_forward(self, model: LightningModule, *args): - """ - Override to handle custom edge case. - - Args: - args: Inputs to the model. - model: Model to train. - Returns: args moved to correct device if needed. - """ - if self.is_running_single_process_per_device: - args = model.transfer_batch_to_device(args, model.device) - return args - - def optimizer_state(self, optimizer: Optimizer) -> dict: - return optimizer.state_dict() - - def on_after_setup_optimizers(self, trainer): - """ - Called after optimizers have been set-up. This is useful for doing any configuration options in RPC, or - state sharding. - """ - - def get_model_from_plugin( - self, - model: Union[LightningDistributedDataParallel, LightningModule] - ) -> LightningModule: - """ - Override to modify returning base :class:`LightningModule` - when accessing variable and functions outside of the parallel wrapper. - - Example:: - ref_model = ddp_plugin.get_model_from_plugin(model) - ref_model.training_step(...) - - Args: - model: Model with parallel wrapper. - - Returns: Reference :class:`LightningModule` within parallel wrapper. - - """ - if isinstance(model, LightningDistributedDataParallel): - return model.module - return model - - @contextmanager - def block_backward_sync(self, model: LightningDistributedDataParallel): - """ - Blocks ddp sync gradients behaviour on backwards pass. 
- This is useful for skipping sync when accumulating gradients, reducing communication overhead - Returns: context manager with sync behaviour off - """ - yield model.no_sync() - - def on_before_manual_backward(self, model: LightningDistributedDataParallel, output: Any): - model.reducer_prepare_for_backwards(output) - - def on_after_manual_backward(self, model: LightningDistributedDataParallel): - model.reducer_reset_hooks() - - def distributed_sampler_kwargs(self, distributed_sampler_kwargs): - return distributed_sampler_kwargs - - @property - def data_parallel_group(self): - """ - Return the group that this process exists in. By default, this is the world size. - Useful for when additional parallel groups have been created, to select certain processes. - Returns: The ProcessGroup this process exists in. - """ - return torch_distrib.group.WORLD diff --git a/pytorch_lightning/plugins/ddp_sequential_plugin.py b/pytorch_lightning/plugins/ddp_sequential_plugin.py deleted file mode 100644 index 069b1754fbce0..0000000000000 --- a/pytorch_lightning/plugins/ddp_sequential_plugin.py +++ /dev/null @@ -1,408 +0,0 @@ -# Copyright The PyTorch Lightning team. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License -import os -from typing import Any, List, Optional - -import torch -from torch import nn -import torch.distributed as torch_distrib -from torch.nn.parallel import DistributedDataParallel - -from pytorch_lightning import _logger as log -from pytorch_lightning import LightningModule -from pytorch_lightning.overrides.data_parallel import LightningDistributedDataParallel -from pytorch_lightning.plugins.rpc_plugin import RPCPlugin -from pytorch_lightning.utilities import FAIRSCALE_PIPE_AVAILABLE, rank_zero_only -from pytorch_lightning.utilities.exceptions import MisconfigurationException - -if FAIRSCALE_PIPE_AVAILABLE: - from fairscale.nn import PipeRPCWrapper - import fairscale.nn.model_parallel as mpu - from fairscale.nn.pipe import balance as pipe_balance - from fairscale.nn.pipe import rpc as rpc_pipe - from fairscale.nn.pipe.pipeline import PipelineStyle - - -class DDPSequentialPlugin(RPCPlugin): - def __init__( - self, - balance: Optional[List[int]] = None, - microbatches: int = 8, - checkpoint: str = 'except_last', - balance_mode: str = "balance_by_size", - pipelined_backward: Optional[bool] = True, - **kwargs): - """ - Provides sequential model parallelism for :class:`nn.Sequential ` module. - If the module requires lots of memory, Pipe can be used to reduce this by leveraging multiple GPUs. - - Example:: - class MyLightningModule: - def __init__(self): - ... - model.sequential_module = torch.nn.Sequential(my_layers) - - # Split my module across 4 gpus, one layer each - model = MyLightningModule() - plugin = DDPSequentialPlugin(balance=[1, 1, 1, 1]) - trainer = Trainer(accelerator='ddp', gpus=4, plugins=[plugin]) - trainer.fit(model) - - .. 
_DDPSequentialPlugin: https://arxiv.org/abs/1811.06965 - - Pipeline parallelism comes with with checkpointing to reduce peak - memory required to train while minimizing device under-utilization. - This is turned on by default and can be turned off via the checkpoint argument. - - You should determine the balance when defining the plugin, - or you can pass an example input array via the LightningModule to infer a balance. - The module will be partitioned into multiple devices according to the given balance. You may also rely on - your own heuristics to find your own optimal configuration. - - Args: - balance: The balance of the model, i.e [2, 2] (two layers on each GPU). - If not provided assumes user provides an input example array to find a balance on all GPUs. - - microbatches: Allows for parallelization to reduce device utilization - by splitting the batch into further smaller batches. - - checkpoint: Enables gradient checkpointing. ['always', 'except_last', 'never'] - - balance_mode: Type of balance heuristic to use if balance to be inferred. - - - 'balance_by_size': checks memory usage of each layer and determines balance - - - 'balance_by_time': checks time of each layer and determines balance - - pipelined_backward: if True, call torch.autograd.backward once per microbatch on the - - backward pass (instead of once for the whole batch). This works - around a potential deadlock in pytorch when using tensor parallelism - at the same time. Defaults to `True` if - `get_model_parallel_world_size() > 1` - """ - self._check_pipe_available() - super().__init__(**kwargs) - - self.balance = balance - - self.microbatches = microbatches - self.checkpoint = checkpoint - self.balance_mode = balance_mode - self.pipelined_backward = pipelined_backward - self.main_rpc_process = False # Updated by main process, default for all secondary processes - - def init_ddp_connection( - self, - trainer, - cluster_environment, - global_rank: int, - world_size: int, - is_slurm_managing_tasks: bool = True, - ) -> None: - trainer.prepared_for_backwards = False - self._check_arguments(trainer) - if self._skip_init_connections(trainer): - return - super().init_ddp_connection( - trainer=trainer, - cluster_environment=cluster_environment, - global_rank=global_rank, - world_size=world_size, - is_slurm_managing_tasks=is_slurm_managing_tasks - ) - super().init_rpc_connection( - global_rank=global_rank, - world_size=world_size - ) - model = trainer.get_model() - self.gpus_per_model = self._infer_check_num_gpus(trainer) - self.init_model_parallel_groups(trainer) - self.set_main_rpc_process() - - self._check_sequential_model_exists(model) - if self.main_rpc_process: - if self.balance is None: - self._infer_model_balance(trainer) - self._assert_valid_model_balance(trainer) - - def on_before_manual_backward(self, model: LightningDistributedDataParallel, output: Any): - pass - - def _infer_model_balance(self, trainer): - log.info(f'Inferring model balance using {self.balance_mode} mode') - model = trainer.get_model() - if model.example_input_array is None: - raise MisconfigurationException( - 'Please set example_input_array to your model, so we can infer the right model balance for you') - balance_func = getattr(pipe_balance, self.balance_mode) - self.balance = balance_func(self.gpus_per_model, model.sequential_module, model.example_input_array) - self._sync_balance_to_all_parallel_groups() - - log.info(f'The following model balance {self.balance.tolist()} was inferred using {self.balance_mode} mode') - - def 
_sync_balance_to_all_parallel_groups(self, main_rank=0): - """ - Ensures that we sync the balance to all main processes, so that the balance is the same per replica. - Args: - main_rank: The rank with the balance we'd like to replicate. - """ - self.balance = torch.tensor(self.balance, dtype=torch.int, device='cuda') - # Ensure we sync to all processes within the main data parallel group - # We use the data parallel group as all main processes are found within the same group - torch_distrib.broadcast(self.balance, src=main_rank, group=mpu.get_data_parallel_group()) - self.balance = self.balance.cpu() - - def _check_sequential_model_exists(self, model): - if not hasattr(model, "sequential_module") or not isinstance(model.sequential_module, nn.Sequential): - raise MisconfigurationException( - 'Could not find a PipeLightningModule within the model. ' - 'Did you set your sequential model as the `sequential_module` attribute of your model?') - - def _find_and_init_pipe_module(self, model): - if hasattr(model, "sequential_module") and isinstance(model.sequential_module, LightningPipeModule): - # model has been wrapped already - return - elif hasattr(model, "sequential_module") and isinstance(model.sequential_module, nn.Sequential): - # try to wrap model for the user - model.sequential_module = LightningPipeModule( - model.sequential_module, - balance=self.balance, - microbatches=self.microbatches, - checkpoint=self.checkpoint, - ) - # Update references for workers to access correct lightning functions when calling RPC - model.sequential_module.trainer = model.trainer - model.sequential_module.configure_optimizers = model.configure_optimizers - - # Update references for main process to access correct lightning functions when calling RPC - model.sequential_module.module.model.trainer = model.trainer - model.sequential_module.module.model.configure_optimizers = model.configure_optimizers - - else: - raise MisconfigurationException( - 'Could not find a PipeLightningModule within the model. ' - 'Did you defined set your sequential model as an `sequential_module` attribute of your model ?' - ) - - def _assert_valid_model_balance(self, trainer): - model = trainer.get_model() - if sum(self.balance) != len(model.sequential_module): - raise MisconfigurationException( - f'The provided balance sum: {sum(self.balance)} does not' - f' match your Sequential length: {len(model.sequential_module)}') - - def _skip_init_connections(self, trainer): - """ - Skip initialization if torch is already initialized and we're in testing. - Returns: Whether to skip initialization - - """ - return torch_distrib.is_initialized() and trainer.testing - - def init_model_parallel_groups(self, trainer): - num_model_parallel = 1 # TODO currently no support for vertical model parallel - mpu.initialize_model_parallel( - model_parallel_size_=num_model_parallel, - pipeline_length=self.gpus_per_model - ) - - def _infer_check_num_gpus(self, trainer): - """ - Infer the number of GPUs per model. - - Args: - trainer: The trainer object. 
- - Returns: The appropriate balance for the model - """ - if isinstance(self.balance, list): - if len(self.balance) != (trainer.world_size / trainer.num_nodes): - raise MisconfigurationException( - "Pipe currently only supports splitting the module onto all available GPUs" - ) - # User has defined a balance for his model - return len(self.balance) - # Assume that the user wants to balance his model on all GPUs - return trainer.world_size - - def on_accelerator_exit_rpc_process(self, trainer) -> None: - if not trainer.testing: - torch_distrib.barrier() # Ensure we await main process initialization - - # Add trainer/configure_optimizers to the pipe model for access in all worker processes - rpc_pipe.PipeModel.trainer = trainer - del rpc_pipe.PipeModel.trainer.model.sequential_module - rpc_pipe.PipeModel.trainer.model.sequential_module = rpc_pipe.PipeModel - rpc_pipe.PipeModel.configure_optimizers = trainer.model.configure_optimizers - super().on_accelerator_exit_rpc_process(trainer) - - def set_main_rpc_process(self): - self.main_rpc_process = torch_distrib.get_rank(group=mpu.get_pipeline_parallel_group()) == 0 - - def on_main_rpc_connection(self, trainer) -> None: - # Create pipe_module - model = trainer.get_model() - self._find_and_init_pipe_module(model) - if not trainer.testing: - torch_distrib.barrier() # Ensure we join main process initialization - model.sequential_module.foreach_worker(register_optimizers, include_self=True) - - def _check_arguments(self, trainer): - if trainer.amp_backend is not None: - raise MisconfigurationException( - 'DDPSequentialPlugin is currently not supported in Automatic Mixed Precision') - - def configure_ddp( - self, - model: LightningModule, device_ids: List[int]) -> DistributedDataParallel: - ddp_plugin = RPCPlugin(process_group=mpu.get_data_parallel_group()).configure_ddp(model, device_ids) - # Plugin handle backwards across processes. 
Currently not supported for DDP + pipe parallel - ddp_plugin.PREPARE_FOR_BACKWARDS = False - return ddp_plugin - - @rank_zero_only - def rpc_save_model( - self, - save_model_fn, - last_filepath, - trainer, - pl_module) -> None: - model = trainer.get_model() - if not hasattr(model.sequential_module, "foreach_worker"): - return - current_layers = pl_module.sequential_module - model.sequential_module.foreach_worker( - save_layers_on_all_rank_zero_workers, - {"gpus_per_model": self.gpus_per_model}, - include_self=True - ) - pl_module.sequential_module = load_sequential_from_saved_layers(self.gpus_per_model) - save_model_fn(last_filepath, trainer, pl_module) - pl_module.sequential_module = current_layers - - def worker_optimizer_step( - self, - model: LightningModule, - opt_idx: int, - *args, - **kwargs) -> None: - model.sequential_module.foreach_worker( - run_optimizer, - {"opt_idx": opt_idx, "args": args, "kwargs": kwargs}, - include_self=False - ) - - def distributed_sampler_kwargs(self, distributed_sampler_kwargs): - return dict( - num_replicas=mpu.get_data_parallel_world_size(), - rank=mpu.get_data_parallel_rank(), - ) - - @property - def data_parallel_group(self): - return mpu.get_data_parallel_group() - - @property - def is_main_rpc_process(self) -> bool: - return self.main_rpc_process - - @property - def return_after_exit_rpc_process(self) -> bool: - return True - - def barrier(self, name: Optional[str] = None) -> None: - if torch_distrib.is_initialized() and self.is_main_rpc_process: - torch_distrib.barrier(group=self.data_parallel_group) - - def _check_pipe_available(self): - if not FAIRSCALE_PIPE_AVAILABLE: - raise MisconfigurationException( - 'PipeRPCPlugin requires FairScale and currently is only supported on PyTorch 1.6.' - ) - - -class LightningPipeModule(nn.Module): - """ - This class wraps Fairscale Pipe and PipeRCPWrapper class. - """ - - def __init__( - self, - module: nn.Sequential, - balance: List[int], - microbatches: int = 8, - checkpoint='never'): - super().__init__() - self.module = module - self.balance = balance - self.microbatches = microbatches - self.checkpoint = checkpoint - self._init_pipe() - - def _init_pipe(self): - device = torch.device("cuda", torch_distrib.get_rank()) - - self.module = PipeRPCWrapper( - module=self.module, - balance=self.balance, - chunks=self.microbatches, - style=PipelineStyle.MultiProcess, - input_device=device, - worker_map=self.get_worker_map(), - checkpoint=self.checkpoint, - ) - - def foreach_worker(self, *args, **kwargs): - self.module.foreach_worker(*args, **kwargs) - - def forward(self, *args, **kwargs): - return self.module(*args, **kwargs) - - def get_worker_map(self): - # TODO, is this correct with multinodes? 
We also assume "worker" is the same as defined in the RPCPlugin - return {rank: f"worker{rank}" for rank in range(torch_distrib.get_world_size())} - - -def register_optimizers(ctx, model): - optimizers, lr_schedulers, optimizer_frequencies = model.trainer.init_optimizers(model) - model.trainer.optimizers = optimizers - model.trainer.lr_schedulers = lr_schedulers - model.trainer.optimizer_frequencies = optimizer_frequencies - - -def run_optimizer(ctx, model): - trainer = model.trainer - opt_idx = ctx["opt_idx"] - optimizer = trainer.optimizers[opt_idx] - optimizer.step(*ctx["args"], **ctx["kwargs"]) - - -def save_layers_on_all_rank_zero_workers(ctx, model): - gpus_per_model = ctx["gpus_per_model"] - rank = torch_distrib.get_rank() - if rank in range(gpus_per_model): - seq = list(model.children())[0] - torch.save(seq, f"seq_{rank}.pt") - - -def load_sequential_from_saved_layers(gpus_per_model): - partial_seqs = [torch.load(f"seq_{rank}.pt", map_location='cpu') for rank in range(gpus_per_model)] - seq = nn.Sequential() - for p_seq in partial_seqs: - for name, child in p_seq.named_children(): - seq.add_module(name, child) - # delete tmp files - [os.remove(f"seq_{rank}.pt") for rank in range(gpus_per_model)] - return seq From 9f55b58562db7f36b5d8bfc393a12cea3fdc8068 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Mon, 1 Mar 2021 21:05:39 +0100 Subject: [PATCH 06/11] move changelog --- CHANGELOG.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 35adc1b04aa47..150136833ceec 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -80,6 +80,9 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). - Prevent `WandbLogger` from dropping values ([#5931](https://github.com/PyTorchLightning/pytorch-lightning/pull/5931)) +- Fixed duplicate logs appearing in console when using the python logging module ([#5509](https://github.com/PyTorchLightning/pytorch-lightning/pull/5509), [#6275](https://github.com/PyTorchLightning/pytorch-lightning/pull/6275)) + + ## [1.2.1] - 2021-02-23 ### Fixed @@ -289,9 +292,6 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). - Fixed `val_check_interval` with `fast_dev_run` ([#5540](https://github.com/PyTorchLightning/pytorch-lightning/pull/5540)) -- Fixed duplicate logs appearing in console when using the python logging module ([#5509](https://github.com/PyTorchLightning/pytorch-lightning/pull/5509)) - - ## [1.1.4] - 2021-01-12 ### Added From d59e74fb83f288912b9883e23846c96e30206526 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Mon, 1 Mar 2021 21:09:21 +0100 Subject: [PATCH 07/11] add missing import --- tests/checkpointing/test_model_checkpoint.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/checkpointing/test_model_checkpoint.py b/tests/checkpointing/test_model_checkpoint.py index 43ed87f762abd..c993c9c73872c 100644 --- a/tests/checkpointing/test_model_checkpoint.py +++ b/tests/checkpointing/test_model_checkpoint.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+import logging import math import os import pickle From 9b6edd1a1f96f1f5598c37fa223a8344f919e4d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Mon, 1 Mar 2021 22:12:34 +0100 Subject: [PATCH 08/11] missing changes --- pytorch_lightning/callbacks/finetuning.py | 4 +++- pytorch_lightning/callbacks/pruning.py | 4 +++- pytorch_lightning/plugins/environments/slurm_environment.py | 4 +++- .../plugins/environments/torchelastic_environment.py | 4 +++- pytorch_lightning/plugins/training_type/ddp.py | 5 ++++- pytorch_lightning/plugins/training_type/ddp_spawn.py | 4 +++- .../trainer/connectors/accelerator_connector.py | 4 +++- pytorch_lightning/tuner/batch_size_scaling.py | 1 - 8 files changed, 22 insertions(+), 8 deletions(-) diff --git a/pytorch_lightning/callbacks/finetuning.py b/pytorch_lightning/callbacks/finetuning.py index 9f2697a9f9635..b25e5e06e8b86 100644 --- a/pytorch_lightning/callbacks/finetuning.py +++ b/pytorch_lightning/callbacks/finetuning.py @@ -16,6 +16,7 @@ ^^^^^^^^^^^^^^^^^^^^ Freeze and unfreeze models for finetuning purposes """ +import logging from typing import Callable, Generator, Iterable, List, Optional, Union import torch @@ -24,12 +25,13 @@ from torch.nn.modules.container import Container, ModuleDict, ModuleList, Sequential from torch.optim.optimizer import Optimizer -from pytorch_lightning import _logger as log from pytorch_lightning.callbacks.base import Callback from pytorch_lightning.core.lightning import LightningModule from pytorch_lightning.utilities import rank_zero_warn from pytorch_lightning.utilities.exceptions import MisconfigurationException +log = logging.getLogger(__name__) + def multiplicative(epoch): return 2 diff --git a/pytorch_lightning/callbacks/pruning.py b/pytorch_lightning/callbacks/pruning.py index ee130a700ae68..1e7a13db0a18e 100644 --- a/pytorch_lightning/callbacks/pruning.py +++ b/pytorch_lightning/callbacks/pruning.py @@ -16,6 +16,7 @@ ^^^^^^^^^^^^ """ import inspect +import logging from copy import deepcopy from functools import partial from typing import Any, Callable, List, Optional, Tuple, Union @@ -24,12 +25,13 @@ import torch.nn.utils.prune as pytorch_prune from torch import nn -from pytorch_lightning import _logger as log from pytorch_lightning.callbacks.base import Callback from pytorch_lightning.core.lightning import LightningModule from pytorch_lightning.utilities import rank_zero_only from pytorch_lightning.utilities.exceptions import MisconfigurationException +log = logging.getLogger(__name__) + _PYTORCH_PRUNING_FUNCTIONS = { "ln_structured": pytorch_prune.ln_structured, "l1_unstructured": pytorch_prune.l1_unstructured, diff --git a/pytorch_lightning/plugins/environments/slurm_environment.py b/pytorch_lightning/plugins/environments/slurm_environment.py index 59ab27cd4c323..7f9586cab0ace 100644 --- a/pytorch_lightning/plugins/environments/slurm_environment.py +++ b/pytorch_lightning/plugins/environments/slurm_environment.py @@ -12,12 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import logging import os import re -from pytorch_lightning import _logger as log from pytorch_lightning.plugins.environments.cluster_environment import ClusterEnvironment +log = logging.getLogger(__name__) + class SLURMEnvironment(ClusterEnvironment): diff --git a/pytorch_lightning/plugins/environments/torchelastic_environment.py b/pytorch_lightning/plugins/environments/torchelastic_environment.py index bb77760e9dd61..5ac7d9f1c9a40 100644 --- a/pytorch_lightning/plugins/environments/torchelastic_environment.py +++ b/pytorch_lightning/plugins/environments/torchelastic_environment.py @@ -12,12 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging import os -from pytorch_lightning import _logger as log from pytorch_lightning.plugins.environments.cluster_environment import ClusterEnvironment from pytorch_lightning.utilities import rank_zero_warn +log = logging.getLogger(__name__) + class TorchElasticEnvironment(ClusterEnvironment): diff --git a/pytorch_lightning/plugins/training_type/ddp.py b/pytorch_lightning/plugins/training_type/ddp.py index 80161d6e59b6b..748dcdc9e6b68 100644 --- a/pytorch_lightning/plugins/training_type/ddp.py +++ b/pytorch_lightning/plugins/training_type/ddp.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging import os import subprocess import sys @@ -23,7 +24,6 @@ from torch.nn.parallel.distributed import DistributedDataParallel from torch.optim import Optimizer -from pytorch_lightning import _logger as log from pytorch_lightning.distributed import LightningDistributed from pytorch_lightning.overrides import LightningDistributedModule from pytorch_lightning.overrides.distributed import prepare_for_backward @@ -44,6 +44,9 @@ from hydra.utils import get_original_cwd, to_absolute_path +log = logging.getLogger(__name__) + + class DDPPlugin(ParallelPlugin): """ Plugin for multi-process single-device training on one or multiple nodes. diff --git a/pytorch_lightning/plugins/training_type/ddp_spawn.py b/pytorch_lightning/plugins/training_type/ddp_spawn.py index ca25a6d8bc382..9ff4bb8cd2749 100644 --- a/pytorch_lightning/plugins/training_type/ddp_spawn.py +++ b/pytorch_lightning/plugins/training_type/ddp_spawn.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+import logging import os import re from typing import Any, Dict, List, Optional, Union @@ -21,7 +22,6 @@ from torch.nn.parallel.distributed import DistributedDataParallel from torch.optim import Optimizer -from pytorch_lightning import _logger as log from pytorch_lightning.distributed.dist import LightningDistributed from pytorch_lightning.overrides import LightningDistributedModule from pytorch_lightning.overrides.distributed import prepare_for_backward @@ -39,6 +39,8 @@ ) from pytorch_lightning.utilities.seed import seed_everything +log = logging.getLogger(__name__) + class DDPSpawnPlugin(ParallelPlugin): diff --git a/pytorch_lightning/trainer/connectors/accelerator_connector.py b/pytorch_lightning/trainer/connectors/accelerator_connector.py index 4f942f9b35e5d..30bfbe2d963db 100644 --- a/pytorch_lightning/trainer/connectors/accelerator_connector.py +++ b/pytorch_lightning/trainer/connectors/accelerator_connector.py @@ -12,12 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging import os from typing import List, Optional, Sequence, Union import torch -from pytorch_lightning import _logger as log from pytorch_lightning.accelerators.accelerator import Accelerator from pytorch_lightning.accelerators.cpu import CPUAccelerator from pytorch_lightning.accelerators.gpu import GPUAccelerator @@ -61,6 +61,8 @@ if _HOROVOD_AVAILABLE: import horovod.torch as hvd +log = logging.getLogger(__name__) + class AcceleratorConnector(object): diff --git a/pytorch_lightning/tuner/batch_size_scaling.py b/pytorch_lightning/tuner/batch_size_scaling.py index 88cea2e6b3edb..a07de29324b24 100644 --- a/pytorch_lightning/tuner/batch_size_scaling.py +++ b/pytorch_lightning/tuner/batch_size_scaling.py @@ -24,7 +24,6 @@ from pytorch_lightning.utilities.memory import garbage_collection_cuda, is_oom_error from pytorch_lightning.utilities.parsing import lightning_getattr, lightning_hasattr, lightning_setattr - log = logging.getLogger(__name__) From 777f0d21d0f6e67d708fa1aca8b755f814bf2713 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Tue, 2 Mar 2021 01:13:56 +0100 Subject: [PATCH 09/11] Update docs/source/extensions/logging.rst Co-authored-by: Jirka Borovec --- docs/source/extensions/logging.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/extensions/logging.rst b/docs/source/extensions/logging.rst index 92304255ea901..bfeed22fd4e66 100644 --- a/docs/source/extensions/logging.rst +++ b/docs/source/extensions/logging.rst @@ -262,7 +262,7 @@ Lightning logs useful information about the training process and user warnings t You can retrieve the Lightning logger and change it to your liking. For example, adjust the logging level or redirect output for certain modules to log files: -.. code-block:: python +.. 
testcode:: import logging From 20257acc4583bcdd9dcd4d2931549230752fea78 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Tue, 2 Mar 2021 10:16:09 +0100 Subject: [PATCH 10/11] missing rename --- pytorch_lightning/profiler/pytorch.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pytorch_lightning/profiler/pytorch.py b/pytorch_lightning/profiler/pytorch.py index 596b658e81f88..4ccb88883260e 100644 --- a/pytorch_lightning/profiler/pytorch.py +++ b/pytorch_lightning/profiler/pytorch.py @@ -14,18 +14,20 @@ """Profiler to check if there are any bottlenecks in your code.""" import inspect +import logging import os from typing import List, Optional import torch -from pytorch_lightning import _logger as log from pytorch_lightning.profiler.profilers import BaseProfiler from pytorch_lightning.utilities import rank_zero_only from pytorch_lightning.utilities.cloud_io import get_filesystem from pytorch_lightning.utilities.distributed import rank_zero_warn from pytorch_lightning.utilities.exceptions import MisconfigurationException +log = logging.getLogger(__name__) + class PyTorchProfiler(BaseProfiler): From 29cc51b7490861615b13529d73e761aaebfb1bed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrian=20W=C3=A4lchli?= Date: Tue, 2 Mar 2021 10:18:12 +0100 Subject: [PATCH 11/11] unused import --- pytorch_lightning/profiler/profilers.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pytorch_lightning/profiler/profilers.py b/pytorch_lightning/profiler/profilers.py index cf71ff7c3ae2e..75bcda09e1af7 100644 --- a/pytorch_lightning/profiler/profilers.py +++ b/pytorch_lightning/profiler/profilers.py @@ -26,7 +26,6 @@ import numpy as np -from pytorch_lightning.utilities import rank_zero_only from pytorch_lightning.utilities.cloud_io import get_filesystem log = logging.getLogger(__name__)
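
The patches above consistently swap ``from pytorch_lightning import _logger as log`` for module-level loggers created with ``logging.getLogger(__name__)``. Below is a minimal sketch of how such loggers can be configured once the series is applied; the module names are taken from files touched in this series, while the level choices are illustrative assumptions rather than part of the patch:

.. code-block:: python

    import logging

    # Per-module loggers created via logging.getLogger(__name__) follow standard
    # Python logging semantics, so they can be configured like any library logger.
    logging.basicConfig(level=logging.INFO)

    # Tune verbosity for individual modules touched in this series, e.g. quieten
    # the finetuning callback while keeping DDP and profiler messages verbose.
    logging.getLogger("pytorch_lightning.callbacks.finetuning").setLevel(logging.WARNING)
    logging.getLogger("pytorch_lightning.plugins.training_type.ddp").setLevel(logging.INFO)
    logging.getLogger("pytorch_lightning.profiler.pytorch").setLevel(logging.DEBUG)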