From 7608f4071fb948a2154e8f7ef7127fabc8f88520 Mon Sep 17 00:00:00 2001
From: Anton Schwaighofer
Date: Thu, 9 Dec 2021 12:36:56 +0000
Subject: [PATCH 1/8] diagnostics

---
 InnerEye/ML/utils/checkpoint_handling.py | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/InnerEye/ML/utils/checkpoint_handling.py b/InnerEye/ML/utils/checkpoint_handling.py
index a445743fe..c004a0763 100644
--- a/InnerEye/ML/utils/checkpoint_handling.py
+++ b/InnerEye/ML/utils/checkpoint_handling.py
@@ -12,6 +12,7 @@
 import requests
 from azureml.core import Run, Workspace, Model
 
+from health_azure.utils import is_global_rank_zero
 from InnerEye.Azure.azure_config import AzureConfig
 from InnerEye.Common.fixed_paths import MODEL_INFERENCE_JSON_FILE_NAME
 
@@ -91,10 +92,11 @@ def get_recovery_or_checkpoint_path_train(self) -> Optional[Path]:
         the latest checkpoint will be present in this folder too.
         :return: Constructed checkpoint path to recover from.
         """
-        checkpoints = list(self.container.checkpoint_folder.rglob("*"))
-        logging.debug(f"Available checkpoints: {len(checkpoints)}")
-        for f in checkpoints:
-            logging.debug(f)
+        if is_global_rank_zero():
+            checkpoints = list(self.container.checkpoint_folder.rglob("*"))
+            logging.info(f"Available checkpoints: {len(checkpoints)}")
+            for f in checkpoints:
+                logging.info(f)
         recovery = find_recovery_checkpoint_and_epoch(self.container.checkpoint_folder)
         if recovery is not None:
             local_recovery_path, recovery_epoch = recovery

From 40ae4ad4e024142ee1f043ccca19c99fea73e1db Mon Sep 17 00:00:00 2001
From: Anton Schwaighofer
Date: Thu, 9 Dec 2021 12:38:27 +0000
Subject: [PATCH 2/8] code

---
 InnerEye/ML/common.py | 38 ++++++++++++++++++++++++++++++++++++++
 1 file changed, 38 insertions(+)

diff --git a/InnerEye/ML/common.py b/InnerEye/ML/common.py
index cafe8970b..030b93de0 100644
--- a/InnerEye/ML/common.py
+++ b/InnerEye/ML/common.py
@@ -11,6 +11,15 @@
 from typing import Any, Dict, List, Optional, Tuple
 
 import numpy as np
+import tempfile
+
+from azureml.core import Run
+
+from InnerEye.Azure.azure_util import RUN_CONTEXT
+from InnerEye.Common.fixed_paths import DEFAULT_AML_UPLOAD_DIR
+from InnerEye.ML.deep_learning_config import CHECKPOINT_FOLDER
+from health_azure import download_files_from_run_id, is_running_in_azure_ml
+from health_azure.utils import get_run_file_names
 
 DATASET_CSV_FILE_NAME = "dataset.csv"
 CHECKPOINT_SUFFIX = ".ckpt"
@@ -122,11 +131,40 @@ def find_recovery_checkpoint_and_epoch(path: Path) -> Optional[PathAndEpoch]:
     recovery checkpoint path and recovery epoch.
     """
     available_checkpoints = find_all_recovery_checkpoints(path)
+    if available_checkpoints is None and is_running_in_azure_ml():
+        logging.info("No recovery checkpoints available in the checkpoint folder. Trying to find checkpoints in "
+                     "AzureML from previous runs of this job.")
+        # Download checkpoints from AzureML, then try to find recovery checkpoints among those.
+        temp_folder = download_checkpoints_to_temp_folder(RUN_CONTEXT)
+        available_checkpoints = find_all_recovery_checkpoints(temp_folder)
     if available_checkpoints is not None:
         return extract_latest_checkpoint_and_epoch(available_checkpoints)
     return None
