From 57e5c69a03542f942e622dddd00a2fc5fa4dcb5e Mon Sep 17 00:00:00 2001 From: Chua Eu Jing Date: Tue, 28 Jul 2020 16:42:42 -0400 Subject: [PATCH 01/12] Initial diff upload utility function --- _delphi_utils_python/delphi_utils/export.py | 92 ++++++++++++++++++++- 1 file changed, 91 insertions(+), 1 deletion(-) diff --git a/_delphi_utils_python/delphi_utils/export.py b/_delphi_utils_python/delphi_utils/export.py index afc6299dd..bcc469646 100644 --- a/_delphi_utils_python/delphi_utils/export.py +++ b/_delphi_utils_python/delphi_utils/export.py @@ -1,6 +1,13 @@ # -*- coding: utf-8 -*- from datetime import datetime +import filecmp +from os import listdir +from os.path import join, abspath +import shutil +from typing import List +import csvdiff +from git import Repo import pandas as pd @@ -34,6 +41,89 @@ def create_export_csv( ).sort_values() for date in dates: export_fn = f'{date.strftime("%Y%m%d")}_{geo_res}_' f"{metric}_{sensor}.csv" + export_file = join(export_dir, export_fn) df[df["timestamp"] == date][["geo_id", "val", "se", "sample_size",]].to_csv( - f"{export_dir}/{export_fn}", index=False, na_rep="NA" + export_file, index=False, na_rep="NA" ) + +def filter_new_issues( + export_dir: str, + cache_dir: str, + ) -> bool: + + repo = Repo(".", search_parent_directories=True) + # Abs paths of all modified files to check if we will override uncommitted changes + dirty_files = [join(repo.working_tree_dir, f) for f in repo.untracked_files] + dirty_files += [join(repo.working_tree_dir, d.a_path) for d in repo.index.diff(None)] + + exported_files = set(listdir(export_dir)) + previous_files = set(listdir(cache_dir)) + + # TODO: Deal with deleted_files (previous - exported) + + # New files + for filename in exported_files - previous_files: + before_file = join(cache_dir, filename) + after_file = join(export_dir, filename) + + # Archive all, as all new files are new issues too + assert before_file not in dirty_files + shutil.copyfile(after_file, before_file) + + # Stage + repo.index.add(abspath(before_file)) + + # Common files + dirty_conflict = False + for filename in exported_files & previous_files: + before_file = join(cache_dir, filename) + after_file = join(export_dir, filename) + diffs_file = join(export_dir, filename + ".diff") + + # Check for simple file similarity before doing CSV diffs + if filecmp.cmp(before_file, after_file, shallow=False): + continue + + after_df = pd.read_csv( + after_file, + dtype={"geo_id": str, "val": float, "se": float, "sample_size": float}) + after_df.set_index("geo_id", inplace=True) + + diff = csvdiff.diff_files( + before_file, after_file, + index_columns=["geo_id"]) + + added_keys = [added["geo_id"] for added in diff["added"]] + changed_keys = [changed["key"] for changed in diff["changed"]] + if len(diff["removed"]) > 0: + raise NotImplementedError("Cannot handle deletions yet") + + # Write new issues only + new_issues_df = after_df.loc[added_keys + changed_keys, :] + + # If archiving overwrites uncommitted changes, + # skip archiving and write new issues to diffs_file instead + if abspath(before_file) in dirty_files: + print(f"Warning, want to archive '{after_file}' as '{before_file}' but latter has uncommitted changes. 
Skipping archiving...") + dirty_conflict = True + new_issues_df.to_csv(diffs_file, index=False, na_rep="NA") + + # Otherwise, archive and explicitly stage new export, then replace with just new issues + else: + # Archive + shutil.copyfile(after_file, before_file) + + # Stage + repo.index.add(abspath(before_file)) + + # Replace + new_issues_df.to_csv(after_file, index=False, na_rep="NA") + + if not dirty_conflict: + repo.index.commit(message="Automated archive") + return True + + print( + "Some files were not archived to prevent overwritting uncommitted changes.\n" + f"Look for *.csv.diff files in {export_dir} and manually resolve / archive affected files.") + return False From cee1cbb053dee1de00a8700cd5afe6544d6449a7 Mon Sep 17 00:00:00 2001 From: Chua Eu Jing Date: Wed, 29 Jul 2020 22:38:29 -0400 Subject: [PATCH 02/12] Split up functions into diff, archive, replace --- _delphi_utils_python/delphi_utils/__init__.py | 8 +- _delphi_utils_python/delphi_utils/export.py | 164 +++++++++++------- 2 files changed, 109 insertions(+), 63 deletions(-) diff --git a/_delphi_utils_python/delphi_utils/__init__.py b/_delphi_utils_python/delphi_utils/__init__.py index 746a5bc6d..a3accc0e9 100644 --- a/_delphi_utils_python/delphi_utils/__init__.py +++ b/_delphi_utils_python/delphi_utils/__init__.py @@ -4,7 +4,13 @@ from __future__ import absolute_import -from .export import create_export_csv +from .export import ( + create_export_csv, + diff_exports, + archive_exports, + remove_identical_exports, + replace_exports, +) from .utils import read_params __version__ = "0.1.0" diff --git a/_delphi_utils_python/delphi_utils/export.py b/_delphi_utils_python/delphi_utils/export.py index bcc469646..74318b6ec 100644 --- a/_delphi_utils_python/delphi_utils/export.py +++ b/_delphi_utils_python/delphi_utils/export.py @@ -1,10 +1,11 @@ # -*- coding: utf-8 -*- from datetime import datetime import filecmp -from os import listdir -from os.path import join, abspath +from glob import glob +from os import replace, remove +from os.path import join, abspath, basename import shutil -from typing import List +from typing import Tuple, List, Dict, Optional import csvdiff from git import Repo @@ -46,84 +47,123 @@ def create_export_csv( export_file, index=False, na_rep="NA" ) -def filter_new_issues( +def diff_exports( + archive_dir: str, export_dir: str, - cache_dir: str, - ) -> bool: + csv_index_col: str = "geo_id", + deleted_indices_ok: bool = False, +) -> Tuple[List[str], Dict[str, Optional[str]], List[str]]: - repo = Repo(".", search_parent_directories=True) - # Abs paths of all modified files to check if we will override uncommitted changes - dirty_files = [join(repo.working_tree_dir, f) for f in repo.untracked_files] - dirty_files += [join(repo.working_tree_dir, d.a_path) for d in repo.index.diff(None)] - - exported_files = set(listdir(export_dir)) - previous_files = set(listdir(cache_dir)) - - # TODO: Deal with deleted_files (previous - exported) - - # New files - for filename in exported_files - previous_files: - before_file = join(cache_dir, filename) - after_file = join(export_dir, filename) + # Glob to only pick out CSV files, ignore hidden files + previous_files = set(basename(f) for f in glob(join(archive_dir, "*.csv"))) + exported_files = set(basename(f) for f in glob(join(export_dir, "*.csv"))) - # Archive all, as all new files are new issues too - assert before_file not in dirty_files - shutil.copyfile(after_file, before_file) + deleted_files = sorted(join(archive_dir, f) for f in previous_files - exported_files) + 
common_filenames = sorted(exported_files & previous_files) + new_files = sorted(join(export_dir, f) for f in exported_files - previous_files) - # Stage - repo.index.add(abspath(before_file)) - - # Common files - dirty_conflict = False - for filename in exported_files & previous_files: - before_file = join(cache_dir, filename) + common_files_to_diffs = {} + for filename in common_filenames: + before_file = join(archive_dir, filename) after_file = join(export_dir, filename) - diffs_file = join(export_dir, filename + ".diff") # Check for simple file similarity before doing CSV diffs if filecmp.cmp(before_file, after_file, shallow=False): + common_files_to_diffs[after_file] = None continue - after_df = pd.read_csv( - after_file, - dtype={"geo_id": str, "val": float, "se": float, "sample_size": float}) - after_df.set_index("geo_id", inplace=True) - + # TODO: Realize we only need row-level diffs, dont need CSV cell diffs diff = csvdiff.diff_files( before_file, after_file, - index_columns=["geo_id"]) + index_columns=[csv_index_col]) - added_keys = [added["geo_id"] for added in diff["added"]] - changed_keys = [changed["key"] for changed in diff["changed"]] - if len(diff["removed"]) > 0: - raise NotImplementedError("Cannot handle deletions yet") + added_indices = [added[csv_index_col] for added in diff["added"]] + changed_indices = [changed["key"][0] for changed in diff["changed"]] + if len(diff["removed"]) > 0 and not deleted_indices_ok: + raise NotImplementedError("Have not determined how to represent deletions") - # Write new issues only - new_issues_df = after_df.loc[added_keys + changed_keys, :] + # Write the diffs to diff_file, if applicable + if len(added_indices + changed_indices) > 0: - # If archiving overwrites uncommitted changes, - # skip archiving and write new issues to diffs_file instead - if abspath(before_file) in dirty_files: - print(f"Warning, want to archive '{after_file}' as '{before_file}' but latter has uncommitted changes. 
Skipping archiving...") - dirty_conflict = True - new_issues_df.to_csv(diffs_file, index=False, na_rep="NA") + # Only load up the CSV in pandas if there are indices to access + after_df = pd.read_csv( + after_file, + dtype={"geo_id": str, "val": float, "se": float, "sample_size": float}) + after_df.set_index(csv_index_col, inplace=True) + new_issues_df = after_df.loc[added_indices + changed_indices, :] + + diff_file = join(export_dir, filename + ".diff") + new_issues_df.to_csv(diff_file, na_rep="NA") + common_files_to_diffs[after_file] = diff_file - # Otherwise, archive and explicitly stage new export, then replace with just new issues else: + common_files_to_diffs[after_file] = None + + return deleted_files, common_files_to_diffs, new_files + +def archive_exports( + exported_files: List[str], + archive_dir: str, + override_uncommitted: bool = False, + auto_commit: bool = True, + commit_partial_success: bool = False, + commit_message: str = "Automated archive", +) -> Tuple[List[str], List[str]]: + + repo = Repo(".", search_parent_directories=True) + + + # Abs paths of all modified files to check if we will override uncommitted changes + dirty_files = [join(repo.working_tree_dir, f) for f in repo.untracked_files] + dirty_files += [join(repo.working_tree_dir, d.a_path) for d in repo.index.diff(None)] + + archived_files = [] + archive_success = [] + archive_fail = [] + for exported_file in exported_files: + archive_file = abspath(join(archive_dir, basename(exported_file))) + + # Archive and explicitly stage new export, depending if override + if archive_file not in dirty_files or override_uncommitted: # Archive - shutil.copyfile(after_file, before_file) + shutil.copyfile(exported_file, archive_file) - # Stage - repo.index.add(abspath(before_file)) + archived_files.append(archive_file) + archive_success.append(exported_file) + + # Otherwise ignore the archiving for this file + else: + archive_fail.append(exported_file) - # Replace - new_issues_df.to_csv(after_file, index=False, na_rep="NA") + # Stage + repo.index.add(archived_files) - if not dirty_conflict: - repo.index.commit(message="Automated archive") - return True + # Commit staged files + if auto_commit and len(exported_files) > 0: + + # Support partial success and at least one archive succeeded + partial_success = commit_partial_success and len(archive_success) > 0 + + if len(archive_success) == len(exported_files) or partial_success: + repo.index.commit(message=commit_message) + + return archive_success, archive_fail + +def remove_identical_exports( + common_files_to_diffs: Dict[str, Optional[str]] +): + for common_file, diff_file in common_files_to_diffs.items(): + if diff_file is None: + remove(common_file) + +def replace_exports( + exported_files: List[str], + common_files_to_diffs: Dict[str, Optional[str]], +): + for exported_file in exported_files: + # If exported_file is not a key, then it was not a common file. 
+ # So no replacing would be needed anyway + diff_file = common_files_to_diffs.get(exported_file, None) - print( - "Some files were not archived to prevent overwritting uncommitted changes.\n" - f"Look for *.csv.diff files in {export_dir} and manually resolve / archive affected files.") - return False + if diff_file is not None: + replace(diff_file, exported_file) From 15e63f4687a32d9ddda82bc151d2bfa8b8486102 Mon Sep 17 00:00:00 2001 From: Chua Eu Jing Date: Thu, 30 Jul 2020 16:10:24 -0400 Subject: [PATCH 03/12] Create commits in a specified branch instead of active branch --- _delphi_utils_python/delphi_utils/__init__.py | 1 + _delphi_utils_python/delphi_utils/export.py | 93 +++++++++++++------ 2 files changed, 68 insertions(+), 26 deletions(-) diff --git a/_delphi_utils_python/delphi_utils/__init__.py b/_delphi_utils_python/delphi_utils/__init__.py index a3accc0e9..53ddf5d07 100644 --- a/_delphi_utils_python/delphi_utils/__init__.py +++ b/_delphi_utils_python/delphi_utils/__init__.py @@ -6,6 +6,7 @@ from .export import ( create_export_csv, + archive_branch, diff_exports, archive_exports, remove_identical_exports, diff --git a/_delphi_utils_python/delphi_utils/export.py b/_delphi_utils_python/delphi_utils/export.py index 74318b6ec..a93f453dc 100644 --- a/_delphi_utils_python/delphi_utils/export.py +++ b/_delphi_utils_python/delphi_utils/export.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- +from contextlib import contextmanager from datetime import datetime import filecmp from glob import glob @@ -7,10 +8,11 @@ import shutil from typing import Tuple, List, Dict, Optional -import csvdiff from git import Repo import pandas as pd +Files = List[str] +FileDiffMap = Dict[str, Optional[str]] def create_export_csv( df: pd.DataFrame, @@ -47,12 +49,64 @@ def create_export_csv( export_file, index=False, na_rep="NA" ) +@contextmanager +def archive_branch(branch_name: Optional[str]): + repo = Repo(".", search_parent_directories=True) + + # Set branch to an actual Head object + orig_branch = repo.active_branch + if branch_name is None: + branch = repo.active_branch + elif branch_name in repo.branches: + branch = repo.branches[branch_name] + else: + branch = repo.create_head(branch_name) + + # Checkout target archive branch for all operations in block + branch.checkout() + + try: + yield branch + + finally: + # Once done, checkout original branch + orig_branch.checkout() + +def diff_export_csv( + before_csv: str, + after_csv: str +) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: + + export_csv_dtypes = {"geo_id": str, "val": float, "se": float, "sample_size": float} + + before_df = pd.read_csv(before_csv, dtype=export_csv_dtypes) + before_df.set_index("geo_id", inplace=True) + + after_df = pd.read_csv(after_csv, dtype=export_csv_dtypes) + after_df.set_index("geo_id", inplace=True) + + deleted_idx = before_df.index.difference(after_df.index) + common_idx = before_df.index.intersection(after_df.index) + added_idx = after_df.index.difference(before_df.index) + + before_df_cmn = before_df.reindex(common_idx) + after_df_cmn = after_df.reindex(common_idx) + + # Comparison treating NA == NA as True + # TODO: Should we change exact equality to some approximate one? 
+ same_mask = before_df_cmn == after_df_cmn + same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn) + + return ( + before_df.loc[deleted_idx, :], + after_df_cmn.loc[~(same_mask.all(axis=1)), :], + after_df.loc[added_idx, :]) + def diff_exports( archive_dir: str, export_dir: str, - csv_index_col: str = "geo_id", deleted_indices_ok: bool = False, -) -> Tuple[List[str], Dict[str, Optional[str]], List[str]]: +) -> Tuple[Files, FileDiffMap, Files]: # Glob to only pick out CSV files, ignore hidden files previous_files = set(basename(f) for f in glob(join(archive_dir, "*.csv"))) @@ -72,26 +126,14 @@ def diff_exports( common_files_to_diffs[after_file] = None continue - # TODO: Realize we only need row-level diffs, dont need CSV cell diffs - diff = csvdiff.diff_files( - before_file, after_file, - index_columns=[csv_index_col]) + deleted_df, changed_df, added_df = diff_export_csv(before_file, after_file) + new_issues_df = pd.concat([changed_df, added_df], axis=0) - added_indices = [added[csv_index_col] for added in diff["added"]] - changed_indices = [changed["key"][0] for changed in diff["changed"]] - if len(diff["removed"]) > 0 and not deleted_indices_ok: - raise NotImplementedError("Have not determined how to represent deletions") + if not deleted_indices_ok and len(deleted_df) > 0: + raise NotImplementedError("Cannot handle deletions yet") # Write the diffs to diff_file, if applicable - if len(added_indices + changed_indices) > 0: - - # Only load up the CSV in pandas if there are indices to access - after_df = pd.read_csv( - after_file, - dtype={"geo_id": str, "val": float, "se": float, "sample_size": float}) - after_df.set_index(csv_index_col, inplace=True) - new_issues_df = after_df.loc[added_indices + changed_indices, :] - + if len(new_issues_df) > 0: diff_file = join(export_dir, filename + ".diff") new_issues_df.to_csv(diff_file, na_rep="NA") common_files_to_diffs[after_file] = diff_file @@ -102,17 +144,16 @@ def diff_exports( return deleted_files, common_files_to_diffs, new_files def archive_exports( - exported_files: List[str], + exported_files: Files, archive_dir: str, override_uncommitted: bool = False, auto_commit: bool = True, commit_partial_success: bool = False, commit_message: str = "Automated archive", -) -> Tuple[List[str], List[str]]: +) -> Tuple[Files, Files]: repo = Repo(".", search_parent_directories=True) - # Abs paths of all modified files to check if we will override uncommitted changes dirty_files = [join(repo.working_tree_dir, f) for f in repo.untracked_files] dirty_files += [join(repo.working_tree_dir, d.a_path) for d in repo.index.diff(None)] @@ -150,15 +191,15 @@ def archive_exports( return archive_success, archive_fail def remove_identical_exports( - common_files_to_diffs: Dict[str, Optional[str]] + common_files_to_diffs: FileDiffMap ): for common_file, diff_file in common_files_to_diffs.items(): if diff_file is None: remove(common_file) def replace_exports( - exported_files: List[str], - common_files_to_diffs: Dict[str, Optional[str]], + exported_files: Files, + common_files_to_diffs: FileDiffMap, ): for exported_file in exported_files: # If exported_file is not a key, then it was not a common file. 
From fefc1fd91d62f22aebd6533211cdd9dbe5c1cc18 Mon Sep 17 00:00:00 2001 From: Chua Eu Jing Date: Fri, 31 Jul 2020 15:44:05 -0400 Subject: [PATCH 04/12] Abstracted out archiving --- _delphi_utils_python/delphi_utils/__init__.py | 10 +- _delphi_utils_python/delphi_utils/archive.py | 210 ++++++++++++++++++ _delphi_utils_python/delphi_utils/export.py | 172 +------------- jhu/delphi_jhu/run.py | 23 +- 4 files changed, 235 insertions(+), 180 deletions(-) create mode 100644 _delphi_utils_python/delphi_utils/archive.py diff --git a/_delphi_utils_python/delphi_utils/__init__.py b/_delphi_utils_python/delphi_utils/__init__.py index 53ddf5d07..1b1eef302 100644 --- a/_delphi_utils_python/delphi_utils/__init__.py +++ b/_delphi_utils_python/delphi_utils/__init__.py @@ -4,14 +4,8 @@ from __future__ import absolute_import -from .export import ( - create_export_csv, - archive_branch, - diff_exports, - archive_exports, - remove_identical_exports, - replace_exports, -) +from .export import create_export_csv +from .archive import GitArchiveDiffer from .utils import read_params __version__ = "0.1.0" diff --git a/_delphi_utils_python/delphi_utils/archive.py b/_delphi_utils_python/delphi_utils/archive.py new file mode 100644 index 000000000..3bfecb245 --- /dev/null +++ b/_delphi_utils_python/delphi_utils/archive.py @@ -0,0 +1,210 @@ +from contextlib import contextmanager +import filecmp +from glob import glob +from os import remove, replace +from os.path import join, basename, abspath +import shutil +from typing import Tuple, List, Dict, Optional + +from git import Repo +import pandas as pd + +Files = List[str] +FileDiffMap = Dict[str, Optional[str]] + +def diff_export_csv( + before_csv: str, + after_csv: str +) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: + + export_csv_dtypes = {"geo_id": str, "val": float, "se": float, "sample_size": float} + + before_df = pd.read_csv(before_csv, dtype=export_csv_dtypes) + before_df.set_index("geo_id", inplace=True) + + after_df = pd.read_csv(after_csv, dtype=export_csv_dtypes) + after_df.set_index("geo_id", inplace=True) + + deleted_idx = before_df.index.difference(after_df.index) + common_idx = before_df.index.intersection(after_df.index) + added_idx = after_df.index.difference(before_df.index) + + before_df_cmn = before_df.reindex(common_idx) + after_df_cmn = after_df.reindex(common_idx) + + # Comparison treating NA == NA as True + # TODO: Should we change exact equality to some approximate one? 
+ same_mask = before_df_cmn == after_df_cmn + same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn) + + return ( + before_df.loc[deleted_idx, :], + after_df_cmn.loc[~(same_mask.all(axis=1)), :], + after_df.loc[added_idx, :]) + +class ArchiveDiffer: + + def __init__(self, cache_dir: str, export_dir: str): + self.cache_dir = cache_dir + self.export_dir = export_dir + + self._cache_updated = False + self._exports_archived = False + + def update_cache(self): + # Depends on the archiving backend + raise NotImplementedError + + def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]: + # Main logic of finding diffs across and within CSV files + # Should be called after update_cache + assert self._cache_updated + + # Glob to only pick out CSV files, ignore hidden files + previous_files = set(basename(f) for f in glob(join(self.cache_dir, "*.csv"))) + exported_files = set(basename(f) for f in glob(join(self.export_dir, "*.csv"))) + + deleted_files = sorted(join(self.cache_dir, f) for f in previous_files - exported_files) + common_filenames = sorted(exported_files & previous_files) + new_files = sorted(join(self.export_dir, f) for f in exported_files - previous_files) + + common_diffs = {} + for filename in common_filenames: + before_file = join(self.cache_dir, filename) + after_file = join(self.export_dir, filename) + + # Check for simple file similarity before doing CSV diffs + if filecmp.cmp(before_file, after_file, shallow=False): + common_diffs[after_file] = None + continue + + deleted_df, changed_df, added_df = diff_export_csv(before_file, after_file) + new_issues_df = pd.concat([changed_df, added_df], axis=0) + + if len(deleted_df) > 0: + raise NotImplementedError("Cannot handle deletions yet") + + # Write the diffs to diff_file, if applicable + if len(new_issues_df) > 0: + diff_file = join(self.export_dir, filename + ".diff") + + new_issues_df.to_csv(diff_file, na_rep="NA") + common_diffs[after_file] = diff_file + + else: + common_diffs[after_file] = None + + return deleted_files, common_diffs, new_files + + def archive_exports(self, exported_files: Optional[Files]) -> Tuple[Files, Files]: + raise NotImplementedError + + def filter_exports(self, common_diffs: FileDiffMap): + # Should be called after archive_exports + assert self._exports_archived + + for exported_file, diff_file in common_diffs.items(): + # Delete existing exports that had no data diff + if diff_file is None: + remove(exported_file) + + # Replace exports where diff file was generated + else: + replace(diff_file, exported_file) + +class GitArchiveDiffer(ArchiveDiffer): + + def __init__( + self, cache_dir: str, export_dir: str, + branch_name: Optional[str] = None, + override_dirty: bool = False, + commit_partial_success: bool = False, + commit_message: str = "Automated archive", + ): + super().__init__(cache_dir, export_dir) + + assert override_dirty or not commit_partial_success, \ + "Only can commit_partial_success=True when override_dirty=True" + + # TODO: Handle if no repository is found + self.repo = Repo(cache_dir, search_parent_directories=True) + + self.orig_branch = self.repo.active_branch + self.branch = self.get_branch(branch_name) + self.override_dirty = override_dirty + self.commit_partial_success = commit_partial_success + self.commit_message = commit_message + + def get_branch(self, branch_name: Optional[str] = None): + if branch_name is None: + return self.repo.active_branch + + if branch_name in self.repo.branches: + return self.repo.branches[branch_name] + + return self.repo.create_head(branch_name) 
+ + @contextmanager + def archiving_branch(self): + self.branch.checkout() + + try: + yield self.branch + finally: + self.orig_branch.checkout() + + def update_cache(self): + # Make sure cache directory is clean: has everything nicely committed + if not self.override_dirty: + cache_clean = not self.repo.is_dirty(untracked_files=True, path=abspath(self.cache_dir)) + assert cache_clean, f"There are uncommitted changes in the cache dir '{self.cache_dir}'" + + self._cache_updated = True + + def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]: + with self.archiving_branch(): + return super().diff_exports() + + def archive_exports(self, exported_files: Optional[Files] = None) -> Tuple[Files, Files]: + archived_files = [] + archive_success = [] + archive_fail = [] + + if exported_files is None: + exported_files = glob(join(self.export_dir, "*.csv")) + + with self.archiving_branch(): + # Abs paths of all modified files to check if we will override uncommitted changes + working_tree_dir = self.repo.working_tree_dir + dirty_files = [join(working_tree_dir, f) for f in self.repo.untracked_files] + dirty_files += [join(working_tree_dir, d.a_path) for d in self.repo.index.diff(None)] + + for exported_file in exported_files: + archive_file = abspath(join(self.cache_dir, basename(exported_file))) + + # Archive and explicitly stage new export, depending if override + if self.override_dirty or archive_file not in dirty_files: + # Archive + shutil.copyfile(exported_file, archive_file) + + archived_files.append(archive_file) + archive_success.append(exported_file) + + # Otherwise ignore the archiving for this file + else: + archive_fail.append(exported_file) + + # Stage + self.repo.index.add(archived_files) + + # Commit staged files + if len(exported_files) > 0: + + # Support partial success and at least one archive succeeded + partial_success = self.commit_partial_success and len(archive_success) > 0 + + if len(archive_success) == len(exported_files) or partial_success: + self.repo.index.commit(message=self.commit_message) + self._exports_archived = True + + return archive_success, archive_fail diff --git a/_delphi_utils_python/delphi_utils/export.py b/_delphi_utils_python/delphi_utils/export.py index a93f453dc..47f4a7604 100644 --- a/_delphi_utils_python/delphi_utils/export.py +++ b/_delphi_utils_python/delphi_utils/export.py @@ -1,19 +1,9 @@ # -*- coding: utf-8 -*- -from contextlib import contextmanager from datetime import datetime -import filecmp -from glob import glob -from os import replace, remove -from os.path import join, abspath, basename -import shutil -from typing import Tuple, List, Dict, Optional +from os.path import join -from git import Repo import pandas as pd -Files = List[str] -FileDiffMap = Dict[str, Optional[str]] - def create_export_csv( df: pd.DataFrame, start_date: datetime, @@ -48,163 +38,3 @@ def create_export_csv( df[df["timestamp"] == date][["geo_id", "val", "se", "sample_size",]].to_csv( export_file, index=False, na_rep="NA" ) - -@contextmanager -def archive_branch(branch_name: Optional[str]): - repo = Repo(".", search_parent_directories=True) - - # Set branch to an actual Head object - orig_branch = repo.active_branch - if branch_name is None: - branch = repo.active_branch - elif branch_name in repo.branches: - branch = repo.branches[branch_name] - else: - branch = repo.create_head(branch_name) - - # Checkout target archive branch for all operations in block - branch.checkout() - - try: - yield branch - - finally: - # Once done, checkout original branch - 
orig_branch.checkout() - -def diff_export_csv( - before_csv: str, - after_csv: str -) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: - - export_csv_dtypes = {"geo_id": str, "val": float, "se": float, "sample_size": float} - - before_df = pd.read_csv(before_csv, dtype=export_csv_dtypes) - before_df.set_index("geo_id", inplace=True) - - after_df = pd.read_csv(after_csv, dtype=export_csv_dtypes) - after_df.set_index("geo_id", inplace=True) - - deleted_idx = before_df.index.difference(after_df.index) - common_idx = before_df.index.intersection(after_df.index) - added_idx = after_df.index.difference(before_df.index) - - before_df_cmn = before_df.reindex(common_idx) - after_df_cmn = after_df.reindex(common_idx) - - # Comparison treating NA == NA as True - # TODO: Should we change exact equality to some approximate one? - same_mask = before_df_cmn == after_df_cmn - same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn) - - return ( - before_df.loc[deleted_idx, :], - after_df_cmn.loc[~(same_mask.all(axis=1)), :], - after_df.loc[added_idx, :]) - -def diff_exports( - archive_dir: str, - export_dir: str, - deleted_indices_ok: bool = False, -) -> Tuple[Files, FileDiffMap, Files]: - - # Glob to only pick out CSV files, ignore hidden files - previous_files = set(basename(f) for f in glob(join(archive_dir, "*.csv"))) - exported_files = set(basename(f) for f in glob(join(export_dir, "*.csv"))) - - deleted_files = sorted(join(archive_dir, f) for f in previous_files - exported_files) - common_filenames = sorted(exported_files & previous_files) - new_files = sorted(join(export_dir, f) for f in exported_files - previous_files) - - common_files_to_diffs = {} - for filename in common_filenames: - before_file = join(archive_dir, filename) - after_file = join(export_dir, filename) - - # Check for simple file similarity before doing CSV diffs - if filecmp.cmp(before_file, after_file, shallow=False): - common_files_to_diffs[after_file] = None - continue - - deleted_df, changed_df, added_df = diff_export_csv(before_file, after_file) - new_issues_df = pd.concat([changed_df, added_df], axis=0) - - if not deleted_indices_ok and len(deleted_df) > 0: - raise NotImplementedError("Cannot handle deletions yet") - - # Write the diffs to diff_file, if applicable - if len(new_issues_df) > 0: - diff_file = join(export_dir, filename + ".diff") - new_issues_df.to_csv(diff_file, na_rep="NA") - common_files_to_diffs[after_file] = diff_file - - else: - common_files_to_diffs[after_file] = None - - return deleted_files, common_files_to_diffs, new_files - -def archive_exports( - exported_files: Files, - archive_dir: str, - override_uncommitted: bool = False, - auto_commit: bool = True, - commit_partial_success: bool = False, - commit_message: str = "Automated archive", -) -> Tuple[Files, Files]: - - repo = Repo(".", search_parent_directories=True) - - # Abs paths of all modified files to check if we will override uncommitted changes - dirty_files = [join(repo.working_tree_dir, f) for f in repo.untracked_files] - dirty_files += [join(repo.working_tree_dir, d.a_path) for d in repo.index.diff(None)] - - archived_files = [] - archive_success = [] - archive_fail = [] - for exported_file in exported_files: - archive_file = abspath(join(archive_dir, basename(exported_file))) - - # Archive and explicitly stage new export, depending if override - if archive_file not in dirty_files or override_uncommitted: - # Archive - shutil.copyfile(exported_file, archive_file) - - archived_files.append(archive_file) - 
archive_success.append(exported_file) - - # Otherwise ignore the archiving for this file - else: - archive_fail.append(exported_file) - - # Stage - repo.index.add(archived_files) - - # Commit staged files - if auto_commit and len(exported_files) > 0: - - # Support partial success and at least one archive succeeded - partial_success = commit_partial_success and len(archive_success) > 0 - - if len(archive_success) == len(exported_files) or partial_success: - repo.index.commit(message=commit_message) - - return archive_success, archive_fail - -def remove_identical_exports( - common_files_to_diffs: FileDiffMap -): - for common_file, diff_file in common_files_to_diffs.items(): - if diff_file is None: - remove(common_file) - -def replace_exports( - exported_files: Files, - common_files_to_diffs: FileDiffMap, -): - for exported_file in exported_files: - # If exported_file is not a key, then it was not a common file. - # So no replacing would be needed anyway - diff_file = common_files_to_diffs.get(exported_file, None) - - if diff_file is not None: - replace(diff_file, exported_file) diff --git a/jhu/delphi_jhu/run.py b/jhu/delphi_jhu/run.py index 6758b0d4f..65da0054e 100644 --- a/jhu/delphi_jhu/run.py +++ b/jhu/delphi_jhu/run.py @@ -11,7 +11,11 @@ import numpy as np import pandas as pd -from delphi_utils import read_params, create_export_csv +from delphi_utils import ( + read_params, + create_export_csv, + GitArchiveDiffer, +) from .geo import geo_map from .pull import pull_jhu_data @@ -62,6 +66,11 @@ def run_module(): export_dir = params["export_dir"] base_url = params["base_url"] static_file_dir = params["static_file_dir"] + cache_dir = params["cache_dir"] + arch_diff = GitArchiveDiffer( + cache_dir, export_dir, + branch_name="jhu-receiving-archive") + arch_diff.update_cache() map_df = pd.read_csv( join(static_file_dir, "fips_prop_pop.csv"), dtype={"fips": int} @@ -95,3 +104,15 @@ def run_module(): geo_res=geo_res, sensor=sensor_name, ) + + # Exports + _, common_diffs, _ = arch_diff.diff_exports() + successes, fails = arch_diff.archive_exports() + succ_common_diffs = { + exported_file: diff_file + for exported_file, diff_file in common_diffs.items() + if exported_file in successes} + arch_diff.filter_exports(succ_common_diffs) + + for exported_file in fails: + print(f"Failed to archive '{exported_file}'") From 7c1aecc85e99c019718c19700cc22bd01af30b07 Mon Sep 17 00:00:00 2001 From: Chua Eu Jing Date: Mon, 3 Aug 2020 19:26:34 -0400 Subject: [PATCH 05/12] Support for S3 bucket archiving --- _delphi_utils_python/delphi_utils/__init__.py | 2 +- _delphi_utils_python/delphi_utils/archive.py | 64 ++++++++++++++++++- _delphi_utils_python/setup.py | 2 + 3 files changed, 66 insertions(+), 2 deletions(-) diff --git a/_delphi_utils_python/delphi_utils/__init__.py b/_delphi_utils_python/delphi_utils/__init__.py index 1b1eef302..afd8562db 100644 --- a/_delphi_utils_python/delphi_utils/__init__.py +++ b/_delphi_utils_python/delphi_utils/__init__.py @@ -5,7 +5,7 @@ from __future__ import absolute_import from .export import create_export_csv -from .archive import GitArchiveDiffer +from .archive import GitArchiveDiffer, S3ArchiveDiffer from .utils import read_params __version__ = "0.1.0" diff --git a/_delphi_utils_python/delphi_utils/archive.py b/_delphi_utils_python/delphi_utils/archive.py index 3bfecb245..edd29ce2a 100644 --- a/_delphi_utils_python/delphi_utils/archive.py +++ b/_delphi_utils_python/delphi_utils/archive.py @@ -6,6 +6,7 @@ import shutil from typing import Tuple, List, Dict, Optional +from boto3 
import Session from git import Repo import pandas as pd @@ -112,6 +113,66 @@ def filter_exports(self, common_diffs: FileDiffMap): else: replace(diff_file, exported_file) +class S3ArchiveDiffer(ArchiveDiffer): + + def __init__( + self, cache_dir: str, export_dir: str, + bucket_name: str, + indicator_prefix: str, + s3_credentials: Dict[str, str], + ): + super().__init__(cache_dir, export_dir) + self.s3 = Session(**s3_credentials).resource("s3") + self.bucket = self.s3.Bucket(bucket_name) + self.bucket_versioning = self.s3.BucketVersioning(bucket_name) + self.indicator_prefix = indicator_prefix + + def update_cache(self): + # List all indicator-related objects from S3 + archive_objects = self.bucket.objects.filter(Prefix=self.indicator_prefix).all() + archive_objects = [obj for obj in archive_objects if obj.key.endswith(".csv")] + + # Check against what we have locally and download missing ones + cached_files = set(basename(f) for f in glob(join(self.cache_dir, "*.csv"))) + for obj in archive_objects: + archive_file = basename(obj.key) + cached_file = join(self.cache_dir, archive_file) + + if archive_file not in cached_files: + print(f"Updating cache with {cached_file}") + obj.Object().download_file(cached_file) + + self._cache_updated = True + + def archive_exports(self, exported_files: Optional[Files] = None) -> Tuple[Files, Files]: + archive_success = [] + archive_fail = [] + + if exported_files is None: + exported_files = glob(join(self.export_dir, "*.csv")) + + # Enable versioning if turned off + # if self.bucket_versioning.status != "Enabled": + # self.bucket_versioning.enable() + + for exported_file in exported_files: + cached_file = abspath(join(self.cache_dir, basename(exported_file))) + archive_key = join(self.indicator_prefix, basename(exported_file)) + + # Update local cache + shutil.copyfile(exported_file, cached_file) + + try: + self.bucket.Object(archive_key).upload_file(exported_file) + # TODO: Wait until exists to confirm successful upload? 
+ archive_success.append(exported_file) + except: + archive_fail.append(exported_file) + + self._exports_archived = True + + return archive_success, archive_fail + class GitArchiveDiffer(ArchiveDiffer): def __init__( @@ -205,6 +266,7 @@ def archive_exports(self, exported_files: Optional[Files] = None) -> Tuple[Files if len(archive_success) == len(exported_files) or partial_success: self.repo.index.commit(message=self.commit_message) - self._exports_archived = True + + self._exports_archived = True return archive_success, archive_fail diff --git a/_delphi_utils_python/setup.py b/_delphi_utils_python/setup.py index 8a4990593..ea21c6be0 100644 --- a/_delphi_utils_python/setup.py +++ b/_delphi_utils_python/setup.py @@ -6,6 +6,8 @@ "pandas", "pytest", "pytest-cov", + "boto3", + "gitpython", ] setup( From 8a49e1779bad2aef50df5919c4346a67391efbe7 Mon Sep 17 00:00:00 2001 From: Chua Eu Jing Date: Tue, 4 Aug 2020 16:52:52 -0400 Subject: [PATCH 06/12] Added S3 archive diff unit tests --- _delphi_utils_python/delphi_utils/__init__.py | 2 +- _delphi_utils_python/delphi_utils/archive.py | 26 +- _delphi_utils_python/setup.py | 1 + _delphi_utils_python/tests/test_archive.py | 226 ++++++++++++++++++ 4 files changed, 237 insertions(+), 18 deletions(-) create mode 100644 _delphi_utils_python/tests/test_archive.py diff --git a/_delphi_utils_python/delphi_utils/__init__.py b/_delphi_utils_python/delphi_utils/__init__.py index afd8562db..d8ad7d8bd 100644 --- a/_delphi_utils_python/delphi_utils/__init__.py +++ b/_delphi_utils_python/delphi_utils/__init__.py @@ -5,7 +5,7 @@ from __future__ import absolute_import from .export import create_export_csv -from .archive import GitArchiveDiffer, S3ArchiveDiffer +from .archive import ArchiveDiffer, GitArchiveDiffer, S3ArchiveDiffer from .utils import read_params __version__ = "0.1.0" diff --git a/_delphi_utils_python/delphi_utils/archive.py b/_delphi_utils_python/delphi_utils/archive.py index edd29ce2a..4adbfc2a1 100644 --- a/_delphi_utils_python/delphi_utils/archive.py +++ b/_delphi_utils_python/delphi_utils/archive.py @@ -74,16 +74,17 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]: before_file = join(self.cache_dir, filename) after_file = join(self.export_dir, filename) + common_diffs[after_file] = None + # Check for simple file similarity before doing CSV diffs if filecmp.cmp(before_file, after_file, shallow=False): - common_diffs[after_file] = None continue deleted_df, changed_df, added_df = diff_export_csv(before_file, after_file) new_issues_df = pd.concat([changed_df, added_df], axis=0) if len(deleted_df) > 0: - raise NotImplementedError("Cannot handle deletions yet") + print(f"Warning, diff has deleted indices in {after_file} that will be ignored") # Write the diffs to diff_file, if applicable if len(new_issues_df) > 0: @@ -92,12 +93,9 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]: new_issues_df.to_csv(diff_file, na_rep="NA") common_diffs[after_file] = diff_file - else: - common_diffs[after_file] = None - return deleted_files, common_diffs, new_files - def archive_exports(self, exported_files: Optional[Files]) -> Tuple[Files, Files]: + def archive_exports(self, exported_files: Files) -> Tuple[Files, Files]: raise NotImplementedError def filter_exports(self, common_diffs: FileDiffMap): @@ -144,13 +142,10 @@ def update_cache(self): self._cache_updated = True - def archive_exports(self, exported_files: Optional[Files] = None) -> Tuple[Files, Files]: + def archive_exports(self, exported_files: Files) -> Tuple[Files, Files]: 
archive_success = [] archive_fail = [] - if exported_files is None: - exported_files = glob(join(self.export_dir, "*.csv")) - # Enable versioning if turned off # if self.bucket_versioning.status != "Enabled": # self.bucket_versioning.enable() @@ -159,10 +154,10 @@ def archive_exports(self, exported_files: Optional[Files] = None) -> Tuple[Files cached_file = abspath(join(self.cache_dir, basename(exported_file))) archive_key = join(self.indicator_prefix, basename(exported_file)) - # Update local cache - shutil.copyfile(exported_file, cached_file) - try: + # Update local cache + shutil.copyfile(exported_file, cached_file) + self.bucket.Object(archive_key).upload_file(exported_file) # TODO: Wait until exists to confirm successful upload? archive_success.append(exported_file) @@ -226,14 +221,11 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]: with self.archiving_branch(): return super().diff_exports() - def archive_exports(self, exported_files: Optional[Files] = None) -> Tuple[Files, Files]: + def archive_exports(self, exported_files: Files) -> Tuple[Files, Files]: archived_files = [] archive_success = [] archive_fail = [] - if exported_files is None: - exported_files = glob(join(self.export_dir, "*.csv")) - with self.archiving_branch(): # Abs paths of all modified files to check if we will override uncommitted changes working_tree_dir = self.repo.working_tree_dir diff --git a/_delphi_utils_python/setup.py b/_delphi_utils_python/setup.py index ea21c6be0..75f331fc6 100644 --- a/_delphi_utils_python/setup.py +++ b/_delphi_utils_python/setup.py @@ -7,6 +7,7 @@ "pytest", "pytest-cov", "boto3", + "moto", "gitpython", ] diff --git a/_delphi_utils_python/tests/test_archive.py b/_delphi_utils_python/tests/test_archive.py new file mode 100644 index 000000000..67eb4083f --- /dev/null +++ b/_delphi_utils_python/tests/test_archive.py @@ -0,0 +1,226 @@ + +from io import StringIO, BytesIO +from os import listdir, mkdir +from os.path import join + +import pytest +import pandas as pd +from pandas.testing import assert_frame_equal +from boto3 import Session +from moto import mock_s3 + +from delphi_utils import ArchiveDiffer, GitArchiveDiffer, S3ArchiveDiffer + +CSV_DTYPES = {"geo_id": str, "val": float, "se": float, "sample_size": float} + +class TestArchiveDiffer: + + def test_stubs(self): + arch_diff = ArchiveDiffer("cache", "export") + + with pytest.raises(NotImplementedError): + arch_diff.update_cache() + + with pytest.raises(NotImplementedError): + arch_diff.archive_exports(None) + + + def test_diff_and_filter_exports(self, tmp_path): + cache_dir = join(str(tmp_path), "cache") + export_dir = join(str(tmp_path), "export") + mkdir(cache_dir) + mkdir(export_dir) + + csvs_before = { + # Common + "csv0": pd.DataFrame({ + "geo_id": ["1", "2", "3"], + "val": [1.0, 2.0, 3.0], + "se": [0.1, 0.2, 0.3], + "sample_size": [10.0, 20.0, 30.0]}), + + "csv1": pd.DataFrame({ + "geo_id": ["1", "2", "3"], + "val": [1.0, 2.0, 3.0], + "se": [0.1, 0.2, 0.3], + "sample_size": [10.0, 20.0, 30.0]}), + + # Deleted + "csv2": pd.DataFrame({ + "geo_id": ["1"], + "val": [1.0], + "se": [0.1], + "sample_size": [10.0]}), + } + + csvs_after = { + # Common + "csv0": pd.DataFrame({ + "geo_id": ["1", "2", "3"], + "val": [1.0, 2.0, 3.0], + "se": [0.1, 0.2, 0.3], + "sample_size": [10.0, 20.0, 30.0]}), + + "csv1": pd.DataFrame({ + "geo_id": ["1", "2", "4"], + "val": [1.0, 2.1, 4.0], + "se": [0.1, 0.21, 0.4], + "sample_size": [10.0, 21.0, 40.0]}), + + # Added + "csv3": pd.DataFrame({ + "geo_id": ["2"], + "val": [2.0], + "se": [0.2], 
+ "sample_size": [20.0]}), + } + csv1_diff = pd.DataFrame({ + "geo_id": ["2", "4"], + "val": [2.1, 4.0], + "se": [0.21, 0.4], + "sample_size": [21.0, 40.0]}) + + arch_diff = ArchiveDiffer(cache_dir, export_dir) + + # Test diff_exports + # ================= + + # Should fail as cache was not updated yet + with pytest.raises(AssertionError): + arch_diff.diff_exports() + + # Simulate cache updated, and signal ran finish + for csv_name, df in csvs_before.items(): + df.to_csv(join(cache_dir, f"{csv_name}.csv"), index=False) + for csv_name, df in csvs_after.items(): + df.to_csv(join(export_dir, f"{csv_name}.csv"), index=False) + arch_diff._cache_updated = True + + deleted_files, common_diffs, new_files = arch_diff.diff_exports() + + # Check return values + assert set(deleted_files) == {join(cache_dir, "csv2.csv")} + assert set(common_diffs.keys()) == {join(export_dir, f) for f in ["csv0.csv", "csv1.csv"]} + assert set(new_files) == {join(export_dir, "csv3.csv")} + assert common_diffs[join(export_dir, "csv0.csv")] is None + assert common_diffs[join(export_dir, "csv1.csv")] == join(export_dir, "csv1.csv.diff") + + # Check filesystem for actual files + assert set(listdir(export_dir)) == {"csv0.csv", "csv1.csv", "csv1.csv.diff", "csv3.csv"} + assert_frame_equal( + pd.read_csv(join(export_dir, "csv1.csv.diff"), dtype=CSV_DTYPES), + csv1_diff) + + # Test filter_exports + # =================== + + # Should fail as archive_exports not called yet + with pytest.raises(AssertionError): + arch_diff.filter_exports(common_diffs) + + # Simulate archive + arch_diff._exports_archived = True + + arch_diff.filter_exports(common_diffs) + + # Check exports directory just has incremental changes + assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"} + assert_frame_equal( + pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES), + csv1_diff) + +AWS_CREDENTIALS = { + "aws_access_key_id": "FAKE_TEST_ACCESS_KEY_ID", + "aws_secret_access_key": "FAKE_TEST_SECRET_ACCESS_KEY", +} + +@pytest.fixture(scope="function") +def s3_client(): + with mock_s3(): + yield Session(**AWS_CREDENTIALS).client("s3") + +class TestS3ArchiveDiffer: + bucket_name = "test-bucket" + indicator_prefix = "test" + + @mock_s3 + def test_update_cache(self, tmp_path, s3_client): + cache_dir = join(str(tmp_path), "cache") + export_dir = join(str(tmp_path), "export") + mkdir(cache_dir) + mkdir(export_dir) + + csv1 = pd.DataFrame({ + "geo_id": ["1", "2", "3"], + "val": [1.0, 2.0, 3.0], + "se": [0.1, 0.2, 0.3], + "sample_size": [10.0, 20.0, 30.0]}) + csv1_buf = StringIO() + csv1.to_csv(csv1_buf, index=False) + + csv2 = pd.DataFrame({ + "geo_id": ["1", "2", "4"], + "val": [1.0, 2.1, 4.0], + "se": [0.1, 0.21, 0.4], + "sample_size": [10.0, 21.0, 40.0]}) + csv2_buf = StringIO() + csv2.to_csv(csv2_buf, index=False) + + # Set up bucket with both objects + s3_client.create_bucket(Bucket=self.bucket_name) + s3_client.put_object( + Bucket=self.bucket_name, + Key=f"{self.indicator_prefix}/csv1.csv", + Body=BytesIO(csv1_buf.getvalue().encode())) + s3_client.put_object( + Bucket=self.bucket_name, + Key=f"{self.indicator_prefix}/csv2.csv", + Body=BytesIO(csv2_buf.getvalue().encode())) + + # Save only csv1 into cache folder + csv1.to_csv(join(cache_dir, "csv1.csv"), index=False) + assert set(listdir(cache_dir)) == {"csv1.csv"} + + arch_diff = S3ArchiveDiffer( + cache_dir, export_dir, + self.bucket_name, self.indicator_prefix, + AWS_CREDENTIALS) + + # Should download csv2 into cache folder + arch_diff.update_cache() + assert set(listdir(cache_dir)) == 
{"csv1.csv", "csv2.csv"} + + @mock_s3 + def test_archive_exports(self, tmp_path, s3_client): + cache_dir = join(str(tmp_path), "cache") + export_dir = join(str(tmp_path), "export") + mkdir(cache_dir) + mkdir(export_dir) + + csv1 = pd.DataFrame({ + "geo_id": ["1", "2", "3"], + "val": [1.0, 2.0, 3.0], + "se": [0.1, 0.2, 0.3], + "sample_size": [10.0, 20.0, 30.0]}) + csv1.to_csv(join(export_dir, "csv1.csv"), index=False) + + s3_client.create_bucket(Bucket=self.bucket_name) + + arch_diff = S3ArchiveDiffer( + cache_dir, export_dir, + self.bucket_name, self.indicator_prefix, + AWS_CREDENTIALS) + + successes, fails = arch_diff.archive_exports([ + join(export_dir, "csv1.csv"), + join(export_dir, "not_a_csv.csv"), + ]) + + assert set(successes) == {join(export_dir, "csv1.csv")} + assert set(fails) == {join(export_dir, "not_a_csv.csv")} + + body = s3_client.get_object( + Bucket=self.bucket_name, + Key=f"{self.indicator_prefix}/csv1.csv")["Body"] + + assert_frame_equal(pd.read_csv(body, dtype=CSV_DTYPES), csv1) From 0087f4d0edd2d34a6b4e3f1eb4a7bb92d8ed779b Mon Sep 17 00:00:00 2001 From: Chua Eu Jing Date: Wed, 5 Aug 2020 16:50:22 -0400 Subject: [PATCH 07/12] Updated unit tests and docs --- _delphi_utils_python/delphi_utils/archive.py | 234 +++++++++++++++++-- _delphi_utils_python/tests/test_archive.py | 154 +++++++++++- 2 files changed, 364 insertions(+), 24 deletions(-) diff --git a/_delphi_utils_python/delphi_utils/archive.py b/_delphi_utils_python/delphi_utils/archive.py index 4adbfc2a1..b27f0f995 100644 --- a/_delphi_utils_python/delphi_utils/archive.py +++ b/_delphi_utils_python/delphi_utils/archive.py @@ -7,7 +7,9 @@ from typing import Tuple, List, Dict, Optional from boto3 import Session +from boto3.exceptions import S3UploadFailedError from git import Repo +from git.refs.head import Head import pandas as pd Files = List[str] @@ -17,6 +19,24 @@ def diff_export_csv( before_csv: str, after_csv: str ) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: + """ + Find differences in exported covidcast CSVs, using geo_id as the index. + Treats NA == NA as True. + + Parameters + ---------- + before_csv: str + The CSV file to diff from + after_csv: str + The CSV file to diff to + + Returns + ------- + (deleted_df, changed_df, added_df) + deleted_df is the pd.DataFrame of deleted rows from before_csv. + changed_df is the pd.DataFrame of common rows from after_csv with changed values. + added_df is the pd.DataFrame of added rows from after_csv. + """ export_csv_dtypes = {"geo_id": str, "val": float, "se": float, "sample_size": float} @@ -33,8 +53,7 @@ def diff_export_csv( before_df_cmn = before_df.reindex(common_idx) after_df_cmn = after_df.reindex(common_idx) - # Comparison treating NA == NA as True - # TODO: Should we change exact equality to some approximate one? + # Exact comparisons, treating NA == NA as True same_mask = before_df_cmn == after_df_cmn same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn) @@ -44,8 +63,23 @@ def diff_export_csv( after_df.loc[added_idx, :]) class ArchiveDiffer: + """ + Base class for performing diffing and archiving of exported covidcast CSVs + """ def __init__(self, cache_dir: str, export_dir: str): + """ + Initialize an ArchiveDiffer + + Parameters + ---------- + cache_dir: str + The directory for storing most recent archived/uploaded CSVs to do start diffing from. + Usually 'cache'. + export_dir: str + The directory with most recent exported CSVs to diff to. + Usually 'receiving'. 
+ """ self.cache_dir = cache_dir self.export_dir = export_dir @@ -53,12 +87,29 @@ def __init__(self, cache_dir: str, export_dir: str): self._exports_archived = False def update_cache(self): - # Depends on the archiving backend + """ + For making sure cache_dir is updated correctly from a backend. + To be implemented by specific archiving backends. + Should set self._cache_updated = True after verifying cache is updated. + """ raise NotImplementedError def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]: - # Main logic of finding diffs across and within CSV files - # Should be called after update_cache + """ + Finds diffs across and within CSV files, from cache_dir to export_dir. + Should be called after update_cache() succeeds. Only works on *.csv files, + ignores every other file. + + Returns + ------- + (deleted_files, common_diffs, new_files): Tuple[Files, FileDiffMap, Files] + deleted_files: List of files that are present in cache_dir but missing in export_dir. + common_diffs: Dict mapping common files in export_dir with cache_dir to: + - None, if the common file is identical + - None, if the export_dir version only has DELETED rows + - a filename with .csv.diff suffix, containing ADDED and CHANGED rows ONLY + added_files: List of files that are missing in cache_dir but present in export_dir. + """ assert self._cache_updated # Glob to only pick out CSV files, ignore hidden files @@ -69,7 +120,7 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]: common_filenames = sorted(exported_files & previous_files) new_files = sorted(join(self.export_dir, f) for f in exported_files - previous_files) - common_diffs = {} + common_diffs: Dict[str, Optional[str]] = {} for filename in common_filenames: before_file = join(self.cache_dir, filename) after_file = join(self.export_dir, filename) @@ -96,9 +147,40 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]: return deleted_files, common_diffs, new_files def archive_exports(self, exported_files: Files) -> Tuple[Files, Files]: + """ + Handles actual archiving of files, depending on specific backend. + To be implemented by specific archiving backends. + + Parameters + ---------- + exported_files: Files + List of files to be archived. Usually new and changed files. + + Returns + ------- + (successes, fails): Tuple[Files, Files] + successes: List of successfully archived files + fails: List of unsuccessfully archived files + """ raise NotImplementedError def filter_exports(self, common_diffs: FileDiffMap): + """ + Filters down the export_dir to only contain: + 1) New files, 2) Changed files, filtered-down to the ADDED and CHANGED rows only. + Should be called after archive_exports() so we archive the raw exports before + potentially modifying them. + + Parameters + ---------- + common_diffs: FileDiffmap + Same semantics as in diff_exports(). For each exported_file, diff key-value pair: + 1) If the diff is None, remove exported_file from export_dir + 2) If there is a diff, replace exported_file with the diff + Since this is done for all key-value pairs, one can filter down common_diffs to only + a subset of exported_files to operate on before calling filter_exports. + For example, removing keys that correspond to failed-to-archive files. 
+ """ # Should be called after archive_exports assert self._exports_archived @@ -112,20 +194,47 @@ def filter_exports(self, common_diffs: FileDiffMap): replace(diff_file, exported_file) class S3ArchiveDiffer(ArchiveDiffer): + """ + AWS S3 backend for archving + Archives CSV files into a S3 bucket, with keys "{indicator_prefix}/{csv_file_name}". + Ideally, versioning should be enabled in this bucket to track versions of each CSV file. + """ def __init__( self, cache_dir: str, export_dir: str, bucket_name: str, indicator_prefix: str, - s3_credentials: Dict[str, str], + aws_credentials: Dict[str, str], ): + """ + Initialize a S3ArchiveDiffer. + See this link for possible aws_credentials kwargs: + https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session + + Parameters + ---------- + cache_dir: str + The directory for storing most recent archived/uploaded CSVs to do start diffing from. + Usually 'cache'. + export_dir: str + The directory with most recent exported CSVs to diff to. + Usually 'receiving'. + bucket_name: str + The S3 bucket to upload files to. + indicator_prefix: str + The prefix for S3 keys related to this indicator. + aws_credentials: Dict[str, str] + kwargs to create a boto3.Session, containing AWS credentials/profile to use. + """ super().__init__(cache_dir, export_dir) - self.s3 = Session(**s3_credentials).resource("s3") + self.s3 = Session(**aws_credentials).resource("s3") self.bucket = self.s3.Bucket(bucket_name) - self.bucket_versioning = self.s3.BucketVersioning(bucket_name) self.indicator_prefix = indicator_prefix def update_cache(self): + """ + For making sure cache_dir is updated with all latest files from the S3 bucket. + """ # List all indicator-related objects from S3 archive_objects = self.bucket.objects.filter(Prefix=self.indicator_prefix).all() archive_objects = [obj for obj in archive_objects if obj.key.endswith(".csv")] @@ -143,13 +252,23 @@ def update_cache(self): self._cache_updated = True def archive_exports(self, exported_files: Files) -> Tuple[Files, Files]: + """ + Handles actual archiving of files to the S3 bucket. + + Parameters + ---------- + exported_files: Files + List of files to be archived. Usually new and changed files. + + Returns + ------- + (successes, fails): Tuple[Files, Files] + successes: List of successfully archived files + fails: List of unsuccessfully archived files + """ archive_success = [] archive_fail = [] - # Enable versioning if turned off - # if self.bucket_versioning.status != "Enabled": - # self.bucket_versioning.enable() - for exported_file in exported_files: cached_file = abspath(join(self.cache_dir, basename(exported_file))) archive_key = join(self.indicator_prefix, basename(exported_file)) @@ -159,9 +278,9 @@ def archive_exports(self, exported_files: Files) -> Tuple[Files, Files]: shutil.copyfile(exported_file, cached_file) self.bucket.Object(archive_key).upload_file(exported_file) - # TODO: Wait until exists to confirm successful upload? + archive_success.append(exported_file) - except: + except FileNotFoundError: archive_fail.append(exported_file) self._exports_archived = True @@ -169,6 +288,11 @@ def archive_exports(self, exported_files: Files) -> Tuple[Files, Files]: return archive_success, archive_fail class GitArchiveDiffer(ArchiveDiffer): + """ + Local git repo backend for archiving + Archives CSV files into a local git repo as commits. + Assumes that a git repository is already set up. 
+ """ def __init__( self, cache_dir: str, export_dir: str, @@ -177,21 +301,54 @@ def __init__( commit_partial_success: bool = False, commit_message: str = "Automated archive", ): + """ + Initialize a GitArchiveDiffer. + + Parameters + ---------- + cache_dir: str + The directory for storing most recent archived/uploaded CSVs to do start diffing from. + Either cache_dir or some parent dir should be a git repository. Usually 'cache'. + export_dir: str + The directory with most recent exported CSVs to diff to. + Usually 'receiving'. + branch_name: Optional[str] + Branch to use for archiving. Uses current branch if None. + override_dirty: bool + Whether to allow overwriting of untracked & uncommitted changes in cache_dir. + commit_partial_success: bool + Whether to still commit even if some files were not archived and staged due + to override_dirty=False + commit_message: str + The automatic commit message to use for the commit. + """ super().__init__(cache_dir, export_dir) assert override_dirty or not commit_partial_success, \ "Only can commit_partial_success=True when override_dirty=True" - # TODO: Handle if no repository is found + # Assumes a repository is set up already, will raise exception if not found self.repo = Repo(cache_dir, search_parent_directories=True) - self.orig_branch = self.repo.active_branch self.branch = self.get_branch(branch_name) self.override_dirty = override_dirty self.commit_partial_success = commit_partial_success self.commit_message = commit_message - def get_branch(self, branch_name: Optional[str] = None): + def get_branch(self, branch_name: Optional[str] = None) -> Head: + """ + Retrieves a Head object representing a branch of specified name. + Creates the branch from the current active branch if does not exist yet. + + Parameters + ---------- + branch_name: Optional[str] + If None, just returns current branch. Otherwise, retrieves/creates branch. + + Returns + ------- + branch: Head + """ if branch_name is None: return self.repo.active_branch @@ -202,14 +359,23 @@ def get_branch(self, branch_name: Optional[str] = None): @contextmanager def archiving_branch(self): + """ + Useful for checking out self.branch within a context, then switching back + to original branch when finished. + """ + orig_branch = self.repo.active_branch self.branch.checkout() try: yield self.branch finally: - self.orig_branch.checkout() + orig_branch.checkout() def update_cache(self): + """ + Since we are using a local git repo, assumes there is nothing to update from. + Checks if cache_dir is clean: has everything nice committed if override_dirty=False + """ # Make sure cache directory is clean: has everything nicely committed if not self.override_dirty: cache_clean = not self.repo.is_dirty(untracked_files=True, path=abspath(self.cache_dir)) @@ -218,10 +384,27 @@ def update_cache(self): self._cache_updated = True def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]: + """ + Same as base class diff_exports, but in context of specified branch + """ with self.archiving_branch(): return super().diff_exports() def archive_exports(self, exported_files: Files) -> Tuple[Files, Files]: + """ + Handles actual archiving of files to the local git repo. + + Parameters + ---------- + exported_files: Files + List of files to be archived. Usually new and changed files. 
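# --- Illustrative sketch (not part of the patch) -----------------------------
# Hedged usage sketch for the git backend documented above. Directory and
# branch names are illustrative; cache_dir (or one of its parents) must already
# be a git repository, since the constructor raises an exception otherwise.
from delphi_utils import GitArchiveDiffer

arch_diff = GitArchiveDiffer(
    cache_dir="./cache",
    export_dir="./receiving",
    branch_name="example-archive-branch",  # hypothetical branch; None = current
    override_dirty=False,
    commit_partial_success=False,          # must stay False when override_dirty=False
    commit_message="Automated archive",
)
arch_diff.update_cache()  # for this backend, just checks that cache_dir is clean
# -----------------------------------------------------------------------------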
+ + Returns + ------- + (successes, fails): Tuple[Files, Files] + successes: List of successfully archived files + fails: List of unsuccessfully archived files + """ archived_files = [] archive_success = [] archive_fail = [] @@ -237,11 +420,16 @@ def archive_exports(self, exported_files: Files) -> Tuple[Files, Files]: # Archive and explicitly stage new export, depending if override if self.override_dirty or archive_file not in dirty_files: - # Archive - shutil.copyfile(exported_file, archive_file) + try: + # Archive + shutil.copyfile(exported_file, archive_file) + + archived_files.append(archive_file) + archive_success.append(exported_file) - archived_files.append(archive_file) - archive_success.append(exported_file) + except (FileNotFoundError, S3UploadFailedError) as ex: + print(ex) + archive_fail.append(exported_file) # Otherwise ignore the archiving for this file else: diff --git a/_delphi_utils_python/tests/test_archive.py b/_delphi_utils_python/tests/test_archive.py index 67eb4083f..68f594d38 100644 --- a/_delphi_utils_python/tests/test_archive.py +++ b/_delphi_utils_python/tests/test_archive.py @@ -3,11 +3,12 @@ from os import listdir, mkdir from os.path import join -import pytest import pandas as pd from pandas.testing import assert_frame_equal from boto3 import Session from moto import mock_s3 +from git import Repo, exc +import pytest from delphi_utils import ArchiveDiffer, GitArchiveDiffer, S3ArchiveDiffer @@ -224,3 +225,154 @@ def test_archive_exports(self, tmp_path, s3_client): Key=f"{self.indicator_prefix}/csv1.csv")["Body"] assert_frame_equal(pd.read_csv(body, dtype=CSV_DTYPES), csv1) + +class TestGitArchiveDiffer: + + def test_init_args(self, tmp_path): + cache_dir = str(tmp_path / "cache") + export_dir = str(tmp_path / "export") + mkdir(cache_dir) + mkdir(export_dir) + + with pytest.raises(AssertionError): + GitArchiveDiffer(cache_dir, export_dir, override_dirty=False, commit_partial_success=True) + + with pytest.raises(exc.InvalidGitRepositoryError): + GitArchiveDiffer(cache_dir, export_dir) + + repo = Repo.init(cache_dir) + assert not repo.is_dirty(untracked_files=True) + + arch_diff = GitArchiveDiffer(cache_dir, export_dir) + assert arch_diff.branch == arch_diff.repo.active_branch + + def test_update_cache(self, tmp_path): + cache_dir = str(tmp_path / "cache") + export_dir = str(tmp_path / "export") + mkdir(cache_dir) + mkdir(export_dir) + + Repo.init(cache_dir) + + # Make repo dirty + with open(join(cache_dir, "test.txt"), "w") as f: + f.write("123") + + arch_diff1 = GitArchiveDiffer(cache_dir, export_dir, override_dirty=False) + with pytest.raises(AssertionError): + arch_diff1.update_cache() + + arch_diff2 = GitArchiveDiffer(cache_dir, export_dir, override_dirty=True) + arch_diff2.update_cache() + assert arch_diff2._cache_updated + + def test_diff_exports(self, tmp_path): + cache_dir = str(tmp_path / "cache") + export_dir = str(tmp_path / "export") + mkdir(cache_dir) + mkdir(export_dir) + + branch_name = "test-branch" + + repo = Repo.init(cache_dir) + repo.index.commit(message="Initial commit") + orig_branch = repo.active_branch + assert branch_name not in repo.heads + + orig_files = {".git"} + + arch_diff = GitArchiveDiffer( + cache_dir, export_dir, + branch_name=branch_name) + arch_diff.update_cache() + + # Should have no differences, but branch should be created + deleted_files, common_diffs, new_files = arch_diff.diff_exports() + + assert branch_name in repo.heads + assert set(listdir(cache_dir)) == orig_files + assert set(deleted_files) == set() + assert 
set(common_diffs.keys()) == set() + assert set(new_files) == set() + + csv1 = pd.DataFrame({ + "geo_id": ["1", "2", "3"], + "val": [1.0, 2.0, 3.0], + "se": [0.1, 0.2, 0.3], + "sample_size": [10.0, 20.0, 30.0]}) + + # Write exact same CSV into cache and export, so no diffs expected + csv1.to_csv(join(cache_dir, "csv1.csv"), index=False) + csv1.to_csv(join(export_dir, "csv1.csv"), index=False) + + # Store the csv in custom branch + arch_diff.get_branch(branch_name).checkout() + repo.index.add([join(cache_dir, "csv1.csv")]) + repo.index.commit(message="Test commit") + orig_branch.checkout() + + assert repo.active_branch == orig_branch + + deleted_files, common_diffs, new_files = arch_diff.diff_exports() + + # We will be back in original branch, so cache should not have csv1.csv + assert set(listdir(cache_dir)) == orig_files + assert set(deleted_files) == set() + assert set(common_diffs.keys()) == {join(export_dir, "csv1.csv")} + assert common_diffs[join(export_dir, "csv1.csv")] is None + assert set(new_files) == set() + + def test_archive_exports(self, tmp_path): + cache_dir = str(tmp_path / "cache") + export_dir = str(tmp_path / "export") + mkdir(cache_dir) + mkdir(export_dir) + + repo = Repo.init(cache_dir) + repo.index.commit(message="Initial commit") + orig_commit = repo.active_branch.commit + + csv1 = pd.DataFrame({ + "geo_id": ["1", "2", "3"], + "val": [1.0, 2.0, 3.0], + "se": [0.1, 0.2, 0.3], + "sample_size": [10.0, 20.0, 30.0]}) + + # csv1.csv is now a dirty edit in the repo, and to be exported too + csv1.to_csv(join(cache_dir, "csv1.csv"), index=False) + csv1.to_csv(join(export_dir, "csv1.csv"), index=False) + + # Try to archive csv1.csv and non-existant csv2.csv + exported_files = [join(export_dir, "csv1.csv"), join(export_dir, "csv2.csv")] + + # All should fail, cannot override dirty and file not found + arch_diff1 = GitArchiveDiffer( + cache_dir, export_dir, + override_dirty=False, + commit_partial_success=False) + + succs, fails = arch_diff1.archive_exports(exported_files) + assert set(succs) == set() + assert set(fails) == set(exported_files) + + # Only csv1.csv should succeed, but no commit should be made + arch_diff2 = GitArchiveDiffer( + cache_dir, export_dir, + override_dirty=True, + commit_partial_success=False) + + succs, fails = arch_diff2.archive_exports(exported_files) + assert set(succs) == {join(export_dir, "csv1.csv")} + assert set(fails) == {join(export_dir, "csv2.csv")} + assert repo.active_branch.commit == orig_commit + + # Only csv1.csv should succeed, and a commit should be made + arch_diff3 = GitArchiveDiffer( + cache_dir, export_dir, + override_dirty=True, + commit_partial_success=True) + + succs, fails = arch_diff3.archive_exports(exported_files) + assert set(succs) == {join(export_dir, "csv1.csv")} + assert set(fails) == {join(export_dir, "csv2.csv")} + assert repo.active_branch.set_commit("HEAD~1").commit == orig_commit From 7f93a5bc293309f58997fec3734db7868f718771 Mon Sep 17 00:00:00 2001 From: Chua Eu Jing Date: Wed, 5 Aug 2020 17:00:11 -0400 Subject: [PATCH 08/12] Updated JHU example --- jhu/delphi_jhu/run.py | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/jhu/delphi_jhu/run.py b/jhu/delphi_jhu/run.py index 65da0054e..ec3748a98 100644 --- a/jhu/delphi_jhu/run.py +++ b/jhu/delphi_jhu/run.py @@ -14,7 +14,7 @@ from delphi_utils import ( read_params, create_export_csv, - GitArchiveDiffer, + S3ArchiveDiffer, ) from .geo import geo_map @@ -67,9 +67,11 @@ def run_module(): base_url = params["base_url"] 
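# --- Illustrative sketch (not part of the patch) -----------------------------
# Hedged GitPython sketch of the "dirty cache" condition that the
# GitArchiveDiffer tests above exercise: an untracked file under cache_dir is
# enough for update_cache() to refuse to run unless override_dirty=True.
# The path is illustrative and the repository here is a throwaway one.
from os.path import abspath, join
from git import Repo

repo = Repo.init("./cache")
with open(join("./cache", "scratch.txt"), "w") as f:
    f.write("uncommitted edit")

# True: an untracked file makes the cache path dirty
print(repo.is_dirty(untracked_files=True, path=abspath("./cache")))
# -----------------------------------------------------------------------------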
static_file_dir = params["static_file_dir"] cache_dir = params["cache_dir"] - arch_diff = GitArchiveDiffer( + + arch_diff = S3ArchiveDiffer( cache_dir, export_dir, - branch_name="jhu-receiving-archive") + params["bucket_name"], "jhu", + params["aws_credentials"]) arch_diff.update_cache() map_df = pd.read_csv( @@ -105,14 +107,18 @@ def run_module(): sensor=sensor_name, ) - # Exports - _, common_diffs, _ = arch_diff.diff_exports() - successes, fails = arch_diff.archive_exports() - succ_common_diffs = { - exported_file: diff_file - for exported_file, diff_file in common_diffs.items() - if exported_file in successes} + # Diff exports, and make incremental versions + _, common_diffs, new_files = arch_diff.diff_exports() + + # Archive changed and new files only + to_archive = [f for f, diff in common_diffs.items() if diff is not None] + to_archive += new_files + _, fails = arch_diff.archive_exports(to_archive) + + # Filter existing exports to exclude those that failed to archive + succ_common_diffs = {f: diff for f, diff in common_diffs.items() if f not in fails} arch_diff.filter_exports(succ_common_diffs) + # Report failures: someone should probably look at them for exported_file in fails: print(f"Failed to archive '{exported_file}'") From eecb91ef9307d54f341bb458d982b8483b5d810e Mon Sep 17 00:00:00 2001 From: Chua Eu Jing Date: Wed, 5 Aug 2020 17:04:54 -0400 Subject: [PATCH 09/12] Update JHU params.json.template --- jhu/params.json.template | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/jhu/params.json.template b/jhu/params.json.template index af286e644..b32c6ddc0 100644 --- a/jhu/params.json.template +++ b/jhu/params.json.template @@ -3,5 +3,9 @@ "static_file_dir": "./static", "export_dir": "./receiving", "cache_dir": "./cache", - "base_url": "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_{metric}_US.csv" + "base_url": "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_{metric}_US.csv", + "aws_credentials": { + "aws_access_key_id": "", + "aws_secret_access_key": "", + } } From 2e4facdff15c95b04418d2c9fa35fe0f4c8934b6 Mon Sep 17 00:00:00 2001 From: Chua Eu Jing Date: Thu, 6 Aug 2020 14:32:25 -0400 Subject: [PATCH 10/12] Rearranged imports --- _delphi_utils_python/delphi_utils/__init__.py | 2 +- _delphi_utils_python/setup.py | 6 +++--- _delphi_utils_python/tests/test_archive.py | 6 +++--- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/_delphi_utils_python/delphi_utils/__init__.py b/_delphi_utils_python/delphi_utils/__init__.py index d8ad7d8bd..6fc515381 100644 --- a/_delphi_utils_python/delphi_utils/__init__.py +++ b/_delphi_utils_python/delphi_utils/__init__.py @@ -4,8 +4,8 @@ from __future__ import absolute_import -from .export import create_export_csv from .archive import ArchiveDiffer, GitArchiveDiffer, S3ArchiveDiffer +from .export import create_export_csv from .utils import read_params __version__ = "0.1.0" diff --git a/_delphi_utils_python/setup.py b/_delphi_utils_python/setup.py index 75f331fc6..5bb9554ba 100644 --- a/_delphi_utils_python/setup.py +++ b/_delphi_utils_python/setup.py @@ -2,13 +2,13 @@ from setuptools import find_packages required = [ + "boto3", + "gitpython", + "moto", "numpy", "pandas", "pytest", "pytest-cov", - "boto3", - "moto", - "gitpython", ] setup( diff --git a/_delphi_utils_python/tests/test_archive.py b/_delphi_utils_python/tests/test_archive.py index 
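# --- Illustrative sketch (not part of the patch) -----------------------------
# Hedged sanity check for a filled-in params.json: run_module() above reads all
# of the keys listed below, so they should be present, and the file must parse
# as strict JSON (unlike Python literals, JSON does not allow trailing commas).
import json

REQUIRED_KEYS = {
    "export_dir", "static_file_dir", "cache_dir", "base_url",
    "bucket_name", "aws_credentials",
}

with open("params.json") as f:
    params = json.load(f)

missing = REQUIRED_KEYS - params.keys()
assert not missing, f"params.json is missing keys: {missing}"
# -----------------------------------------------------------------------------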
68f594d38..d23d18faf 100644 --- a/_delphi_utils_python/tests/test_archive.py +++ b/_delphi_utils_python/tests/test_archive.py @@ -3,11 +3,11 @@ from os import listdir, mkdir from os.path import join -import pandas as pd -from pandas.testing import assert_frame_equal from boto3 import Session -from moto import mock_s3 from git import Repo, exc +from moto import mock_s3 +import pandas as pd +from pandas.testing import assert_frame_equal import pytest from delphi_utils import ArchiveDiffer, GitArchiveDiffer, S3ArchiveDiffer From e472089bfef0879703398a580c16667d91562613 Mon Sep 17 00:00:00 2001 From: Chua Eu Jing Date: Thu, 6 Aug 2020 17:45:29 -0400 Subject: [PATCH 11/12] Added more docs --- _delphi_utils_python/delphi_utils/archive.py | 27 ++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/_delphi_utils_python/delphi_utils/archive.py b/_delphi_utils_python/delphi_utils/archive.py index b27f0f995..3227b13fa 100644 --- a/_delphi_utils_python/delphi_utils/archive.py +++ b/_delphi_utils_python/delphi_utils/archive.py @@ -1,3 +1,30 @@ +""" +Utilities for diffing and archiving covidcast export CSVs. +Aims to simplify the creation of issues for new and backfilled value for indicators. +Also handles archiving of export CSVs to some backend (git, S3 etc.) before replacing them. + +Example workflow regardless of specific ArchiveDiffer used. Should only differ in intialization. +1) Initialize and update cache folder if neccessary +>>> arch_diff = S3ArchiveDiffer(cache_dir, export_dir, ...) +>>> arch_diff.update_cache() +>>> ... # Run indicator and generate full exports in `export_dir` + +2) Create new diff files from cache files vs export files +>>> deleted_files, common_diffs, new_files = arch_diff.diff_exports() + +3) Archive common files with diffs and new files +>>> to_archive = [f for f, diff in common_diffs.items() if diff is not None] +>>> to_archive += new_files +>>> succs, fails = arch_diff.archive_exports(to_archive) + +4) Filter exports: Replace files with their diffs, or remove if no diffs +>>> succ_common_diffs = {f: diff for f, diff in common_diffs.items() if f not in fails} +>>> arch_diff.filter_exports(succ_common_diffs) + +Author: Eu Jing Chua +Created: 2020-08-06 +""" + from contextlib import contextmanager import filecmp from glob import glob From 8cac7675d914d48600e3802142045890b59144e4 Mon Sep 17 00:00:00 2001 From: Chua Eu Jing Date: Mon, 10 Aug 2020 11:09:52 -0400 Subject: [PATCH 12/12] Added some NaNs to tests to verify behavior --- _delphi_utils_python/tests/test_archive.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/_delphi_utils_python/tests/test_archive.py b/_delphi_utils_python/tests/test_archive.py index d23d18faf..bd3321898 100644 --- a/_delphi_utils_python/tests/test_archive.py +++ b/_delphi_utils_python/tests/test_archive.py @@ -6,6 +6,7 @@ from boto3 import Session from git import Repo, exc from moto import mock_s3 +import numpy as np import pandas as pd from pandas.testing import assert_frame_equal import pytest @@ -43,7 +44,7 @@ def test_diff_and_filter_exports(self, tmp_path): "csv1": pd.DataFrame({ "geo_id": ["1", "2", "3"], "val": [1.0, 2.0, 3.0], - "se": [0.1, 0.2, 0.3], + "se": [np.nan, 0.2, 0.3], "sample_size": [10.0, 20.0, 30.0]}), # Deleted @@ -65,7 +66,7 @@ def test_diff_and_filter_exports(self, tmp_path): "csv1": pd.DataFrame({ "geo_id": ["1", "2", "4"], "val": [1.0, 2.1, 4.0], - "se": [0.1, 0.21, 0.4], + "se": [np.nan, 0.21, np.nan], "sample_size": [10.0, 21.0, 40.0]}), # Added @@ -78,7 +79,7 @@ def 
test_diff_and_filter_exports(self, tmp_path):
         csv1_diff = pd.DataFrame({
             "geo_id": ["2", "4"],
             "val": [2.1, 4.0],
-            "se": [0.21, 0.4],
+            "se": [0.21, np.nan],
             "sample_size": [21.0, 40.0]})
 
         arch_diff = ArchiveDiffer(cache_dir, export_dir)
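# --- Illustrative sketch (not part of the patch) -----------------------------
# Hedged round-trip check of the NaN behaviour these test changes pin down: a
# missing "se" value written out with na_rep="NA" should come back as NaN when
# the CSV is read again, so it survives diffing and filtering unchanged. The
# in-memory buffer stands in for a real export file.
from io import StringIO
import numpy as np
import pandas as pd

csv1_diff = pd.DataFrame({
    "geo_id": ["2", "4"],
    "val": [2.1, 4.0],
    "se": [0.21, np.nan],
    "sample_size": [21.0, 40.0]})

buf = StringIO()
csv1_diff.to_csv(buf, index=False, na_rep="NA")
buf.seek(0)

round_trip = pd.read_csv(buf, dtype={"geo_id": str})
assert round_trip["se"].isna().tolist() == [False, True]
# -----------------------------------------------------------------------------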