diff --git a/CHANGELOG.md b/CHANGELOG.md index 12258837d..89c9bc11f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ jobs that run in AzureML. - ([#589](https://github.com/microsoft/InnerEye-DeepLearning/pull/589)) Add `LightningContainer.update_azure_config()` hook to enable overriding `AzureConfig` parameters from a container (e.g. `experiment_name`, `cluster`, `num_nodes`). -([#603](https://github.com/microsoft/InnerEye-DeepLearning/pull/603)) Add histopathology module +-([#614](https://github.com/microsoft/InnerEye-DeepLearning/pull/614)) Checkpoint downloading falls back to looking into AzureML if no checkpoints on disk -([#613](https://github.com/microsoft/InnerEye-DeepLearning/pull/613)) Add additional tests for histopathology datasets diff --git a/InnerEye/ML/common.py b/InnerEye/ML/common.py index cafe8970b..d92e07e0e 100644 --- a/InnerEye/ML/common.py +++ b/InnerEye/ML/common.py @@ -2,15 +2,13 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License (MIT). See LICENSE in the repo root for license information. # ------------------------------------------------------------------------------------------ +from __future__ import annotations + import abc -import logging -import re from datetime import datetime from enum import Enum, unique from pathlib import Path -from typing import Any, Dict, List, Optional, Tuple - -import numpy as np +from typing import Any, Dict, List DATASET_CSV_FILE_NAME = "dataset.csv" CHECKPOINT_SUFFIX = ".ckpt" @@ -26,6 +24,13 @@ LAST_CHECKPOINT_FILE_NAME = "last" LAST_CHECKPOINT_FILE_NAME_WITH_SUFFIX = LAST_CHECKPOINT_FILE_NAME + CHECKPOINT_SUFFIX +FINAL_MODEL_FOLDER = "final_model" +FINAL_ENSEMBLE_MODEL_FOLDER = "final_ensemble_model" +CHECKPOINT_FOLDER = "checkpoints" +VISUALIZATION_FOLDER = "visualizations" +EXTRA_RUN_SUBFOLDER = "extra_run_id" +ARGS_TXT = "args.txt" + @unique class ModelExecutionMode(Enum): @@ -64,18 +69,14 @@ def get_feature_length(self, column: str) -> int: raise NotImplementedError("get_feature_length must be implemented by sub classes") -def get_recovery_checkpoint_path(path: Path) -> Path: +def create_unique_timestamp_id() -> str: """ - Returns the path to the last recovery checkpoint in the given folder or the provided filename. Raises a - FileNotFoundError if no - recovery checkpoint file is present. - :param path: Path to checkpoint folder + Creates a unique string using the current time in UTC, up to seconds precision, with characters that + are suitable for use in filenames. For example, on 31 Dec 2019 at 11:59:59pm UTC, the result would be + 2019-12-31T235959Z. """ - recovery_ckpt_and_epoch = find_recovery_checkpoint_and_epoch(path) - if recovery_ckpt_and_epoch is not None: - return recovery_ckpt_and_epoch[0] - files = list(path.glob("*")) - raise FileNotFoundError(f"No checkpoint files found in {path}. Existing files: {' '.join(p.name for p in files)}") + unique_id = datetime.utcnow().strftime("%Y-%m-%dT%H%M%SZ") + return unique_id def get_best_checkpoint_path(path: Path) -> Path: @@ -84,73 +85,3 @@ def get_best_checkpoint_path(path: Path) -> Path: :param path to checkpoint folder """ return path / BEST_CHECKPOINT_FILE_NAME_WITH_SUFFIX - - -def find_all_recovery_checkpoints(path: Path) -> Optional[List[Path]]: - """ - Extracts all file starting with RECOVERY_CHECKPOINT_FILE_NAME in path - :param path: - :return: - """ - all_recovery_files = [f for f in path.glob(RECOVERY_CHECKPOINT_FILE_NAME + "*")] - if len(all_recovery_files) == 0: - return None - return all_recovery_files - - -PathAndEpoch = Tuple[Path, int] - - -def extract_latest_checkpoint_and_epoch(available_files: List[Path]) -> PathAndEpoch: - """ - Checkpoints are saved as recovery_epoch={epoch}.ckpt, find the latest ckpt and epoch number. - :param available_files: all available checkpoints - :return: path the checkpoint from latest epoch and epoch number - """ - recovery_epochs = [int(re.findall(r"[\d]+", f.stem)[0]) for f in available_files] - idx_max_epoch = int(np.argmax(recovery_epochs)) - return available_files[idx_max_epoch], recovery_epochs[idx_max_epoch] - - -def find_recovery_checkpoint_and_epoch(path: Path) -> Optional[PathAndEpoch]: - """ - Looks at all the recovery files, extracts the epoch number for all of them and returns the most recent (latest - epoch) - checkpoint path along with the corresponding epoch number. If no recovery checkpoint are found, return None. - :param path: The folder to start searching in. - :return: None if there is no file matching the search pattern, or a Tuple with Path object and integer pointing to - recovery checkpoint path and recovery epoch. - """ - available_checkpoints = find_all_recovery_checkpoints(path) - if available_checkpoints is not None: - return extract_latest_checkpoint_and_epoch(available_checkpoints) - return None - - -def create_best_checkpoint(path: Path) -> Path: - """ - Creates the best checkpoint file. "Best" is at the moment defined as being the last checkpoint, but could be - based on some defined policy. - The best checkpoint will be renamed to `best_checkpoint.ckpt`. - :param path: The folder that contains all checkpoint files. - """ - logging.debug(f"Files in checkpoint folder: {' '.join(p.name for p in path.glob('*'))}") - last_ckpt = path / LAST_CHECKPOINT_FILE_NAME_WITH_SUFFIX - all_files = f"Existing files: {' '.join(p.name for p in path.glob('*'))}" - if not last_ckpt.is_file(): - raise FileNotFoundError(f"Checkpoint file {LAST_CHECKPOINT_FILE_NAME_WITH_SUFFIX} not found. {all_files}") - logging.info(f"Using {LAST_CHECKPOINT_FILE_NAME_WITH_SUFFIX} as the best checkpoint: Renaming to " - f"{BEST_CHECKPOINT_FILE_NAME_WITH_SUFFIX}") - best = path / BEST_CHECKPOINT_FILE_NAME_WITH_SUFFIX - last_ckpt.rename(best) - return best - - -def create_unique_timestamp_id() -> str: - """ - Creates a unique string using the current time in UTC, up to seconds precision, with characters that - are suitable for use in filenames. For example, on 31 Dec 2019 at 11:59:59pm UTC, the result would be - 2019-12-31T235959Z. - """ - unique_id = datetime.utcnow().strftime("%Y-%m-%dT%H%M%SZ") - return unique_id diff --git a/InnerEye/ML/deep_learning_config.py b/InnerEye/ML/deep_learning_config.py index ea4fc71bf..ba9d74562 100644 --- a/InnerEye/ML/deep_learning_config.py +++ b/InnerEye/ML/deep_learning_config.py @@ -19,21 +19,9 @@ from InnerEye.Common.fixed_paths import DEFAULT_AML_UPLOAD_DIR, DEFAULT_LOGS_DIR_NAME from InnerEye.Common.generic_parsing import GenericConfig from InnerEye.Common.type_annotations import PathOrString, T, TupleFloat2 -from InnerEye.ML.common import DATASET_CSV_FILE_NAME, ModelExecutionMode, create_unique_timestamp_id, \ - get_best_checkpoint_path, get_recovery_checkpoint_path - -# A folder inside of the outputs folder that will contain all information for running the model in inference mode - -FINAL_MODEL_FOLDER = "final_model" -FINAL_ENSEMBLE_MODEL_FOLDER = "final_ensemble_model" - -# The checkpoints must be stored inside of the final model folder, if we want to avoid copying -# them before registration. -CHECKPOINT_FOLDER = "checkpoints" -VISUALIZATION_FOLDER = "visualizations" -EXTRA_RUN_SUBFOLDER = "extra_run_id" - -ARGS_TXT = "args.txt" +from InnerEye.ML.common import CHECKPOINT_FOLDER, DATASET_CSV_FILE_NAME, \ + ModelExecutionMode, VISUALIZATION_FOLDER, \ + create_unique_timestamp_id, get_best_checkpoint_path @unique @@ -487,6 +475,7 @@ def get_path_to_checkpoint(self) -> Path: """ Returns the full path to a recovery checkpoint. """ + from InnerEye.ML.utils.checkpoint_handling import get_recovery_checkpoint_path return get_recovery_checkpoint_path(self.checkpoint_folder) def get_path_to_best_checkpoint(self) -> Path: diff --git a/InnerEye/ML/lightning_container.py b/InnerEye/ML/lightning_container.py index 4e0955148..c42074c88 100644 --- a/InnerEye/ML/lightning_container.py +++ b/InnerEye/ML/lightning_container.py @@ -23,7 +23,6 @@ WorkflowParams from InnerEye.ML.utils import model_util from InnerEye.ML.utils.lr_scheduler import SchedulerWithWarmUp -from InnerEye.ML.utils.run_recovery import RunRecovery class InnerEyeInference(abc.ABC): @@ -151,7 +150,8 @@ def __init__(self, **kwargs: Any) -> None: super().__init__(**kwargs) self._model: Optional[LightningModule] = None self._model_name = type(self).__name__ - self.pretraining_run_checkpoints: Optional[RunRecovery] = None + # This should be typed RunRecovery, but causes circular imports + self.pretraining_run_checkpoints: Optional[Any] = None self.num_nodes = 1 def validate(self) -> None: diff --git a/InnerEye/ML/model_training.py b/InnerEye/ML/model_training.py index eb5cea2b0..ebb849762 100644 --- a/InnerEye/ML/model_training.py +++ b/InnerEye/ML/model_training.py @@ -19,8 +19,8 @@ from InnerEye.Azure.azure_util import RUN_CONTEXT, is_offline_run_context from InnerEye.Common.common_util import SUBJECT_METRICS_FILE_NAME, change_working_directory from InnerEye.Common.resource_monitor import ResourceMonitor -from InnerEye.ML.common import ModelExecutionMode, RECOVERY_CHECKPOINT_FILE_NAME, create_best_checkpoint -from InnerEye.ML.deep_learning_config import ARGS_TXT, VISUALIZATION_FOLDER +from InnerEye.ML.common import ARGS_TXT, ModelExecutionMode, RECOVERY_CHECKPOINT_FILE_NAME, VISUALIZATION_FOLDER +from InnerEye.ML.utils.checkpoint_handling import create_best_checkpoint from InnerEye.ML.lightning_base import InnerEyeContainer, InnerEyeLightning from InnerEye.ML.lightning_container import LightningContainer from InnerEye.ML.lightning_loggers import StoringLogger diff --git a/InnerEye/ML/normalize_and_visualize_dataset.py b/InnerEye/ML/normalize_and_visualize_dataset.py index 9ab0d6f6a..820decf58 100644 --- a/InnerEye/ML/normalize_and_visualize_dataset.py +++ b/InnerEye/ML/normalize_and_visualize_dataset.py @@ -15,10 +15,9 @@ from InnerEye.Common.common_util import logging_to_stdout from InnerEye.Common.generic_parsing import GenericConfig from InnerEye.ML import plotting -from InnerEye.ML.common import DATASET_CSV_FILE_NAME +from InnerEye.ML.common import ARGS_TXT, DATASET_CSV_FILE_NAME from InnerEye.ML.config import SegmentationModelBase from InnerEye.ML.dataset.full_image_dataset import load_dataset_sources -from InnerEye.ML.deep_learning_config import ARGS_TXT from InnerEye.ML.photometric_normalization import PhotometricNormalization from InnerEye.ML.utils.config_loader import ModelConfigLoader from InnerEye.ML.utils.io_util import load_images_from_dataset_source diff --git a/InnerEye/ML/run_ml.py b/InnerEye/ML/run_ml.py index e94a4ef08..cb6ed2168 100644 --- a/InnerEye/ML/run_ml.py +++ b/InnerEye/ML/run_ml.py @@ -36,10 +36,12 @@ from InnerEye.Common.fixed_paths import INNEREYE_PACKAGE_NAME, PYTHON_ENVIRONMENT_NAME from InnerEye.Common.type_annotations import PathOrString from InnerEye.ML.baselines_util import compare_folders_and_run_outputs -from InnerEye.ML.common import ModelExecutionMode +from InnerEye.ML.common import CHECKPOINT_FOLDER, EXTRA_RUN_SUBFOLDER, FINAL_ENSEMBLE_MODEL_FOLDER, \ + FINAL_MODEL_FOLDER, \ + ModelExecutionMode from InnerEye.ML.config import SegmentationModelBase -from InnerEye.ML.deep_learning_config import CHECKPOINT_FOLDER, DeepLearningConfig, EXTRA_RUN_SUBFOLDER, \ - FINAL_ENSEMBLE_MODEL_FOLDER, FINAL_MODEL_FOLDER, ModelCategory, MultiprocessingStartMethod, load_checkpoint +from InnerEye.ML.deep_learning_config import DeepLearningConfig, ModelCategory, MultiprocessingStartMethod, \ + load_checkpoint from InnerEye.ML.lightning_base import InnerEyeContainer from InnerEye.ML.lightning_container import InnerEyeInference, LightningContainer from InnerEye.ML.lightning_loggers import StoringLogger @@ -53,8 +55,7 @@ get_ipynb_report_name, reports_folder from InnerEye.ML.scalar_config import ScalarModelBase from InnerEye.ML.sequence_config import SequenceModelBase -from InnerEye.ML.utils.checkpoint_handling import CheckpointHandler -from InnerEye.ML.utils.run_recovery import RunRecovery +from InnerEye.ML.utils.checkpoint_handling import CheckpointHandler, download_all_checkpoints_from_run from InnerEye.ML.visualizers import activation_maps from InnerEye.ML.visualizers.plot_cross_validation import \ get_config_and_results_for_offline_runs, plot_cross_validation_from_files @@ -183,10 +184,10 @@ def setup(self, azure_run_info: Optional[AzureRunInfo] = None) -> None: if self.container.pretraining_run_recovery_id is not None: run_to_recover = self.azure_config.fetch_run(self.container.pretraining_run_recovery_id.strip()) only_return_path = not is_global_rank_zero() - run_recovery_object = RunRecovery.download_all_checkpoints_from_run(self.container, - run_to_recover, - EXTRA_RUN_SUBFOLDER, - only_return_path=only_return_path) + run_recovery_object = download_all_checkpoints_from_run(self.container, + run_to_recover, + EXTRA_RUN_SUBFOLDER, + only_return_path=only_return_path) self.container.pretraining_run_checkpoints = run_recovery_object # A lot of the code for the built-in InnerEye models expects the output paths directly in the config files. diff --git a/InnerEye/ML/utils/checkpoint_handling.py b/InnerEye/ML/utils/checkpoint_handling.py index a445743fe..7cbe03060 100644 --- a/InnerEye/ML/utils/checkpoint_handling.py +++ b/InnerEye/ML/utils/checkpoint_handling.py @@ -4,23 +4,32 @@ # ------------------------------------------------------------------------------------------ import logging import os +import re +import tempfile +import time import uuid from builtins import property from pathlib import Path -from typing import List, Optional +from typing import List, Optional, Tuple from urllib.parse import urlparse +import numpy as np import requests -from azureml.core import Run, Workspace, Model +from azureml.core import Model, Run, Workspace from InnerEye.Azure.azure_config import AzureConfig -from InnerEye.Common.fixed_paths import MODEL_INFERENCE_JSON_FILE_NAME -from InnerEye.ML.common import find_recovery_checkpoint_and_epoch +from InnerEye.Azure.azure_util import RUN_CONTEXT, download_run_output_file, download_run_outputs_by_prefix, \ + fetch_child_runs, tag_values_all_distinct +from InnerEye.Common.common_util import OTHER_RUNS_SUBDIR_NAME +from InnerEye.Common.fixed_paths import DEFAULT_AML_UPLOAD_DIR, MODEL_INFERENCE_JSON_FILE_NAME +from InnerEye.ML.common import BEST_CHECKPOINT_FILE_NAME_WITH_SUFFIX, CHECKPOINT_FOLDER, \ + LAST_CHECKPOINT_FILE_NAME_WITH_SUFFIX, RECOVERY_CHECKPOINT_FILE_NAME from InnerEye.ML.deep_learning_config import OutputParams from InnerEye.ML.lightning_container import LightningContainer -from InnerEye.ML.utils.run_recovery import RunRecovery from InnerEye.ML.model_inference_config import read_model_inference_config - +from InnerEye.ML.utils.run_recovery import RunRecovery +from health_azure import download_files_from_run_id, is_running_in_azure_ml +from health_azure.utils import get_run_file_names, is_global_rank_zero MODEL_WEIGHTS_DIR_NAME = "trained_models" @@ -53,8 +62,8 @@ def download_checkpoints_from_hyperdrive_child_runs(self, hyperdrive_parent_run: Downloads the best checkpoints from all child runs of a Hyperdrive parent run. This is used to gather results for ensemble creation. """ - self.run_recovery = RunRecovery.download_best_checkpoints_from_child_runs(self.output_params, - hyperdrive_parent_run) + self.run_recovery = download_best_checkpoints_from_child_runs(self.output_params, + hyperdrive_parent_run) # Check paths are good, just in case for path in self.run_recovery.checkpoints_roots: if not path.is_dir(): @@ -71,8 +80,8 @@ def download_recovery_checkpoints_or_weights(self, only_return_path: bool = Fals """ if self.azure_config.run_recovery_id: run_to_recover = self.azure_config.fetch_run(self.azure_config.run_recovery_id.strip()) - self.run_recovery = RunRecovery.download_all_checkpoints_from_run(self.output_params, run_to_recover, - only_return_path=only_return_path) + self.run_recovery = download_all_checkpoints_from_run(self.output_params, run_to_recover, + only_return_path=only_return_path) if self.container.weights_url or self.container.local_weights_path or self.container.model_id: self.trained_weights_paths = self.get_local_checkpoints_path_or_download() @@ -91,10 +100,11 @@ def get_recovery_or_checkpoint_path_train(self) -> Optional[Path]: the latest checkpoint will be present in this folder too. :return: Constructed checkpoint path to recover from. """ - checkpoints = list(self.container.checkpoint_folder.rglob("*")) - logging.debug(f"Available checkpoints: {len(checkpoints)}") - for f in checkpoints: - logging.debug(f) + if is_global_rank_zero(): + checkpoints = list(self.container.checkpoint_folder.rglob("*")) + logging.info(f"Available checkpoints: {len(checkpoints)}") + for f in checkpoints: + logging.info(f) recovery = find_recovery_checkpoint_and_epoch(self.container.checkpoint_folder) if recovery is not None: local_recovery_path, recovery_epoch = recovery @@ -228,3 +238,179 @@ def get_local_checkpoints_path_or_download(self) -> List[Path]: if not checkpoint_path or not checkpoint_path.is_file(): raise FileNotFoundError(f"Could not find the weights file at {checkpoint_path}") return checkpoint_paths + + +def download_checkpoints_to_temp_folder(run: Optional[Run] = None, workspace: Optional[Workspace] = None) -> Path: + """ + Downloads all files with the outputs/checkpoints prefix of the given run to a temporary folder. + In distributed training, the download only happens once per node. + + :param run: If provided, download the files from that run. If omitted, download the files from the current run + (taken from RUN_CONTEXT) + :param workspace: The AML workspace where the run is located. If omitted, the hi-ml defaults of finding a workspace + are used (current workspace when running in AzureML, otherwise expecting a config.json file) + :return: The path to which the files were downloaded. The files are located in that folder, without any further + subfolders. + """ + run = run or RUN_CONTEXT + # Downloads should go to a temporary folder because downloading the files to the checkpoint folder might + # cause artifact conflicts later. + temp_folder = Path(tempfile.mkdtemp()) + checkpoint_prefix = f"{DEFAULT_AML_UPLOAD_DIR}/{CHECKPOINT_FOLDER}/" + existing_checkpoints = get_run_file_names(run, prefix=checkpoint_prefix) + logging.info(f"Number of checkpoints available in AzureML: {len(existing_checkpoints)}") + if len(existing_checkpoints) > 0: + try: + logging.info(f"Downloading checkpoints to {temp_folder}") + download_files_from_run_id(run_id=run.id, + output_folder=temp_folder, + prefix=checkpoint_prefix, + workspace=workspace) + except Exception as ex: + logging.warning(f"Unable to download checkpoints from AzureML. Error: {str(ex)}") + # Checkpoint downloads preserve the full folder structure, point the caller directly to the folder where the + # checkpoints really are. + return temp_folder / DEFAULT_AML_UPLOAD_DIR / CHECKPOINT_FOLDER + + +PathAndEpoch = Tuple[Path, int] + + +def find_recovery_checkpoint_and_epoch(path: Path) -> Optional[PathAndEpoch]: + """ + Looks at all the recovery files, extracts the epoch number for all of them and returns the most recent (latest + epoch) + checkpoint path along with the corresponding epoch number. If no recovery checkpoint are found, return None. + :param path: The folder to start searching in. + :return: None if there is no file matching the search pattern, or a Tuple with Path object and integer pointing to + recovery checkpoint path and recovery epoch. + """ + available_checkpoints = find_all_recovery_checkpoints(path) + if available_checkpoints is None and is_running_in_azure_ml(): + logging.info("No recovery checkpoints available in the checkpoint folder. Trying to find checkpoints in " + "AzureML from previous runs of this job.") + # Download checkpoints from AzureML, then try to find recovery checkpoints among those. + temp_folder = download_checkpoints_to_temp_folder() + available_checkpoints = find_all_recovery_checkpoints(temp_folder) + if available_checkpoints is not None: + return extract_latest_checkpoint_and_epoch(available_checkpoints) + return None + + +def get_recovery_checkpoint_path(path: Path) -> Path: + """ + Returns the path to the last recovery checkpoint in the given folder or the provided filename. Raises a + FileNotFoundError if no + recovery checkpoint file is present. + :param path: Path to checkpoint folder + """ + recovery_ckpt_and_epoch = find_recovery_checkpoint_and_epoch(path) + if recovery_ckpt_and_epoch is not None: + return recovery_ckpt_and_epoch[0] + files = list(path.glob("*")) + raise FileNotFoundError(f"No checkpoint files found in {path}. Existing files: {' '.join(p.name for p in files)}") + + +def find_all_recovery_checkpoints(path: Path) -> Optional[List[Path]]: + """ + Extracts all file starting with RECOVERY_CHECKPOINT_FILE_NAME in path + :param path: + :return: + """ + all_recovery_files = [f for f in path.glob(RECOVERY_CHECKPOINT_FILE_NAME + "*")] + if len(all_recovery_files) == 0: + return None + return all_recovery_files + + +def extract_latest_checkpoint_and_epoch(available_files: List[Path]) -> PathAndEpoch: + """ + Checkpoints are saved as recovery_epoch={epoch}.ckpt, find the latest ckpt and epoch number. + :param available_files: all available checkpoints + :return: path the checkpoint from latest epoch and epoch number + """ + recovery_epochs = [int(re.findall(r"[\d]+", f.stem)[0]) for f in available_files] + idx_max_epoch = int(np.argmax(recovery_epochs)) + return available_files[idx_max_epoch], recovery_epochs[idx_max_epoch] + + +def create_best_checkpoint(path: Path) -> Path: + """ + Creates the best checkpoint file. "Best" is at the moment defined as being the last checkpoint, but could be + based on some defined policy. + The best checkpoint will be renamed to `best_checkpoint.ckpt`. + :param path: The folder that contains all checkpoint files. + """ + logging.debug(f"Files in checkpoint folder: {' '.join(p.name for p in path.glob('*'))}") + last_ckpt = path / LAST_CHECKPOINT_FILE_NAME_WITH_SUFFIX + all_files = f"Existing files: {' '.join(p.name for p in path.glob('*'))}" + if not last_ckpt.is_file(): + raise FileNotFoundError(f"Checkpoint file {LAST_CHECKPOINT_FILE_NAME_WITH_SUFFIX} not found. {all_files}") + logging.info(f"Using {LAST_CHECKPOINT_FILE_NAME_WITH_SUFFIX} as the best checkpoint: Renaming to " + f"{BEST_CHECKPOINT_FILE_NAME_WITH_SUFFIX}") + best = path / BEST_CHECKPOINT_FILE_NAME_WITH_SUFFIX + last_ckpt.rename(best) + return best + + +def download_best_checkpoints_from_child_runs(config: OutputParams, run: Run) -> RunRecovery: + """ + Downloads the best checkpoints from all child runs of the provided Hyperdrive parent run. + The checkpoints for the sibling runs will go into folder 'OTHER_RUNS/' + in the checkpoint folder. There is special treatment for the child run that is equal to the present AzureML + run, its checkpoints will be read off the checkpoint folder as-is. + :param config: Model related configs. + :param run: The Hyperdrive parent run to download from. + :return: run recovery information + """ + child_runs: List[Run] = fetch_child_runs(run) + if not child_runs: + raise ValueError(f"AzureML run {run.id} does not have any child runs.") + logging.info(f"Run {run.id} has {len(child_runs)} child runs: {', '.join(c.id for c in child_runs)}") + tag_to_use = 'cross_validation_split_index' + can_use_split_indices = tag_values_all_distinct(child_runs, tag_to_use) + # download checkpoints for the child runs in the root of the parent + child_runs_checkpoints_roots: List[Path] = [] + for child in child_runs: + if child.id == RUN_CONTEXT.id: + # We expect to find the file(s) we need in config.checkpoint_folder + child_dst = config.checkpoint_folder + else: + subdir = str(child.tags[tag_to_use] if can_use_split_indices else child.number) + child_dst = config.checkpoint_folder / OTHER_RUNS_SUBDIR_NAME / subdir + download_run_output_file( + blob_path=Path(CHECKPOINT_FOLDER) / BEST_CHECKPOINT_FILE_NAME_WITH_SUFFIX, + destination=child_dst, + run=child + ) + child_runs_checkpoints_roots.append(child_dst) + return RunRecovery(checkpoints_roots=child_runs_checkpoints_roots) + + +def download_all_checkpoints_from_run(config: OutputParams, run: Run, + subfolder: Optional[str] = None, + only_return_path: bool = False) -> RunRecovery: + """ + Downloads all checkpoints of the provided run inside the checkpoints folder. + :param config: Model related configs. + :param run: Run whose checkpoints should be recovered + :param subfolder: optional subfolder name, if provided the checkpoints will be downloaded to + CHECKPOINT_FOLDER / subfolder. If None, the checkpoint are downloaded to CHECKPOINT_FOLDER of the current run. + :param: only_return_path: if True, return a RunRecovery object with the path to the checkpoint without actually + downloading the checkpoints. This is useful to avoid duplicating checkpoint download when running on multiple + nodes. If False, return the RunRecovery object and download the checkpoint to disk. + :return: run recovery information + """ + if fetch_child_runs(run): + raise ValueError(f"AzureML run {run.id} has child runs, this method does not support those.") + + destination_folder = config.checkpoint_folder / subfolder if subfolder else config.checkpoint_folder + + if not only_return_path: + download_run_outputs_by_prefix( + blobs_prefix=Path(CHECKPOINT_FOLDER), + destination=destination_folder, + run=run + ) + time.sleep(60) # Needed because AML is not fast enough to download + return RunRecovery(checkpoints_roots=[destination_folder]) diff --git a/InnerEye/ML/utils/run_recovery.py b/InnerEye/ML/utils/run_recovery.py index 620f05485..5c9809fdc 100644 --- a/InnerEye/ML/utils/run_recovery.py +++ b/InnerEye/ML/utils/run_recovery.py @@ -5,19 +5,12 @@ from __future__ import annotations import logging -import time from dataclasses import dataclass from pathlib import Path -from typing import List, Optional +from typing import List -from azureml.core import Run - -from InnerEye.Azure.azure_util import RUN_CONTEXT, download_run_output_file, download_run_outputs_by_prefix, \ - fetch_child_runs, tag_values_all_distinct -from InnerEye.Common.common_util import OTHER_RUNS_SUBDIR_NAME, check_properties_are_not_none -from InnerEye.ML.common import BEST_CHECKPOINT_FILE_NAME_WITH_SUFFIX, get_best_checkpoint_path, \ - get_recovery_checkpoint_path -from InnerEye.ML.deep_learning_config import CHECKPOINT_FOLDER, OutputParams +from InnerEye.Common.common_util import check_properties_are_not_none +from InnerEye.ML.common import get_best_checkpoint_path @dataclass(frozen=True) @@ -27,70 +20,8 @@ class RunRecovery: """ checkpoints_roots: List[Path] - @staticmethod - def download_best_checkpoints_from_child_runs(config: OutputParams, run: Run) -> RunRecovery: - """ - Downloads the best checkpoints from all child runs of the provided Hyperdrive parent run. - The checkpoints for the sibling runs will go into folder 'OTHER_RUNS/' - in the checkpoint folder. There is special treatment for the child run that is equal to the present AzureML - run, its checkpoints will be read off the checkpoint folder as-is. - :param config: Model related configs. - :param run: The Hyperdrive parent run to download from. - :return: run recovery information - """ - child_runs: List[Run] = fetch_child_runs(run) - if not child_runs: - raise ValueError(f"AzureML run {run.id} does not have any child runs.") - logging.info(f"Run {run.id} has {len(child_runs)} child runs: {', '.join(c.id for c in child_runs)}") - tag_to_use = 'cross_validation_split_index' - can_use_split_indices = tag_values_all_distinct(child_runs, tag_to_use) - # download checkpoints for the child runs in the root of the parent - child_runs_checkpoints_roots: List[Path] = [] - for child in child_runs: - if child.id == RUN_CONTEXT.id: - # We expect to find the file(s) we need in config.checkpoint_folder - child_dst = config.checkpoint_folder - else: - subdir = str(child.tags[tag_to_use] if can_use_split_indices else child.number) - child_dst = config.checkpoint_folder / OTHER_RUNS_SUBDIR_NAME / subdir - download_run_output_file( - blob_path=Path(CHECKPOINT_FOLDER) / BEST_CHECKPOINT_FILE_NAME_WITH_SUFFIX, - destination=child_dst, - run=child - ) - child_runs_checkpoints_roots.append(child_dst) - return RunRecovery(checkpoints_roots=child_runs_checkpoints_roots) - - @staticmethod - def download_all_checkpoints_from_run(config: OutputParams, run: Run, - subfolder: Optional[str] = None, - only_return_path: bool = False) -> RunRecovery: - """ - Downloads all checkpoints of the provided run inside the checkpoints folder. - :param config: Model related configs. - :param run: Run whose checkpoints should be recovered - :param subfolder: optional subfolder name, if provided the checkpoints will be downloaded to - CHECKPOINT_FOLDER / subfolder. If None, the checkpoint are downloaded to CHECKPOINT_FOLDER of the current run. - :param: only_return_path: if True, return a RunRecovery object with the path to the checkpoint without actually - downloading the checkpoints. This is useful to avoid duplicating checkpoint download when running on multiple - nodes. If False, return the RunRecovery object and download the checkpoint to disk. - :return: run recovery information - """ - if fetch_child_runs(run): - raise ValueError(f"AzureML run {run.id} has child runs, this method does not support those.") - - destination_folder = config.checkpoint_folder / subfolder if subfolder else config.checkpoint_folder - - if not only_return_path: - download_run_outputs_by_prefix( - blobs_prefix=Path(CHECKPOINT_FOLDER), - destination=destination_folder, - run=run - ) - time.sleep(60) # Needed because AML is not fast enough to download - return RunRecovery(checkpoints_roots=[destination_folder]) - def get_recovery_checkpoint_paths(self) -> List[Path]: + from InnerEye.ML.utils.checkpoint_handling import get_recovery_checkpoint_path return [get_recovery_checkpoint_path(x) for x in self.checkpoints_roots] def get_best_checkpoint_paths(self) -> List[Path]: diff --git a/InnerEye/Scripts/move_model.py b/InnerEye/Scripts/move_model.py index 7268f859b..24b69832e 100644 --- a/InnerEye/Scripts/move_model.py +++ b/InnerEye/Scripts/move_model.py @@ -10,7 +10,7 @@ from attr import dataclass from azureml.core import Environment, Model, Workspace -from InnerEye.ML.deep_learning_config import FINAL_MODEL_FOLDER, FINAL_ENSEMBLE_MODEL_FOLDER +from InnerEye.ML.common import FINAL_ENSEMBLE_MODEL_FOLDER, FINAL_MODEL_FOLDER PYTHON_ENVIRONMENT_NAME = "python_environment_name" MODEL_PATH = "MODEL" diff --git a/Tests/AfterTraining/test_after_training.py b/Tests/AfterTraining/test_after_training.py index ee914b381..48176b389 100644 --- a/Tests/AfterTraining/test_after_training.py +++ b/Tests/AfterTraining/test_after_training.py @@ -20,10 +20,10 @@ import pytest from azureml._restclient.constants import RunStatus from azureml.core import Model, Run -from health_azure.himl import RUN_RECOVERY_FILE from InnerEye.Azure.azure_config import AzureConfig -from InnerEye.Azure.azure_util import MODEL_ID_KEY_NAME, download_run_output_file, download_run_outputs_by_prefix, \ +from InnerEye.Azure.azure_util import MODEL_ID_KEY_NAME, download_run_output_file, \ + download_run_outputs_by_prefix, \ get_comparison_baseline_paths, \ is_running_on_azure_agent, to_azure_friendly_string from InnerEye.Common import common_util, fixed_paths, fixed_paths_for_tests @@ -34,21 +34,25 @@ from InnerEye.Common.fixed_paths_for_tests import full_ml_test_data_path from InnerEye.Common.output_directories import OutputFolderForTests from InnerEye.Common.spawn_subprocess import spawn_and_monitor_subprocess -from InnerEye.ML.common import DATASET_CSV_FILE_NAME, ModelExecutionMode +from InnerEye.ML.common import (CHECKPOINT_FOLDER, DATASET_CSV_FILE_NAME, ModelExecutionMode, + RECOVERY_CHECKPOINT_FILE_NAME) from InnerEye.ML.configs.other.HelloContainer import HelloContainer from InnerEye.ML.configs.segmentation.BasicModel2Epochs import BasicModel2Epochs -from InnerEye.ML.deep_learning_config import CHECKPOINT_FOLDER, ModelCategory +from InnerEye.ML.deep_learning_config import ModelCategory from InnerEye.ML.model_inference_config import read_model_inference_config from InnerEye.ML.model_testing import THUMBNAILS_FOLDER from InnerEye.ML.reports.notebook_report import get_html_report_name from InnerEye.ML.run_ml import MLRunner from InnerEye.ML.runner import main +from InnerEye.ML.utils.checkpoint_handling import download_checkpoints_to_temp_folder, \ + find_recovery_checkpoint_and_epoch from InnerEye.ML.utils.config_loader import ModelConfigLoader from InnerEye.ML.utils.image_util import get_unit_image_header from InnerEye.ML.utils.io_util import zip_random_dicom_series from InnerEye.ML.visualizers.plot_cross_validation import PlotCrossValidationConfig from InnerEye.Scripts import submit_for_inference from Tests.ML.util import assert_nifti_content, get_default_azure_config, get_default_workspace, get_nifti_shape +from health_azure.himl import RUN_RECOVERY_FILE FALLBACK_SINGLE_RUN = "refs_pull_606_merge:refs_pull_606_merge_1638867172_17ba8dc5" FALLBACK_ENSEMBLE_RUN = "refs_pull_606_merge:HD_b8a6ad93-8c19-45de-8ea1-f87fce92c3bd" @@ -215,6 +219,33 @@ def test_check_dataset_mountpoint(test_output_dirs: OutputFolderForTests) -> Non assert f"local_dataset : {expected_mountpoint}" in logs +@pytest.mark.after_training_single_run +def test_download_checkpoints_from_aml(test_output_dirs: OutputFolderForTests) -> None: + """ + Check that we can download checkpoint files from an AzureML run, if they are not available on disk. + """ + run = get_most_recent_run(fallback_run_id_for_local_execution=FALLBACK_SINGLE_RUN) + temp_folder = download_checkpoints_to_temp_folder(run, workspace=get_default_workspace()) + files = list(temp_folder.glob("*")) + assert len(files) == 2 + # Test if what's in the folder are really files, not directories + for file in files: + assert file.is_file() + # Now test if that is correctly integrated into the checkpoint finder. To avoid downloading a second time, + # now mock the call to the actual downloader. + with mock.patch("InnerEye.ML.utils.checkpoint_handling.is_running_in_azure_ml", return_value=True): + with mock.patch("InnerEye.ML.utils.checkpoint_handling.download_checkpoints_to_temp_folder", + return_value=temp_folder) as download: + # Call the checkpoint finder with a temp folder that does not contain any files, so it should try to download + result = find_recovery_checkpoint_and_epoch(test_output_dirs.root_dir) + download.assert_called_once_with() + assert result is not None + p, epoch = result + # The basic model only writes one checkpoint at epoch 1 + assert epoch == 1 + assert RECOVERY_CHECKPOINT_FILE_NAME in p.stem + + @pytest.mark.inference def test_submit_for_inference(test_output_dirs: OutputFolderForTests) -> None: """ diff --git a/Tests/ML/test_download_upload.py b/Tests/ML/test_download_upload.py index 4f7f67b42..5bb34bc97 100644 --- a/Tests/ML/test_download_upload.py +++ b/Tests/ML/test_download_upload.py @@ -12,7 +12,8 @@ from InnerEye.Common.common_util import OTHER_RUNS_SUBDIR_NAME, logging_to_stdout from InnerEye.Common.output_directories import OutputFolderForTests from InnerEye.ML.model_config_base import ModelConfigBase -from InnerEye.ML.utils.run_recovery import RunRecovery +from InnerEye.ML.utils.checkpoint_handling import download_all_checkpoints_from_run, \ + download_best_checkpoints_from_child_runs from Tests.AfterTraining.test_after_training import FALLBACK_ENSEMBLE_RUN, FALLBACK_SINGLE_RUN, get_most_recent_run from Tests.ML.util import get_default_azure_config @@ -43,7 +44,7 @@ def test_download_recovery_single_run(test_output_dirs: OutputFolderForTests, config = ModelConfigBase(should_validate=False) config.set_output_to(output_dir) run = get_most_recent_run(fallback_run_id_for_local_execution=FALLBACK_SINGLE_RUN) - run_recovery = RunRecovery.download_all_checkpoints_from_run(config, run) + run_recovery = download_all_checkpoints_from_run(config, run) # This fails if there is no recovery checkpoint check_single_checkpoint(run_recovery.get_recovery_checkpoint_paths()) @@ -58,7 +59,7 @@ def test_download_best_checkpoints_ensemble_run(test_output_dirs: OutputFolderFo config.set_output_to(output_dir) run = get_most_recent_run(fallback_run_id_for_local_execution=FALLBACK_ENSEMBLE_RUN) - run_recovery = RunRecovery.download_best_checkpoints_from_child_runs(config, run) + run_recovery = download_best_checkpoints_from_child_runs(config, run) other_runs_folder = config.checkpoint_folder / OTHER_RUNS_SUBDIR_NAME assert other_runs_folder.is_dir() for child in ["0", "1"]: diff --git a/Tests/ML/test_lightning_containers.py b/Tests/ML/test_lightning_containers.py index 109901a97..64eda2d5c 100644 --- a/Tests/ML/test_lightning_containers.py +++ b/Tests/ML/test_lightning_containers.py @@ -16,8 +16,8 @@ from InnerEye.Azure.azure_config import AzureConfig from InnerEye.Common.output_directories import OutputFolderForTests -from InnerEye.ML.common import ModelExecutionMode -from InnerEye.ML.deep_learning_config import ARGS_TXT, DatasetParams, WorkflowParams +from InnerEye.ML.common import ARGS_TXT, ModelExecutionMode +from InnerEye.ML.deep_learning_config import DatasetParams, WorkflowParams from InnerEye.ML.lightning_base import InnerEyeContainer from InnerEye.ML.lightning_container import LightningContainer from InnerEye.ML.model_config_base import ModelConfigBase diff --git a/Tests/ML/test_regression_tests.py b/Tests/ML/test_regression_tests.py index 96dea3c4c..b939a0dde 100644 --- a/Tests/ML/test_regression_tests.py +++ b/Tests/ML/test_regression_tests.py @@ -15,7 +15,7 @@ from InnerEye.ML import baselines_util from InnerEye.ML.baselines_util import REGRESSION_TEST_AZUREML_FOLDER, REGRESSION_TEST_AZUREML_PARENT_FOLDER, \ REGRESSION_TEST_OUTPUT_FOLDER, compare_files, compare_folder_contents, compare_folders_and_run_outputs -from InnerEye.ML.deep_learning_config import FINAL_MODEL_FOLDER +from InnerEye.ML.common import FINAL_MODEL_FOLDER from InnerEye.ML.run_ml import MLRunner from Tests.AfterTraining.test_after_training import FALLBACK_ENSEMBLE_RUN, FALLBACK_SINGLE_RUN, get_most_recent_run from Tests.ML.configs.lightning_test_containers import DummyContainerWithModel diff --git a/Tests/ML/utils/test_checkpoint_handling.py b/Tests/ML/utils/test_checkpoint_handling.py index 5ea8d75bf..d44517027 100644 --- a/Tests/ML/utils/test_checkpoint_handling.py +++ b/Tests/ML/utils/test_checkpoint_handling.py @@ -12,10 +12,9 @@ from InnerEye.Common.common_util import OTHER_RUNS_SUBDIR_NAME from InnerEye.Common.fixed_paths import MODEL_INFERENCE_JSON_FILE_NAME -from InnerEye.ML.utils.checkpoint_handling import MODEL_WEIGHTS_DIR_NAME +from InnerEye.ML.utils.checkpoint_handling import MODEL_WEIGHTS_DIR_NAME, get_recovery_checkpoint_path from InnerEye.Common.output_directories import OutputFolderForTests -from InnerEye.ML.common import BEST_CHECKPOINT_FILE_NAME_WITH_SUFFIX, get_recovery_checkpoint_path -from InnerEye.ML.deep_learning_config import FINAL_MODEL_FOLDER, FINAL_ENSEMBLE_MODEL_FOLDER +from InnerEye.ML.common import BEST_CHECKPOINT_FILE_NAME_WITH_SUFFIX, FINAL_ENSEMBLE_MODEL_FOLDER, FINAL_MODEL_FOLDER from InnerEye.ML.model_config_base import ModelConfigBase from InnerEye.ML.model_inference_config import read_model_inference_config from InnerEye.ML.utils.checkpoint_handling import CheckpointHandler diff --git a/Tests/ML/utils/test_model_util.py b/Tests/ML/utils/test_model_util.py index 13a64db69..db692b9d4 100644 --- a/Tests/ML/utils/test_model_util.py +++ b/Tests/ML/utils/test_model_util.py @@ -12,8 +12,10 @@ from InnerEye.Common.output_directories import OutputFolderForTests from InnerEye.ML.common import BEST_CHECKPOINT_FILE_NAME_WITH_SUFFIX, LAST_CHECKPOINT_FILE_NAME, \ - LAST_CHECKPOINT_FILE_NAME_WITH_SUFFIX, RECOVERY_CHECKPOINT_FILE_NAME, create_best_checkpoint, \ - extract_latest_checkpoint_and_epoch, find_all_recovery_checkpoints, find_recovery_checkpoint_and_epoch + LAST_CHECKPOINT_FILE_NAME_WITH_SUFFIX, RECOVERY_CHECKPOINT_FILE_NAME +from InnerEye.ML.utils.checkpoint_handling import create_best_checkpoint, extract_latest_checkpoint_and_epoch, \ + find_all_recovery_checkpoints, \ + find_recovery_checkpoint_and_epoch from InnerEye.ML.config import SegmentationModelBase from InnerEye.ML.lightning_base import InnerEyeContainer from InnerEye.ML.lightning_helpers import load_from_checkpoint_and_adjust_for_inference diff --git a/TestsOutsidePackage/test_register_model.py b/TestsOutsidePackage/test_register_model.py index 6e79a8ff4..9a4b93cc8 100644 --- a/TestsOutsidePackage/test_register_model.py +++ b/TestsOutsidePackage/test_register_model.py @@ -11,7 +11,7 @@ from InnerEye.Common import fixed_paths from InnerEye.Common.output_directories import OutputFolderForTests from InnerEye.ML.config import SegmentationModelBase -from InnerEye.ML.deep_learning_config import CHECKPOINT_FOLDER +from InnerEye.ML.common import CHECKPOINT_FOLDER from InnerEye.ML.model_inference_config import ModelInferenceConfig from InnerEye.ML.run_ml import MLRunner diff --git a/hi-ml b/hi-ml index 334186321..38fd68557 160000 --- a/hi-ml +++ b/hi-ml @@ -1 +1 @@ -Subproject commit 334186321f6989033f5609880781ba4c299f6f67 +Subproject commit 38fd685579749e6c5e5f8c199c76c5854394421c