+
+
+def download_checkpoints_to_temp_folder(run: Run) -> Path:
+    """
+    Downloads all files with the outputs/checkpoints prefix of the given run to a temporary folder.
+    In distributed training, the download only happens once per node.
+
+    :return: The path to which the files were downloaded.
+    """
+    # Downloads should go to a temporary folder because downloading the files to the checkpoint folder might
+    # cause artifact conflicts later.
+    temp_folder = Path(tempfile.mkdtemp())
+    checkpoint_prefix = f"{DEFAULT_AML_UPLOAD_DIR}/{CHECKPOINT_FOLDER}/"
+    existing_checkpoints = get_run_file_names(run, prefix=checkpoint_prefix)
+    logging.info(f"Number of checkpoints available in AzureML: {len(existing_checkpoints)}")
+    if len(existing_checkpoints) > 0:
+        try:
+            download_files_from_run_id(run_id=run.id,
+                                       output_folder=temp_folder,
+                                       prefix=checkpoint_prefix)
+        except Exception as ex:
+            logging.warning(f"Unable to download checkpoints from AzureML. Error: {str(ex)}")
+    return temp_folder
+
+
 def create_best_checkpoint(path: Path) -> Path:
     """
     Creates the best checkpoint file. "Best" is at the moment defined as being the last checkpoint, but could be

From fe9ac222329893ffbd7d9939cebbf91c1b86f39b Mon Sep 17 00:00:00 2001
From: Anton Schwaighofer
Date: Thu, 9 Dec 2021 14:49:58 +0000
Subject: [PATCH 3/8] resolve circular imports

---
 InnerEye/ML/common.py | 136 +-----------
 InnerEye/ML/deep_learning_config.py | 27 ++-
 InnerEye/ML/lightning_container.py | 4 +-
 InnerEye/ML/model_training.py | 4 +-
 .../ML/normalize_and_visualize_dataset.py | 3 +-
 InnerEye/ML/run_ml.py | 19 +-
 InnerEye/ML/utils/checkpoint_handling.py | 197 +++++++++++++++++-
 InnerEye/ML/utils/run_recovery.py | 77 +------
 InnerEye/Scripts/move_model.py | 2 +-
 Tests/AfterTraining/test_after_training.py | 34 ++-
 Tests/ML/test_download_upload.py | 6 +-
 Tests/ML/test_lightning_containers.py | 4 +-
 Tests/ML/test_regression_tests.py | 2 +-
 Tests/ML/utils/test_checkpoint_handling.py | 5 +-
 Tests/ML/utils/test_model_util.py | 6 +-
 TestsOutsidePackage/test_register_model.py | 2 +-
 16 files changed, 272 insertions(+), 256 deletions(-)

diff --git a/InnerEye/ML/common.py b/InnerEye/ML/common.py
index 030b93de0..f12aaa5f9 100644
--- a/InnerEye/ML/common.py
+++ b/InnerEye/ML/common.py
@@ -2,24 +2,12 @@
 # Copyright (c) Microsoft Corporation. All rights reserved.
 # Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
 # ------------------------------------------------------------------------------------------
+from __future__ import annotations
+
 import abc
-import logging
-import re
 from datetime import datetime
 from enum import Enum, unique
-from pathlib import Path
-from typing import Any, Dict, List, Optional, Tuple
-
-import numpy as np
-import tempfile
-
-from azureml.core import Run
-
-from InnerEye.Azure.azure_util import RUN_CONTEXT
-from InnerEye.Common.fixed_paths import DEFAULT_AML_UPLOAD_DIR
-from InnerEye.ML.deep_learning_config import CHECKPOINT_FOLDER
-from health_azure import download_files_from_run_id, is_running_in_azure_ml
-from health_azure.utils import get_run_file_names
+from typing import Any, Dict, List
 
 DATASET_CSV_FILE_NAME = "dataset.csv"
 CHECKPOINT_SUFFIX = ".ckpt"
@@ -35,6 +23,13 @@
 LAST_CHECKPOINT_FILE_NAME = "last"
 LAST_CHECKPOINT_FILE_NAME_WITH_SUFFIX = LAST_CHECKPOINT_FILE_NAME + CHECKPOINT_SUFFIX
 
+FINAL_MODEL_FOLDER = "final_model"
+FINAL_ENSEMBLE_MODEL_FOLDER = "final_ensemble_model"
+CHECKPOINT_FOLDER = "checkpoints"
+VISUALIZATION_FOLDER = "visualizations"
+EXTRA_RUN_SUBFOLDER = "extra_run_id"
+ARGS_TXT = "args.txt"
+
 
 @unique
 class ModelExecutionMode(Enum):
@@ -73,117 +68,6 @@ def get_feature_length(self, column: str) -> int:
         raise NotImplementedError("get_feature_length must be implemented by sub classes")
 
 
-def get_recovery_checkpoint_path(path: Path) -> Path:
-    """
-    Returns the path to the last recovery checkpoint in the given folder or the provided filename. Raises a
-    FileNotFoundError if no
-    recovery checkpoint file is present.
-    :param path: Path to checkpoint folder
-    """
-    recovery_ckpt_and_epoch = find_recovery_checkpoint_and_epoch(path)
-    if recovery_ckpt_and_epoch is not None:
-        return recovery_ckpt_and_epoch[0]
-    files = list(path.glob("*"))
-    raise FileNotFoundError(f"No checkpoint files found in {path}. Existing files: {' '.join(p.name for p in files)}")
-
-
-def get_best_checkpoint_path(path: Path) -> Path:
-    """
-    Given a path and checkpoint, formats a path based on the checkpoint file name format.
-    :param path to checkpoint folder
-    """
-    return path / BEST_CHECKPOINT_FILE_NAME_WITH_SUFFIX
-
-
-def find_all_recovery_checkpoints(path: Path) -> Optional[List[Path]]:
-    """
-    Extracts all file starting with RECOVERY_CHECKPOINT_FILE_NAME in path
-    :param path:
-    :return:
-    """
-    all_recovery_files = [f for f in path.glob(RECOVERY_CHECKPOINT_FILE_NAME + "*")]
-    if len(all_recovery_files) == 0:
-        return None
-    return all_recovery_files
-
-
-PathAndEpoch = Tuple[Path, int]
-
-
-def extract_latest_checkpoint_and_epoch(available_files: List[Path]) -> PathAndEpoch:
-    """
-    Checkpoints are saved as recovery_epoch={epoch}.ckpt, find the latest ckpt and epoch number.
-    :param available_files: all available checkpoints
-    :return: path the checkpoint from latest epoch and epoch number
-    """
-    recovery_epochs = [int(re.findall(r"[\d]+", f.stem)[0]) for f in available_files]
-    idx_max_epoch = int(np.argmax(recovery_epochs))
-    return available_files[idx_max_epoch], recovery_epochs[idx_max_epoch]
-
-
-def find_recovery_checkpoint_and_epoch(path: Path) -> Optional[PathAndEpoch]:
-    """
-    Looks at all the recovery files, extracts the epoch number for all of them and returns the most recent (latest
-    epoch)
-    checkpoint path along with the corresponding epoch number. If no recovery checkpoint are found, return None.
-    :param path: The folder to start searching in.
-    :return: None if there is no file matching the search pattern, or a Tuple with Path object and integer pointing to
-    recovery checkpoint path and recovery epoch.
-    """
-    available_checkpoints = find_all_recovery_checkpoints(path)
-    if available_checkpoints is None and is_running_in_azure_ml():
-        logging.info("No recovery checkpoints available in the checkpoint folder. Trying to find checkpoints in "
-                     "AzureML from previous runs of this job.")
-        # Download checkpoints from AzureML, then try to find recovery checkpoints among those.
-        temp_folder = download_checkpoints_to_temp_folder(RUN_CONTEXT)
-        available_checkpoints = find_all_recovery_checkpoints(temp_folder)
-    if available_checkpoints is not None:
-        return extract_latest_checkpoint_and_epoch(available_checkpoints)
-    return None
-
-
-def download_checkpoints_to_temp_folder(run: Run) -> Path:
-    """
-    Downloads all files with the outputs/checkpoints prefix of the given run to a temporary folder.
-    In distributed training, the download only happens once per node.
-
-    :return: The path to which the files were downloaded.
-    """
-    # Downloads should go to a temporary folder because downloading the files to the checkpoint folder might
-    # cause artifact conflicts later.
-    temp_folder = Path(tempfile.mkdtemp())
-    checkpoint_prefix = f"{DEFAULT_AML_UPLOAD_DIR}/{CHECKPOINT_FOLDER}/"
-    existing_checkpoints = get_run_file_names(run, prefix=checkpoint_prefix)
-    logging.info(f"Number of checkpoints available in AzureML: {len(existing_checkpoints)}")
-    if len(existing_checkpoints) > 0:
-        try:
-            download_files_from_run_id(run_id=run.id,
-                                       output_folder=temp_folder,
-                                       prefix=checkpoint_prefix)
-        except Exception as ex:
-            logging.warning(f"Unable to download checkpoints from AzureML. Error: {str(ex)}")
-    return temp_folder
-
-
-def create_best_checkpoint(path: Path) -> Path:
-    """
-    Creates the best checkpoint file. "Best" is at the moment defined as being the last checkpoint, but could be
-    based on some defined policy.
-    The best checkpoint will be renamed to `best_checkpoint.ckpt`.
-    :param path: The folder that contains all checkpoint files.
-    """
-    logging.debug(f"Files in checkpoint folder: {' '.join(p.name for p in path.glob('*'))}")
-    last_ckpt = path / LAST_CHECKPOINT_FILE_NAME_WITH_SUFFIX
-    all_files = f"Existing files: {' '.join(p.name for p in path.glob('*'))}"
-    if not last_ckpt.is_file():
-        raise FileNotFoundError(f"Checkpoint file {LAST_CHECKPOINT_FILE_NAME_WITH_SUFFIX} not found. {all_files}")
-    logging.info(f"Using {LAST_CHECKPOINT_FILE_NAME_WITH_SUFFIX} as the best checkpoint: Renaming to "
-                 f"{BEST_CHECKPOINT_FILE_NAME_WITH_SUFFIX}")
-    best = path / BEST_CHECKPOINT_FILE_NAME_WITH_SUFFIX
-    last_ckpt.rename(best)
-    return best
-
-
 def create_unique_timestamp_id() -> str:
     """
     Creates a unique string using the current time in UTC, up to seconds precision, with characters that
diff --git a/InnerEye/ML/deep_learning_config.py b/InnerEye/ML/deep_learning_config.py
index ea4fc71bf..f8cf1e070 100644
--- a/InnerEye/ML/deep_learning_config.py
+++ b/InnerEye/ML/deep_learning_config.py
@@ -19,21 +19,9 @@
 from InnerEye.Common.fixed_paths import DEFAULT_AML_UPLOAD_DIR, DEFAULT_LOGS_DIR_NAME
 from InnerEye.Common.generic_parsing import GenericConfig
 from InnerEye.Common.type_annotations import PathOrString, T, TupleFloat2
-from InnerEye.ML.common import DATASET_CSV_FILE_NAME, ModelExecutionMode, create_unique_timestamp_id, \
-    get_best_checkpoint_path, get_recovery_checkpoint_path
-
-# A folder inside of the outputs folder that will contain all information for running the model in inference mode
-
-FINAL_MODEL_FOLDER = "final_model"
-FINAL_ENSEMBLE_MODEL_FOLDER = "final_ensemble_model"
-
-# The checkpoints must be stored inside of the final model folder, if we want to avoid copying
-# them before registration.
-CHECKPOINT_FOLDER = "checkpoints"
-VISUALIZATION_FOLDER = "visualizations"
-EXTRA_RUN_SUBFOLDER = "extra_run_id"
-
-ARGS_TXT = "args.txt"
+from InnerEye.ML.common import BEST_CHECKPOINT_FILE_NAME_WITH_SUFFIX, CHECKPOINT_FOLDER, DATASET_CSV_FILE_NAME, \
+    ModelExecutionMode, VISUALIZATION_FOLDER, \
+    create_unique_timestamp_id
 
 
 @unique
@@ -487,6 +475,7 @@ def get_path_to_checkpoint(self) -> Path:
         """
         Returns the full path to a recovery checkpoint.
         """
+        from InnerEye.ML.utils.checkpoint_handling import get_recovery_checkpoint_path
         return get_recovery_checkpoint_path(self.checkpoint_folder)
 
     def get_path_to_best_checkpoint(self) -> Path:
@@ -864,3 +853,11 @@ def load_checkpoint(path_to_checkpoint: Path, use_gpu: bool = True) -> Dict[str,
     map_location = None if use_gpu else 'cpu'
     checkpoint = torch.load(str(path_to_checkpoint), map_location=map_location)
     return checkpoint
+
+
+def get_best_checkpoint_path(path: Path) -> Path:
+    """
+    Given a path and checkpoint, formats a path based on the checkpoint file name format.
+    :param path to checkpoint folder
+    """
+    return path / BEST_CHECKPOINT_FILE_NAME_WITH_SUFFIX
diff --git a/InnerEye/ML/lightning_container.py b/InnerEye/ML/lightning_container.py
index 4e0955148..c42074c88 100644
--- a/InnerEye/ML/lightning_container.py
+++ b/InnerEye/ML/lightning_container.py
@@ -23,7 +23,6 @@
     WorkflowParams
 from InnerEye.ML.utils import model_util
 from InnerEye.ML.utils.lr_scheduler import SchedulerWithWarmUp
-from InnerEye.ML.utils.run_recovery import RunRecovery
 
 
 class InnerEyeInference(abc.ABC):
@@ -151,7 +150,8 @@ def __init__(self, **kwargs: Any) -> None:
         super().__init__(**kwargs)
         self._model: Optional[LightningModule] = None
         self._model_name = type(self).__name__
-        self.pretraining_run_checkpoints: Optional[RunRecovery] = None
+        # This should be typed RunRecovery, but causes circular imports
+        self.pretraining_run_checkpoints: Optional[Any] = None
         self.num_nodes = 1
 
     def validate(self) -> None:
diff --git a/InnerEye/ML/model_training.py b/InnerEye/ML/model_training.py
index eb5cea2b0..ebb849762 100644
--- a/InnerEye/ML/model_training.py
+++ b/InnerEye/ML/model_training.py
@@ -19,8 +19,8 @@
 from InnerEye.Azure.azure_util import RUN_CONTEXT, is_offline_run_context
 from InnerEye.Common.common_util import SUBJECT_METRICS_FILE_NAME, change_working_directory
 from InnerEye.Common.resource_monitor import ResourceMonitor
-from InnerEye.ML.common import ModelExecutionMode, RECOVERY_CHECKPOINT_FILE_NAME, create_best_checkpoint
-from InnerEye.ML.deep_learning_config import ARGS_TXT, VISUALIZATION_FOLDER
+from InnerEye.ML.common import ARGS_TXT, ModelExecutionMode, RECOVERY_CHECKPOINT_FILE_NAME, VISUALIZATION_FOLDER
+from InnerEye.ML.utils.checkpoint_handling import create_best_checkpoint
 from InnerEye.ML.lightning_base import InnerEyeContainer, InnerEyeLightning
 from InnerEye.ML.lightning_container import LightningContainer
 from InnerEye.ML.lightning_loggers import StoringLogger
diff --git a/InnerEye/ML/normalize_and_visualize_dataset.py b/InnerEye/ML/normalize_and_visualize_dataset.py
index 9ab0d6f6a..820decf58 100644
--- a/InnerEye/ML/normalize_and_visualize_dataset.py
+++ b/InnerEye/ML/normalize_and_visualize_dataset.py
@@ -15,10 +15,9 @@
 from InnerEye.Common.common_util import logging_to_stdout
 from InnerEye.Common.generic_parsing import GenericConfig
 from InnerEye.ML import plotting
-from InnerEye.ML.common import DATASET_CSV_FILE_NAME
+from InnerEye.ML.common import ARGS_TXT, DATASET_CSV_FILE_NAME
 from InnerEye.ML.config import SegmentationModelBase
 from InnerEye.ML.dataset.full_image_dataset import load_dataset_sources
-from InnerEye.ML.deep_learning_config import ARGS_TXT
 from InnerEye.ML.photometric_normalization import PhotometricNormalization
 from InnerEye.ML.utils.config_loader import ModelConfigLoader
 from InnerEye.ML.utils.io_util import load_images_from_dataset_source
diff --git a/InnerEye/ML/run_ml.py b/InnerEye/ML/run_ml.py
index a18712831..489c8bb46 100644
--- a/InnerEye/ML/run_ml.py
+++ b/InnerEye/ML/run_ml.py
@@ -36,10 +36,12 @@
 from InnerEye.Common.fixed_paths import INNEREYE_PACKAGE_NAME, PYTHON_ENVIRONMENT_NAME
 from InnerEye.Common.type_annotations import PathOrString
 from InnerEye.ML.baselines_util import compare_folders_and_run_outputs
-from InnerEye.ML.common import ModelExecutionMode
+from InnerEye.ML.common import CHECKPOINT_FOLDER, EXTRA_RUN_SUBFOLDER, FINAL_ENSEMBLE_MODEL_FOLDER, \
+    FINAL_MODEL_FOLDER, \
+    ModelExecutionMode
 from InnerEye.ML.config import SegmentationModelBase
-from InnerEye.ML.deep_learning_config import CHECKPOINT_FOLDER, DeepLearningConfig, EXTRA_RUN_SUBFOLDER, \
-    FINAL_ENSEMBLE_MODEL_FOLDER, FINAL_MODEL_FOLDER, ModelCategory, MultiprocessingStartMethod, load_checkpoint
+from InnerEye.ML.deep_learning_config import DeepLearningConfig, ModelCategory, MultiprocessingStartMethod, \
+    load_checkpoint
 from InnerEye.ML.lightning_base import InnerEyeContainer
 from InnerEye.ML.lightning_container import InnerEyeInference, LightningContainer
 from InnerEye.ML.lightning_loggers import StoringLogger
@@ -53,8 +55,7 @@
     get_ipynb_report_name, reports_folder
 from InnerEye.ML.scalar_config import ScalarModelBase
 from InnerEye.ML.sequence_config import SequenceModelBase
-from InnerEye.ML.utils.checkpoint_handling import CheckpointHandler
-from InnerEye.ML.utils.run_recovery import RunRecovery
+from InnerEye.ML.utils.checkpoint_handling import CheckpointHandler, download_all_checkpoints_from_run
 from InnerEye.ML.visualizers import activation_maps
 from InnerEye.ML.visualizers.plot_cross_validation import \
     get_config_and_results_for_offline_runs, plot_cross_validation_from_files
@@ -183,10 +184,10 @@ def setup(self, azure_run_info: Optional[AzureRunInfo] = None) -> None:
         if self.container.pretraining_run_recovery_id is not None:
             run_to_recover = self.azure_config.fetch_run(self.container.pretraining_run_recovery_id.strip())
             only_return_path = not is_global_rank_zero()
-            run_recovery_object = RunRecovery.download_all_checkpoints_from_run(self.container,
-                                                                                run_to_recover,
-                                                                                EXTRA_RUN_SUBFOLDER,
-                                                                                only_return_path=only_return_path)
+            run_recovery_object = download_all_checkpoints_from_run(self.container,
+                                                                    run_to_recover,
+                                                                    EXTRA_RUN_SUBFOLDER,
+                                                                    only_return_path=only_return_path)
             self.container.pretraining_run_checkpoints = run_recovery_object
 
         # A lot of the code for the built-in InnerEye models expects the output paths directly in the config files.
diff --git a/InnerEye/ML/utils/checkpoint_handling.py b/InnerEye/ML/utils/checkpoint_handling.py
index c004a0763..be039129e 100644
--- a/InnerEye/ML/utils/checkpoint_handling.py
+++ b/InnerEye/ML/utils/checkpoint_handling.py
@@ -4,24 +4,32 @@
 # ------------------------------------------------------------------------------------------
 import logging
 import os
+import re
+import tempfile
+import time
 import uuid
 from builtins import property
 from pathlib import Path
-from typing import List, Optional
+from typing import List, Optional, Tuple
 from urllib.parse import urlparse
 
+import numpy as np
 import requests
-from azureml.core import Run, Workspace, Model
-from health_azure.utils import is_global_rank_zero
+from azureml.core import Model, Run, Workspace
 
 from InnerEye.Azure.azure_config import AzureConfig
-from InnerEye.Common.fixed_paths import MODEL_INFERENCE_JSON_FILE_NAME
-from InnerEye.ML.common import find_recovery_checkpoint_and_epoch
+from InnerEye.Azure.azure_util import RUN_CONTEXT, download_run_output_file, download_run_outputs_by_prefix, \
+    fetch_child_runs, tag_values_all_distinct
+from InnerEye.Common.common_util import OTHER_RUNS_SUBDIR_NAME
+from InnerEye.Common.fixed_paths import DEFAULT_AML_UPLOAD_DIR, MODEL_INFERENCE_JSON_FILE_NAME
+from InnerEye.ML.common import BEST_CHECKPOINT_FILE_NAME_WITH_SUFFIX, CHECKPOINT_FOLDER, \
+    LAST_CHECKPOINT_FILE_NAME_WITH_SUFFIX, RECOVERY_CHECKPOINT_FILE_NAME
 from InnerEye.ML.deep_learning_config import OutputParams
 from InnerEye.ML.lightning_container import LightningContainer
-from InnerEye.ML.utils.run_recovery import RunRecovery
 from InnerEye.ML.model_inference_config import read_model_inference_config
-
+from InnerEye.ML.utils.run_recovery import RunRecovery
+from health_azure import download_files_from_run_id, is_running_in_azure_ml
+from health_azure.utils import get_run_file_names, is_global_rank_zero
 
 MODEL_WEIGHTS_DIR_NAME = "trained_models"
 
@@ -54,8 +62,8 @@ def download_checkpoints_from_hyperdrive_child_runs(self, hyperdrive_parent_run:
         Downloads the best checkpoints from all child runs of a Hyperdrive parent run. This is used to gather results
         for ensemble creation.
""" - self.run_recovery = RunRecovery.download_best_checkpoints_from_child_runs(self.output_params, - hyperdrive_parent_run) + self.run_recovery = download_best_checkpoints_from_child_runs(self.output_params, + hyperdrive_parent_run) # Check paths are good, just in case for path in self.run_recovery.checkpoints_roots: if not path.is_dir(): @@ -72,8 +80,8 @@ def download_recovery_checkpoints_or_weights(self, only_return_path: bool = Fals """ if self.azure_config.run_recovery_id: run_to_recover = self.azure_config.fetch_run(self.azure_config.run_recovery_id.strip()) - self.run_recovery = RunRecovery.download_all_checkpoints_from_run(self.output_params, run_to_recover, - only_return_path=only_return_path) + self.run_recovery = download_all_checkpoints_from_run(self.output_params, run_to_recover, + only_return_path=only_return_path) if self.container.weights_url or self.container.local_weights_path or self.container.model_id: self.trained_weights_paths = self.get_local_checkpoints_path_or_download() @@ -230,3 +238,170 @@ def get_local_checkpoints_path_or_download(self) -> List[Path]: if not checkpoint_path or not checkpoint_path.is_file(): raise FileNotFoundError(f"Could not find the weights file at {checkpoint_path}") return checkpoint_paths + + +def download_checkpoints_to_temp_folder(run: Optional[Run] = None) -> Path: + """ + Downloads all files with the outputs/checkpoints prefix of the given run to a temporary folder. + In distributed training, the download only happens once per node. + + :return: The path to which the files were downloaded. + """ + run = run or RUN_CONTEXT + # Downloads should go to a temporary folder because downloading the files to the checkpoint folder might + # cause artifact conflicts later. + temp_folder = Path(tempfile.mkdtemp()) + checkpoint_prefix = f"{DEFAULT_AML_UPLOAD_DIR}/{CHECKPOINT_FOLDER}/" + existing_checkpoints = get_run_file_names(run, prefix=checkpoint_prefix) + logging.info(f"Number of checkpoints available in AzureML: {len(existing_checkpoints)}") + if len(existing_checkpoints) > 0: + try: + download_files_from_run_id(run_id=run.id, + output_folder=temp_folder, + prefix=checkpoint_prefix) + except Exception as ex: + logging.warning(f"Unable to download checkpoints from AzureML. Error: {str(ex)}") + return temp_folder + + +PathAndEpoch = Tuple[Path, int] + + +def find_recovery_checkpoint_and_epoch(path: Path) -> Optional[PathAndEpoch]: + """ + Looks at all the recovery files, extracts the epoch number for all of them and returns the most recent (latest + epoch) + checkpoint path along with the corresponding epoch number. If no recovery checkpoint are found, return None. + :param path: The folder to start searching in. + :return: None if there is no file matching the search pattern, or a Tuple with Path object and integer pointing to + recovery checkpoint path and recovery epoch. + """ + available_checkpoints = find_all_recovery_checkpoints(path) + if available_checkpoints is None and is_running_in_azure_ml(): + logging.info("No recovery checkpoints available in the checkpoint folder. Trying to find checkpoints in " + "AzureML from previous runs of this job.") + # Download checkpoints from AzureML, then try to find recovery checkpoints among those. 
+        temp_folder = download_checkpoints_to_temp_folder()
+        available_checkpoints = find_all_recovery_checkpoints(temp_folder)
+    if available_checkpoints is not None:
+        return extract_latest_checkpoint_and_epoch(available_checkpoints)
+    return None
+
+
+def get_recovery_checkpoint_path(path: Path) -> Path:
+    """
+    Returns the path to the last recovery checkpoint in the given folder or the provided filename. Raises a
+    FileNotFoundError if no
+    recovery checkpoint file is present.
+    :param path: Path to checkpoint folder
+    """
+    recovery_ckpt_and_epoch = find_recovery_checkpoint_and_epoch(path)
+    if recovery_ckpt_and_epoch is not None:
+        return recovery_ckpt_and_epoch[0]
+    files = list(path.glob("*"))
+    raise FileNotFoundError(f"No checkpoint files found in {path}. Existing files: {' '.join(p.name for p in files)}")
+
+
+def find_all_recovery_checkpoints(path: Path) -> Optional[List[Path]]:
+    """
+    Extracts all file starting with RECOVERY_CHECKPOINT_FILE_NAME in path
+    :param path:
+    :return:
+    """
+    all_recovery_files = [f for f in path.glob(RECOVERY_CHECKPOINT_FILE_NAME + "*")]
+    if len(all_recovery_files) == 0:
+        return None
+    return all_recovery_files
+
+
+def extract_latest_checkpoint_and_epoch(available_files: List[Path]) -> PathAndEpoch:
+    """
+    Checkpoints are saved as recovery_epoch={epoch}.ckpt, find the latest ckpt and epoch number.
+    :param available_files: all available checkpoints
+    :return: path the checkpoint from latest epoch and epoch number
+    """
+    recovery_epochs = [int(re.findall(r"[\d]+", f.stem)[0]) for f in available_files]
+    idx_max_epoch = int(np.argmax(recovery_epochs))
+    return available_files[idx_max_epoch], recovery_epochs[idx_max_epoch]
+
+
+def create_best_checkpoint(path: Path) -> Path:
+    """
+    Creates the best checkpoint file. "Best" is at the moment defined as being the last checkpoint, but could be
+    based on some defined policy.
+    The best checkpoint will be renamed to `best_checkpoint.ckpt`.
+    :param path: The folder that contains all checkpoint files.
+    """
+    logging.debug(f"Files in checkpoint folder: {' '.join(p.name for p in path.glob('*'))}")
+    last_ckpt = path / LAST_CHECKPOINT_FILE_NAME_WITH_SUFFIX
+    all_files = f"Existing files: {' '.join(p.name for p in path.glob('*'))}"
+    if not last_ckpt.is_file():
+        raise FileNotFoundError(f"Checkpoint file {LAST_CHECKPOINT_FILE_NAME_WITH_SUFFIX} not found. {all_files}")
+    logging.info(f"Using {LAST_CHECKPOINT_FILE_NAME_WITH_SUFFIX} as the best checkpoint: Renaming to "
+                 f"{BEST_CHECKPOINT_FILE_NAME_WITH_SUFFIX}")
+    best = path / BEST_CHECKPOINT_FILE_NAME_WITH_SUFFIX
+    last_ckpt.rename(best)
+    return best
+
+
+def download_best_checkpoints_from_child_runs(config: OutputParams, run: Run) -> RunRecovery:
+    """
+    Downloads the best checkpoints from all child runs of the provided Hyperdrive parent run.
+    The checkpoints for the sibling runs will go into folder 'OTHER_RUNS/'
+    in the checkpoint folder. There is special treatment for the child run that is equal to the present AzureML
+    run, its checkpoints will be read off the checkpoint folder as-is.
+    :param config: Model related configs.
+    :param run: The Hyperdrive parent run to download from.
+    :return: run recovery information
+    """
+    child_runs: List[Run] = fetch_child_runs(run)
+    if not child_runs:
+        raise ValueError(f"AzureML run {run.id} does not have any child runs.")
+    logging.info(f"Run {run.id} has {len(child_runs)} child runs: {', '.join(c.id for c in child_runs)}")
+    tag_to_use = 'cross_validation_split_index'
+    can_use_split_indices = tag_values_all_distinct(child_runs, tag_to_use)
+    # download checkpoints for the child runs in the root of the parent
+    child_runs_checkpoints_roots: List[Path] = []
+    for child in child_runs:
+        if child.id == RUN_CONTEXT.id:
+            # We expect to find the file(s) we need in config.checkpoint_folder
+            child_dst = config.checkpoint_folder
+        else:
+            subdir = str(child.tags[tag_to_use] if can_use_split_indices else child.number)
+            child_dst = config.checkpoint_folder / OTHER_RUNS_SUBDIR_NAME / subdir
+            download_run_output_file(
+                blob_path=Path(CHECKPOINT_FOLDER) / BEST_CHECKPOINT_FILE_NAME_WITH_SUFFIX,
+                destination=child_dst,
+                run=child
+            )
+        child_runs_checkpoints_roots.append(child_dst)
+    return RunRecovery(checkpoints_roots=child_runs_checkpoints_roots)
+
+
+def download_all_checkpoints_from_run(config: OutputParams, run: Run,
+                                      subfolder: Optional[str] = None,
+                                      only_return_path: bool = False) -> RunRecovery:
+    """
+    Downloads all checkpoints of the provided run inside the checkpoints folder.
+    :param config: Model related configs.
+    :param run: Run whose checkpoints should be recovered
+    :param subfolder: optional subfolder name, if provided the checkpoints will be downloaded to
+    CHECKPOINT_FOLDER / subfolder. If None, the checkpoint are downloaded to CHECKPOINT_FOLDER of the current run.
+    :param: only_return_path: if True, return a RunRecovery object with the path to the checkpoint without actually
+    downloading the checkpoints. This is useful to avoid duplicating checkpoint download when running on multiple
+    nodes. If False, return the RunRecovery object and download the checkpoint to disk.
+    :return: run recovery information
+    """
+    if fetch_child_runs(run):
+        raise ValueError(f"AzureML run {run.id} has child runs, this method does not support those.")
+
+    destination_folder = config.checkpoint_folder / subfolder if subfolder else config.checkpoint_folder
+
+    if not only_return_path:
+        download_run_outputs_by_prefix(
+            blobs_prefix=Path(CHECKPOINT_FOLDER),
+            destination=destination_folder,
+            run=run
+        )
+        time.sleep(60)  # Needed because AML is not fast enough to download
+    return RunRecovery(checkpoints_roots=[destination_folder])
diff --git a/InnerEye/ML/utils/run_recovery.py b/InnerEye/ML/utils/run_recovery.py
index 620f05485..93e159371 100644
--- a/InnerEye/ML/utils/run_recovery.py
+++ b/InnerEye/ML/utils/run_recovery.py
@@ -5,19 +5,12 @@
 from __future__ import annotations
 
 import logging
-import time
 from dataclasses import dataclass
 from pathlib import Path
-from typing import List, Optional
+from typing import List
 
-from azureml.core import Run
-
-from InnerEye.Azure.azure_util import RUN_CONTEXT, download_run_output_file, download_run_outputs_by_prefix, \
-    fetch_child_runs, tag_values_all_distinct
-from InnerEye.Common.common_util import OTHER_RUNS_SUBDIR_NAME, check_properties_are_not_none
-from InnerEye.ML.common import BEST_CHECKPOINT_FILE_NAME_WITH_SUFFIX, get_best_checkpoint_path, \
-    get_recovery_checkpoint_path
-from InnerEye.ML.deep_learning_config import CHECKPOINT_FOLDER, OutputParams
+from InnerEye.Common.common_util import check_properties_are_not_none
+from InnerEye.ML.deep_learning_config import get_best_checkpoint_path
 
 
 @dataclass(frozen=True)
@@ -27,70 +20,8 @@ class RunRecovery:
     """
     checkpoints_roots: List[Path]
 
-    @staticmethod
-    def download_best_checkpoints_from_child_runs(config: OutputParams, run: Run) -> RunRecovery:
-        """
-        Downloads the best checkpoints from all child runs of the provided Hyperdrive parent run.
-        The checkpoints for the sibling runs will go into folder 'OTHER_RUNS/'
-        in the checkpoint folder. There is special treatment for the child run that is equal to the present AzureML
-        run, its checkpoints will be read off the checkpoint folder as-is.
-        :param config: Model related configs.
-        :param run: The Hyperdrive parent run to download from.
-        :return: run recovery information
-        """
-        child_runs: List[Run] = fetch_child_runs(run)
-        if not child_runs:
-            raise ValueError(f"AzureML run {run.id} does not have any child runs.")
-        logging.info(f"Run {run.id} has {len(child_runs)} child runs: {', '.join(c.id for c in child_runs)}")
-        tag_to_use = 'cross_validation_split_index'
-        can_use_split_indices = tag_values_all_distinct(child_runs, tag_to_use)
-        # download checkpoints for the child runs in the root of the parent
-        child_runs_checkpoints_roots: List[Path] = []
-        for child in child_runs:
-            if child.id == RUN_CONTEXT.id:
-                # We expect to find the file(s) we need in config.checkpoint_folder
-                child_dst = config.checkpoint_folder
-            else:
-                subdir = str(child.tags[tag_to_use] if can_use_split_indices else child.number)
-                child_dst = config.checkpoint_folder / OTHER_RUNS_SUBDIR_NAME / subdir
-                download_run_output_file(
-                    blob_path=Path(CHECKPOINT_FOLDER) / BEST_CHECKPOINT_FILE_NAME_WITH_SUFFIX,
-                    destination=child_dst,
-                    run=child
-                )
-            child_runs_checkpoints_roots.append(child_dst)
-        return RunRecovery(checkpoints_roots=child_runs_checkpoints_roots)
-
-    @staticmethod
-    def download_all_checkpoints_from_run(config: OutputParams, run: Run,
-                                          subfolder: Optional[str] = None,
-                                          only_return_path: bool = False) -> RunRecovery:
-        """
-        Downloads all checkpoints of the provided run inside the checkpoints folder.
-        :param config: Model related configs.
-        :param run: Run whose checkpoints should be recovered
-        :param subfolder: optional subfolder name, if provided the checkpoints will be downloaded to
-        CHECKPOINT_FOLDER / subfolder. If None, the checkpoint are downloaded to CHECKPOINT_FOLDER of the current run.
-        :param: only_return_path: if True, return a RunRecovery object with the path to the checkpoint without actually
-        downloading the checkpoints. This is useful to avoid duplicating checkpoint download when running on multiple
-        nodes. If False, return the RunRecovery object and download the checkpoint to disk.
-        :return: run recovery information
-        """
-        if fetch_child_runs(run):
-            raise ValueError(f"AzureML run {run.id} has child runs, this method does not support those.")
-
-        destination_folder = config.checkpoint_folder / subfolder if subfolder else config.checkpoint_folder
-
-        if not only_return_path:
-            download_run_outputs_by_prefix(
-                blobs_prefix=Path(CHECKPOINT_FOLDER),
-                destination=destination_folder,
-                run=run
-            )
-            time.sleep(60)  # Needed because AML is not fast enough to download
-        return RunRecovery(checkpoints_roots=[destination_folder])
-
     def get_recovery_checkpoint_paths(self) -> List[Path]:
+        from InnerEye.ML.utils.checkpoint_handling import get_recovery_checkpoint_path
         return [get_recovery_checkpoint_path(x) for x in self.checkpoints_roots]
 
     def get_best_checkpoint_paths(self) -> List[Path]:
diff --git a/InnerEye/Scripts/move_model.py b/InnerEye/Scripts/move_model.py
index 7268f859b..24b69832e 100644
--- a/InnerEye/Scripts/move_model.py
+++ b/InnerEye/Scripts/move_model.py
@@ -10,7 +10,7 @@
 from attr import dataclass
 from azureml.core import Environment, Model, Workspace
 
-from InnerEye.ML.deep_learning_config import FINAL_MODEL_FOLDER, FINAL_ENSEMBLE_MODEL_FOLDER
+from InnerEye.ML.common import FINAL_ENSEMBLE_MODEL_FOLDER, FINAL_MODEL_FOLDER
 
 PYTHON_ENVIRONMENT_NAME = "python_environment_name"
 MODEL_PATH = "MODEL"
diff --git a/Tests/AfterTraining/test_after_training.py b/Tests/AfterTraining/test_after_training.py
index ee914b381..06bcbaede 100644
--- a/Tests/AfterTraining/test_after_training.py
+++ b/Tests/AfterTraining/test_after_training.py
@@ -20,12 +20,14 @@
 import pytest
 from azureml._restclient.constants import RunStatus
 from azureml.core import Model, Run
-from health_azure.himl import RUN_RECOVERY_FILE
 
 from InnerEye.Azure.azure_config import AzureConfig
-from InnerEye.Azure.azure_util import MODEL_ID_KEY_NAME, download_run_output_file, download_run_outputs_by_prefix, \
+from InnerEye.Azure.azure_util import MODEL_ID_KEY_NAME, RUN_CONTEXT, download_run_output_file, \
+    download_run_outputs_by_prefix, \
     get_comparison_baseline_paths, \
     is_running_on_azure_agent, to_azure_friendly_string
+from InnerEye.ML.utils.checkpoint_handling import download_checkpoints_to_temp_folder, \
+    find_recovery_checkpoint_and_epoch
 from InnerEye.Common import common_util, fixed_paths, fixed_paths_for_tests
 from InnerEye.Common.common_util import BEST_EPOCH_FOLDER_NAME, CROSSVAL_RESULTS_FOLDER, ENSEMBLE_SPLIT_NAME, \
     get_best_epoch_results_path
@@ -34,10 +36,11 @@
 from InnerEye.Common.fixed_paths_for_tests import full_ml_test_data_path
 from InnerEye.Common.output_directories import OutputFolderForTests
 from InnerEye.Common.spawn_subprocess import spawn_and_monitor_subprocess
-from InnerEye.ML.common import DATASET_CSV_FILE_NAME, ModelExecutionMode
+from InnerEye.ML.common import (CHECKPOINT_FOLDER, DATASET_CSV_FILE_NAME, ModelExecutionMode,
+                                RECOVERY_CHECKPOINT_FILE_NAME)
 from InnerEye.ML.configs.other.HelloContainer import HelloContainer
 from InnerEye.ML.configs.segmentation.BasicModel2Epochs import BasicModel2Epochs
-from InnerEye.ML.deep_learning_config import CHECKPOINT_FOLDER, ModelCategory
+from InnerEye.ML.deep_learning_config import ModelCategory
 from InnerEye.ML.model_inference_config import read_model_inference_config
 from InnerEye.ML.model_testing import THUMBNAILS_FOLDER
 from InnerEye.ML.reports.notebook_report import get_html_report_name
@@ -49,6 +52,7 @@
 from InnerEye.ML.visualizers.plot_cross_validation import PlotCrossValidationConfig
 from InnerEye.Scripts import submit_for_inference
 from Tests.ML.util import assert_nifti_content, get_default_azure_config, get_default_workspace, get_nifti_shape
+from health_azure.himl import RUN_RECOVERY_FILE
 
 FALLBACK_SINGLE_RUN = "refs_pull_606_merge:refs_pull_606_merge_1638867172_17ba8dc5"
 FALLBACK_ENSEMBLE_RUN = "refs_pull_606_merge:HD_b8a6ad93-8c19-45de-8ea1-f87fce92c3bd"
@@ -215,6 +219,28 @@ def test_check_dataset_mountpoint(test_output_dirs: OutputFolderForTests) -> Non
     assert f"local_dataset : {expected_mountpoint}" in logs
 
 
+@pytest.mark.after_training_single_run
+def test_download_checkpoints_from_aml(test_output_dirs: OutputFolderForTests) -> None:
+    """
+    Check that we can download checkpoint files from an AzureML run, if they are not available on disk.
+    """
+    run = get_most_recent_run(fallback_run_id_for_local_execution=FALLBACK_SINGLE_RUN)
+    temp_folder = download_checkpoints_to_temp_folder(run)
+    files = list(temp_folder.glob("*"))
+    assert len(files) == 2
+    # Now test if that is correctly integrated into the checkpoint finder. To avoid downloading a second time,
+    # now mock the call to the actual downloader.
+    with mock.patch("InnerEye.ML.common.download_checkpoints_to_temp_folder", return_value=temp_folder) as download:
+        download.assert_called_once_with()
+        # Call the checkpoint finder with a temp folder that does not contain any files, so it should try to download
+        result = find_recovery_checkpoint_and_epoch(test_output_dirs.root_dir)
+        assert result is not None
+        p, epoch = result
+        # The basic model only writes one checkpoint at epoch 1
+        assert epoch == 1
+        assert RECOVERY_CHECKPOINT_FILE_NAME in p.stem
+
+
 @pytest.mark.inference
 def test_submit_for_inference(test_output_dirs: OutputFolderForTests) -> None:
     """
diff --git a/Tests/ML/test_download_upload.py b/Tests/ML/test_download_upload.py
index 4f7f67b42..f1fc5dc01 100644
--- a/Tests/ML/test_download_upload.py
+++ b/Tests/ML/test_download_upload.py
@@ -12,6 +12,8 @@
 from InnerEye.Common.common_util import OTHER_RUNS_SUBDIR_NAME, logging_to_stdout
 from InnerEye.Common.output_directories import OutputFolderForTests
 from InnerEye.ML.model_config_base import ModelConfigBase
+from InnerEye.ML.utils.checkpoint_handling import download_all_checkpoints_from_run, \
+    download_best_checkpoints_from_child_runs
 from InnerEye.ML.utils.run_recovery import RunRecovery
 from Tests.AfterTraining.test_after_training import FALLBACK_ENSEMBLE_RUN, FALLBACK_SINGLE_RUN, get_most_recent_run
 from Tests.ML.util import get_default_azure_config
@@ -43,7 +45,7 @@ def test_download_recovery_single_run(test_output_dirs: OutputFolderForTests,
     config = ModelConfigBase(should_validate=False)
     config.set_output_to(output_dir)
     run = get_most_recent_run(fallback_run_id_for_local_execution=FALLBACK_SINGLE_RUN)
-    run_recovery = RunRecovery.download_all_checkpoints_from_run(config, run)
+    run_recovery = download_all_checkpoints_from_run(config, run)
     # This fails if there is no recovery checkpoint
     check_single_checkpoint(run_recovery.get_recovery_checkpoint_paths())
 
@@ -58,7 +60,7 @@ def test_download_best_checkpoints_ensemble_run(test_output_dirs: OutputFolderFo
     config.set_output_to(output_dir)
     run = get_most_recent_run(fallback_run_id_for_local_execution=FALLBACK_ENSEMBLE_RUN)
-    run_recovery = RunRecovery.download_best_checkpoints_from_child_runs(config, run)
+    run_recovery = download_best_checkpoints_from_child_runs(config, run)
     other_runs_folder = config.checkpoint_folder / OTHER_RUNS_SUBDIR_NAME
     assert other_runs_folder.is_dir()
     for child in ["0", "1"]:
diff --git a/Tests/ML/test_lightning_containers.py b/Tests/ML/test_lightning_containers.py
index 109901a97..64eda2d5c 100644
--- a/Tests/ML/test_lightning_containers.py
+++ b/Tests/ML/test_lightning_containers.py
@@ -16,8 +16,8 @@
 from InnerEye.Azure.azure_config import AzureConfig
 from InnerEye.Common.output_directories import OutputFolderForTests
-from InnerEye.ML.common import ModelExecutionMode
-from InnerEye.ML.deep_learning_config import ARGS_TXT, DatasetParams, WorkflowParams
+from InnerEye.ML.common import ARGS_TXT, ModelExecutionMode
+from InnerEye.ML.deep_learning_config import DatasetParams, WorkflowParams
 from InnerEye.ML.lightning_base import InnerEyeContainer
 from InnerEye.ML.lightning_container import LightningContainer
 from InnerEye.ML.model_config_base import ModelConfigBase
diff --git a/Tests/ML/test_regression_tests.py b/Tests/ML/test_regression_tests.py
index 96dea3c4c..b939a0dde 100644
--- a/Tests/ML/test_regression_tests.py
+++ b/Tests/ML/test_regression_tests.py
@@ -15,7 +15,7 @@
 from InnerEye.ML import baselines_util
 from InnerEye.ML.baselines_util import REGRESSION_TEST_AZUREML_FOLDER, REGRESSION_TEST_AZUREML_PARENT_FOLDER, \
     REGRESSION_TEST_OUTPUT_FOLDER, compare_files, compare_folder_contents, compare_folders_and_run_outputs
-from InnerEye.ML.deep_learning_config import FINAL_MODEL_FOLDER
+from InnerEye.ML.common import FINAL_MODEL_FOLDER
 from InnerEye.ML.run_ml import MLRunner
 from Tests.AfterTraining.test_after_training import FALLBACK_ENSEMBLE_RUN, FALLBACK_SINGLE_RUN, get_most_recent_run
 from Tests.ML.configs.lightning_test_containers import DummyContainerWithModel
diff --git a/Tests/ML/utils/test_checkpoint_handling.py b/Tests/ML/utils/test_checkpoint_handling.py
index 5ea8d75bf..d44517027 100644
--- a/Tests/ML/utils/test_checkpoint_handling.py
+++ b/Tests/ML/utils/test_checkpoint_handling.py
@@ -12,10 +12,9 @@
 from InnerEye.Common.common_util import OTHER_RUNS_SUBDIR_NAME
 from InnerEye.Common.fixed_paths import MODEL_INFERENCE_JSON_FILE_NAME
-from InnerEye.ML.utils.checkpoint_handling import MODEL_WEIGHTS_DIR_NAME
+from InnerEye.ML.utils.checkpoint_handling import MODEL_WEIGHTS_DIR_NAME, get_recovery_checkpoint_path
 from InnerEye.Common.output_directories import OutputFolderForTests
-from InnerEye.ML.common import BEST_CHECKPOINT_FILE_NAME_WITH_SUFFIX, get_recovery_checkpoint_path
-from InnerEye.ML.deep_learning_config import FINAL_MODEL_FOLDER, FINAL_ENSEMBLE_MODEL_FOLDER
+from InnerEye.ML.common import BEST_CHECKPOINT_FILE_NAME_WITH_SUFFIX, FINAL_ENSEMBLE_MODEL_FOLDER, FINAL_MODEL_FOLDER
 from InnerEye.ML.model_config_base import ModelConfigBase
 from InnerEye.ML.model_inference_config import read_model_inference_config
 from InnerEye.ML.utils.checkpoint_handling import CheckpointHandler
diff --git a/Tests/ML/utils/test_model_util.py b/Tests/ML/utils/test_model_util.py
index 13a64db69..db692b9d4 100644
--- a/Tests/ML/utils/test_model_util.py
+++ b/Tests/ML/utils/test_model_util.py
@@ -12,8 +12,10 @@
 from InnerEye.Common.output_directories import OutputFolderForTests
 from InnerEye.ML.common import BEST_CHECKPOINT_FILE_NAME_WITH_SUFFIX, LAST_CHECKPOINT_FILE_NAME, \
-    LAST_CHECKPOINT_FILE_NAME_WITH_SUFFIX, RECOVERY_CHECKPOINT_FILE_NAME, create_best_checkpoint, \
-    extract_latest_checkpoint_and_epoch, find_all_recovery_checkpoints, find_recovery_checkpoint_and_epoch
+    LAST_CHECKPOINT_FILE_NAME_WITH_SUFFIX, RECOVERY_CHECKPOINT_FILE_NAME
+from InnerEye.ML.utils.checkpoint_handling import create_best_checkpoint, extract_latest_checkpoint_and_epoch, \
+    find_all_recovery_checkpoints, \
+    find_recovery_checkpoint_and_epoch
 from InnerEye.ML.config import SegmentationModelBase
 from InnerEye.ML.lightning_base import InnerEyeContainer
 from InnerEye.ML.lightning_helpers import load_from_checkpoint_and_adjust_for_inference
diff --git a/TestsOutsidePackage/test_register_model.py b/TestsOutsidePackage/test_register_model.py
index 6e79a8ff4..9a4b93cc8 100644
--- a/TestsOutsidePackage/test_register_model.py
+++ b/TestsOutsidePackage/test_register_model.py
@@ -11,7 +11,7 @@
 from InnerEye.Common import fixed_paths
 from InnerEye.Common.output_directories import OutputFolderForTests
 from InnerEye.ML.config import SegmentationModelBase
-from InnerEye.ML.deep_learning_config import CHECKPOINT_FOLDER
+from InnerEye.ML.common import CHECKPOINT_FOLDER
 from InnerEye.ML.model_inference_config import ModelInferenceConfig
 from InnerEye.ML.run_ml import MLRunner

From 7244dfc2b6d63a80d69844bc5aae1d0a02a642b4 Mon Sep 17 00:00:00 2001
From: Anton Schwaighofer
Date: Thu, 9 Dec 2021 15:04:03 +0000
Subject: [PATCH 4/8] cleanup

---
 InnerEye/ML/common.py | 9 +++++++++
 InnerEye/ML/deep_learning_config.py | 12 ++----------
 InnerEye/ML/utils/checkpoint_handling.py | 6 ++++--
 InnerEye/ML/utils/run_recovery.py | 2 +-
 Tests/AfterTraining/test_after_training.py | 2 +-
 5 files changed, 17 insertions(+), 14 deletions(-)

diff --git a/InnerEye/ML/common.py b/InnerEye/ML/common.py
index f12aaa5f9..44912690c 100644
--- a/InnerEye/ML/common.py
+++ b/InnerEye/ML/common.py
@@ -7,6 +7,7 @@
 import abc
 from datetime import datetime
 from enum import Enum, unique
+from pathlib import Path
 from typing import Any, Dict, List
 
 DATASET_CSV_FILE_NAME = "dataset.csv"
@@ -76,3 +77,11 @@ def create_unique_timestamp_id() -> str:
     """
     unique_id = datetime.utcnow().strftime("%Y-%m-%dT%H%M%SZ")
     return unique_id
+
+
+def get_best_checkpoint_path(path: Path) -> Path:
+    """
+    Given a path and checkpoint, formats a path based on the checkpoint file name format.
+    :param path to checkpoint folder
+    """
+    return path / BEST_CHECKPOINT_FILE_NAME_WITH_SUFFIX
\ No newline at end of file
diff --git a/InnerEye/ML/deep_learning_config.py b/InnerEye/ML/deep_learning_config.py
index f8cf1e070..ba9d74562 100644
--- a/InnerEye/ML/deep_learning_config.py
+++ b/InnerEye/ML/deep_learning_config.py
@@ -19,9 +19,9 @@
 from InnerEye.Common.fixed_paths import DEFAULT_AML_UPLOAD_DIR, DEFAULT_LOGS_DIR_NAME
 from InnerEye.Common.generic_parsing import GenericConfig
 from InnerEye.Common.type_annotations import PathOrString, T, TupleFloat2
-from InnerEye.ML.common import BEST_CHECKPOINT_FILE_NAME_WITH_SUFFIX, CHECKPOINT_FOLDER, DATASET_CSV_FILE_NAME, \
+from InnerEye.ML.common import CHECKPOINT_FOLDER, DATASET_CSV_FILE_NAME, \
     ModelExecutionMode, VISUALIZATION_FOLDER, \
-    create_unique_timestamp_id
+    create_unique_timestamp_id, get_best_checkpoint_path
 
 
 @unique
@@ -853,11 +853,3 @@ def load_checkpoint(path_to_checkpoint: Path, use_gpu: bool = True) -> Dict[str,
     map_location = None if use_gpu else 'cpu'
     checkpoint = torch.load(str(path_to_checkpoint), map_location=map_location)
     return checkpoint
-
-
-def get_best_checkpoint_path(path: Path) -> Path:
-    """
-    Given a path and checkpoint, formats a path based on the checkpoint file name format.
-    :param path to checkpoint folder
-    """
-    return path / BEST_CHECKPOINT_FILE_NAME_WITH_SUFFIX
diff --git a/InnerEye/ML/utils/checkpoint_handling.py b/InnerEye/ML/utils/checkpoint_handling.py
index be039129e..76697a4f2 100644
--- a/InnerEye/ML/utils/checkpoint_handling.py
+++ b/InnerEye/ML/utils/checkpoint_handling.py
@@ -240,7 +240,7 @@ def get_local_checkpoints_path_or_download(self) -> List[Path]:
         return checkpoint_paths
 
 
-def download_checkpoints_to_temp_folder(run: Optional[Run] = None) -> Path:
+def download_checkpoints_to_temp_folder(run: Optional[Run] = None, workspace: Optional[Workspace] = None) -> Path:
     """
     Downloads all files with the outputs/checkpoints prefix of the given run to a temporary folder.
     In distributed training, the download only happens once per node.
@@ -256,9 +256,11 @@ def download_checkpoints_to_temp_folder(run: Optional[Run] = None) -> Path:
     logging.info(f"Number of checkpoints available in AzureML: {len(existing_checkpoints)}")
     if len(existing_checkpoints) > 0:
         try:
+            logging.info(f"Downloading checkpoints to {temp_folder}")
             download_files_from_run_id(run_id=run.id,
                                        output_folder=temp_folder,
-                                       prefix=checkpoint_prefix)
+                                       prefix=checkpoint_prefix,
+                                       workspace=workspace)
         except Exception as ex:
             logging.warning(f"Unable to download checkpoints from AzureML. Error: {str(ex)}")
     return temp_folder
diff --git a/InnerEye/ML/utils/run_recovery.py b/InnerEye/ML/utils/run_recovery.py
index 93e159371..5c9809fdc 100644
--- a/InnerEye/ML/utils/run_recovery.py
+++ b/InnerEye/ML/utils/run_recovery.py
@@ -10,7 +10,7 @@
 from typing import List
 
 from InnerEye.Common.common_util import check_properties_are_not_none
-from InnerEye.ML.deep_learning_config import get_best_checkpoint_path
+from InnerEye.ML.common import get_best_checkpoint_path
 
 
 @dataclass(frozen=True)
diff --git a/Tests/AfterTraining/test_after_training.py b/Tests/AfterTraining/test_after_training.py
index 06bcbaede..9a158d677 100644
--- a/Tests/AfterTraining/test_after_training.py
+++ b/Tests/AfterTraining/test_after_training.py
@@ -225,7 +225,7 @@ def test_download_checkpoints_from_aml(test_output_dirs: OutputFolderForTests) -
     Check that we can download checkpoint files from an AzureML run, if they are not available on disk.
     """
     run = get_most_recent_run(fallback_run_id_for_local_execution=FALLBACK_SINGLE_RUN)
-    temp_folder = download_checkpoints_to_temp_folder(run)
+    temp_folder = download_checkpoints_to_temp_folder(run, workspace=get_default_workspace())
     files = list(temp_folder.glob("*"))
     assert len(files) == 2
     # Now test if that is correctly integrated into the checkpoint finder. To avoid downloading a second time,

From a2ab76d4b77cc56ecd806cded57f2a001dcc1d39 Mon Sep 17 00:00:00 2001
From: Anton Schwaighofer
Date: Thu, 9 Dec 2021 15:04:24 +0000
Subject: [PATCH 5/8] hi-ml main

---
 hi-ml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hi-ml b/hi-ml
index 334186321..38fd68557 160000
--- a/hi-ml
+++ b/hi-ml
@@ -1 +1 @@
-Subproject commit 334186321f6989033f5609880781ba4c299f6f67
+Subproject commit 38fd685579749e6c5e5f8c199c76c5854394421c

From 1bfc582843a541c698bbc8fce50276b27ecee004 Mon Sep 17 00:00:00 2001
From: Anton Schwaighofer
Date: Thu, 9 Dec 2021 15:06:55 +0000
Subject: [PATCH 6/8] changelog

---
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 74c6d9784..bf8c8bb5b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -32,6 +32,7 @@ jobs that run in AzureML.
 - ([#589](https://github.com/microsoft/InnerEye-DeepLearning/pull/589)) Add `LightningContainer.update_azure_config()` hook
   to enable overriding `AzureConfig` parameters from a container (e.g. `experiment_name`, `cluster`, `num_nodes`).
 -([#603](https://github.com/microsoft/InnerEye-DeepLearning/pull/603)) Add histopathology module
+-([#614](https://github.com/microsoft/InnerEye-DeepLearning/pull/614)) Checkpoint downloading falls back to looking into AzureML if no checkpoints on disk
 
 ### Changed
 

From f50afac3bd2b23d7a0785843d898dd45445ca1fa Mon Sep 17 00:00:00 2001
From: Anton Schwaighofer
Date: Thu, 9 Dec 2021 15:08:41 +0000
Subject: [PATCH 7/8] flake

---
 InnerEye/ML/common.py | 2 +-
 Tests/AfterTraining/test_after_training.py | 6 +++---
 Tests/ML/test_download_upload.py | 1 -
 3 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/InnerEye/ML/common.py b/InnerEye/ML/common.py
index 44912690c..d92e07e0e 100644
--- a/InnerEye/ML/common.py
+++ b/InnerEye/ML/common.py
@@ -84,4 +84,4 @@ def get_best_checkpoint_path(path: Path) -> Path:
     Given a path and checkpoint, formats a path based on the checkpoint file name format.
     :param path to checkpoint folder
     """
-    return path / BEST_CHECKPOINT_FILE_NAME_WITH_SUFFIX
\ No newline at end of file
+    return path / BEST_CHECKPOINT_FILE_NAME_WITH_SUFFIX
diff --git a/Tests/AfterTraining/test_after_training.py b/Tests/AfterTraining/test_after_training.py
index 9a158d677..428e1620e 100644
--- a/Tests/AfterTraining/test_after_training.py
+++ b/Tests/AfterTraining/test_after_training.py
@@ -22,12 +22,10 @@
 from azureml.core import Model, Run
 
 from InnerEye.Azure.azure_config import AzureConfig
-from InnerEye.Azure.azure_util import MODEL_ID_KEY_NAME, RUN_CONTEXT, download_run_output_file, \
+from InnerEye.Azure.azure_util import MODEL_ID_KEY_NAME, download_run_output_file, \
     download_run_outputs_by_prefix, \
     get_comparison_baseline_paths, \
     is_running_on_azure_agent, to_azure_friendly_string
-from InnerEye.ML.utils.checkpoint_handling import download_checkpoints_to_temp_folder, \
-    find_recovery_checkpoint_and_epoch
 from InnerEye.Common import common_util, fixed_paths, fixed_paths_for_tests
 from InnerEye.Common.common_util import BEST_EPOCH_FOLDER_NAME, CROSSVAL_RESULTS_FOLDER, ENSEMBLE_SPLIT_NAME, \
     get_best_epoch_results_path
@@ -46,6 +44,8 @@
 from InnerEye.ML.reports.notebook_report import get_html_report_name
 from InnerEye.ML.run_ml import MLRunner
 from InnerEye.ML.runner import main
+from InnerEye.ML.utils.checkpoint_handling import download_checkpoints_to_temp_folder, \
+    find_recovery_checkpoint_and_epoch
 from InnerEye.ML.utils.config_loader import ModelConfigLoader
 from InnerEye.ML.utils.image_util import get_unit_image_header
 from InnerEye.ML.utils.io_util import zip_random_dicom_series
diff --git a/Tests/ML/test_download_upload.py b/Tests/ML/test_download_upload.py
index f1fc5dc01..5bb34bc97 100644
--- a/Tests/ML/test_download_upload.py
+++ b/Tests/ML/test_download_upload.py
@@ -14,7 +14,6 @@
 from InnerEye.ML.model_config_base import ModelConfigBase
 from InnerEye.ML.utils.checkpoint_handling import download_all_checkpoints_from_run, \
     download_best_checkpoints_from_child_runs
-from InnerEye.ML.utils.run_recovery import RunRecovery
 from Tests.AfterTraining.test_after_training import FALLBACK_ENSEMBLE_RUN, FALLBACK_SINGLE_RUN, get_most_recent_run
 from Tests.ML.util import get_default_azure_config

From 32f3f20f404513f6269a956af3a18bdb318911d2 Mon Sep 17 00:00:00 2001
From: Anton Schwaighofer
Date: Thu, 9 Dec 2021 16:16:53 +0000
Subject: [PATCH 8/8] test fix

---
 InnerEye/ML/utils/checkpoint_handling.py | 11 +++++++++--
 Tests/AfterTraining/test_after_training.py | 23 +++++++++++++---------
 2 files changed, 23 insertions(+), 11 deletions(-)

diff --git a/InnerEye/ML/utils/checkpoint_handling.py b/InnerEye/ML/utils/checkpoint_handling.py
index 76697a4f2..7cbe03060 100644
--- a/InnerEye/ML/utils/checkpoint_handling.py
+++ b/InnerEye/ML/utils/checkpoint_handling.py
@@ -245,7 +245,12 @@ def download_checkpoints_to_temp_folder(run: Optional[Run] = None, workspace: Op
     Downloads all files with the outputs/checkpoints prefix of the given run to a temporary folder.
     In distributed training, the download only happens once per node.
 
-    :return: The path to which the files were downloaded.
+    :param run: If provided, download the files from that run. If omitted, download the files from the current run
+    (taken from RUN_CONTEXT)
+    :param workspace: The AML workspace where the run is located. If omitted, the hi-ml defaults of finding a workspace
+    are used (current workspace when running in AzureML, otherwise expecting a config.json file)
+    :return: The path to which the files were downloaded. The files are located in that folder, without any further
+    subfolders.
     """
     run = run or RUN_CONTEXT
     # Downloads should go to a temporary folder because downloading the files to the checkpoint folder might
@@ -263,7 +268,9 @@ def download_checkpoints_to_temp_folder(run: Optional[Run] = None, workspace: Op
                                        workspace=workspace)
         except Exception as ex:
             logging.warning(f"Unable to download checkpoints from AzureML. Error: {str(ex)}")
-    return temp_folder
+    # Checkpoint downloads preserve the full folder structure, point the caller directly to the folder where the
+    # checkpoints really are.
+    return temp_folder / DEFAULT_AML_UPLOAD_DIR / CHECKPOINT_FOLDER
 
 
 PathAndEpoch = Tuple[Path, int]
diff --git a/Tests/AfterTraining/test_after_training.py b/Tests/AfterTraining/test_after_training.py
index 428e1620e..48176b389 100644
--- a/Tests/AfterTraining/test_after_training.py
+++ b/Tests/AfterTraining/test_after_training.py
@@ -228,17 +228,22 @@ def test_download_checkpoints_from_aml(test_output_dirs: OutputFolderForTests) -
     temp_folder = download_checkpoints_to_temp_folder(run, workspace=get_default_workspace())
     files = list(temp_folder.glob("*"))
     assert len(files) == 2
+    # Test if what's in the folder are really files, not directories
+    for file in files:
+        assert file.is_file()
     # Now test if that is correctly integrated into the checkpoint finder. To avoid downloading a second time,
     # now mock the call to the actual downloader.
-    with mock.patch("InnerEye.ML.common.download_checkpoints_to_temp_folder", return_value=temp_folder) as download:
-        download.assert_called_once_with()
-        # Call the checkpoint finder with a temp folder that does not contain any files, so it should try to download
-        result = find_recovery_checkpoint_and_epoch(test_output_dirs.root_dir)
-        assert result is not None
-        p, epoch = result
-        # The basic model only writes one checkpoint at epoch 1
-        assert epoch == 1
-        assert RECOVERY_CHECKPOINT_FILE_NAME in p.stem
+    with mock.patch("InnerEye.ML.utils.checkpoint_handling.is_running_in_azure_ml", return_value=True):
+        with mock.patch("InnerEye.ML.utils.checkpoint_handling.download_checkpoints_to_temp_folder",
+                        return_value=temp_folder) as download:
+            # Call the checkpoint finder with a temp folder that does not contain any files, so it should try to download
+            result = find_recovery_checkpoint_and_epoch(test_output_dirs.root_dir)
+            download.assert_called_once_with()
+            assert result is not None
+            p, epoch = result
+            # The basic model only writes one checkpoint at epoch 1
+            assert epoch == 1
+            assert RECOVERY_CHECKPOINT_FILE_NAME in p.stem
 
 
 @pytest.mark.inference