148 changes: 131 additions & 17 deletions _delphi_utils_python/delphi_utils/archive.py
@@ -25,6 +25,7 @@
Created: 2020-08-06
"""

from argparse import ArgumentParser
from contextlib import contextmanager
import filecmp
from glob import glob
@@ -39,9 +40,12 @@
from git.refs.head import Head
import pandas as pd

from .utils import read_params

Files = List[str]
FileDiffMap = Dict[str, Optional[str]]


def diff_export_csv(
before_csv: str,
after_csv: str
@@ -65,7 +69,8 @@ def diff_export_csv(
added_df is the pd.DataFrame of added rows from after_csv.
"""

export_csv_dtypes = {"geo_id": str, "val": float, "se": float, "sample_size": float}
export_csv_dtypes = {"geo_id": str, "val": float,
"se": float, "sample_size": float}

before_df = pd.read_csv(before_csv, dtype=export_csv_dtypes)
before_df.set_index("geo_id", inplace=True)
@@ -89,6 +94,42 @@
after_df_cmn.loc[~(same_mask.all(axis=1)), :],
after_df.loc[added_idx, :])
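
# Illustrative usage sketch (not part of this diff): diff_export_csv compares two
# versions of the same export file. The paths below are hypothetical; both CSVs
# are assumed to have the geo_id/val/se/sample_size columns read above.
#
#   deleted_df, changed_df, added_df = diff_export_csv(
#       "cache/20200806_state_cases.csv",       # previously archived version
#       "receiving/20200806_state_cases.csv")   # freshly exported version
#   new_issues_df = pd.concat([changed_df, added_df], axis=0)
#   # Deleted rows are not republished; diff_exports() only warns about them.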


def run_module(archive_type: str,
cache_dir: str,
export_dir: str,
**kwargs):
"""Builds and runs an ArchiveDiffer.

Parameters
----------
archive_type: str
Type of ArchiveDiffer to run. Must be one of ["git", "s3"] which correspond to `GitArchiveDiffer` and `S3ArchiveDiffer`, respectively.
cache_dir: str
The directory for storing most recent archived/uploaded CSVs to start diffing from.
export_dir: str
The directory with most recent exported CSVs to diff to.
**kwargs:
Keyword arguments corresponding to constructor arguments for the respective ArchiveDiffers.
"""
if archive_type == "git":
arch_diff = GitArchiveDiffer(cache_dir,
export_dir,
kwargs["branch_name"],
kwargs["override_dirty"],
kwargs["commit_partial_success"],
kwargs["commit_message"])
elif archive_type == "s3":
arch_diff = S3ArchiveDiffer(cache_dir,
export_dir,
kwargs["bucket_name"],
kwargs["indicator_prefix"],
kwargs["aws_credentials"])
else:
raise ValueError(f"No archive type named '{archive_type}'")
arch_diff.run()
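
# Illustrative sketch (not part of this diff): calling run_module directly for the
# S3 backend. The bucket, prefix, and credential values are hypothetical, and
# aws_credentials is assumed to be a dict of keyword arguments for a boto3 session.
#
#   run_module("s3",
#              "./cache",
#              "./receiving",
#              bucket_name="example-indicator-archive",
#              indicator_prefix="example_indicator",
#              aws_credentials={"aws_access_key_id": "...",
#                               "aws_secret_access_key": "..."})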


class ArchiveDiffer:
"""
Base class for performing diffing and archiving of exported covidcast CSVs
@@ -140,12 +181,16 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]:
assert self._cache_updated

# Glob to only pick out CSV files, ignore hidden files
previous_files = set(basename(f)
                     for f in glob(join(self.cache_dir, "*.csv")))
exported_files = set(basename(f)
                     for f in glob(join(self.export_dir, "*.csv")))

deleted_files = sorted(join(self.cache_dir, f)
                       for f in previous_files - exported_files)
common_filenames = sorted(exported_files & previous_files)
new_files = sorted(join(self.export_dir, f)
                   for f in exported_files - previous_files)

common_diffs: Dict[str, Optional[str]] = {}
for filename in common_filenames:
@@ -158,11 +203,13 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]:
if filecmp.cmp(before_file, after_file, shallow=False):
continue

deleted_df, changed_df, added_df = diff_export_csv(
    before_file, after_file)
new_issues_df = pd.concat([changed_df, added_df], axis=0)

if len(deleted_df) > 0:
print(f"Warning, diff has deleted indices in {after_file} that will be ignored")
print(
f"Warning, diff has deleted indices in {after_file} that will be ignored")

# Write the diffs to diff_file, if applicable
if len(new_issues_df) > 0:
@@ -220,6 +267,29 @@ def filter_exports(self, common_diffs: FileDiffMap):
else:
replace(diff_file, exported_file)

def run(self):
"""Runs the differ and archives the changed and new files."""
self.update_cache()

# Diff exports, and make incremental versions
_, common_diffs, new_files = self.diff_exports()

# Archive changed and new files only
to_archive = [f for f, diff in common_diffs.items()
if diff is not None]
to_archive += new_files
_, fails = self.archive_exports(to_archive)

# Filter existing exports to exclude those that failed to archive
succ_common_diffs = {f: diff for f, diff in common_diffs.items()
                     if f not in fails}
self.filter_exports(succ_common_diffs)

# Report failures: someone should probably look at them
for exported_file in fails:
print(f"Failed to archive '{exported_file}'")


class S3ArchiveDiffer(ArchiveDiffer):
"""
AWS S3 backend for archiving
@@ -263,11 +333,14 @@ def update_cache(self):
Make sure cache_dir is updated with all the latest files from the S3 bucket.
"""
# List all indicator-related objects from S3
archive_objects = self.bucket.objects.filter(
    Prefix=self.indicator_prefix).all()
archive_objects = [
    obj for obj in archive_objects if obj.key.endswith(".csv")]

# Check against what we have locally and download missing ones
cached_files = set(basename(f)
                   for f in glob(join(self.cache_dir, "*.csv")))
for obj in archive_objects:
archive_file = basename(obj.key)
cached_file = join(self.cache_dir, archive_file)
@@ -297,7 +370,8 @@ def archive_exports(self, exported_files: Files) -> Tuple[Files, Files]:
archive_fail = []

for exported_file in exported_files:
cached_file = abspath(
    join(self.cache_dir, basename(exported_file)))
archive_key = join(self.indicator_prefix, basename(exported_file))

try:
@@ -314,6 +388,7 @@

return archive_success, archive_fail
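
# Illustrative sketch (not part of this diff): using the S3 differ step by step
# instead of through run(). Constructor values are hypothetical; the argument
# order matches the call in run_module() above.
#
#   differ = S3ArchiveDiffer("./cache", "./receiving",
#                            "example-indicator-archive", "example_indicator",
#                            {"aws_access_key_id": "...",
#                             "aws_secret_access_key": "..."})
#   differ.update_cache()   # as in run(), refresh the cache before archiving
#   succeeded, failed = differ.archive_exports(
#       ["./receiving/20200806_state_cases.csv"])
#   # 'failed' lists any exports that could not be uploaded.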


class GitArchiveDiffer(ArchiveDiffer):
"""
Local git repo backend for archiving
@@ -352,7 +427,7 @@ def __init__(
super().__init__(cache_dir, export_dir)

assert override_dirty or not commit_partial_success, \
"Only can commit_partial_success=True when override_dirty=True"
"Only can commit_partial_success=True when override_dirty=True"

# Assumes a repository is set up already, will raise exception if not found
self.repo = Repo(cache_dir, search_parent_directories=True)
@@ -405,7 +480,8 @@ def update_cache(self):
"""
# Make sure cache directory is clean: has everything nicely committed
if not self.override_dirty:
cache_clean = not self.repo.is_dirty(
    untracked_files=True, path=abspath(self.cache_dir))
assert cache_clean, f"There are uncommitted changes in the cache dir '{self.cache_dir}'"

self._cache_updated = True
@@ -439,11 +515,14 @@ def archive_exports(self, exported_files: Files) -> Tuple[Files, Files]:
with self.archiving_branch():
# Abs paths of all modified files to check if we will override uncommitted changes
working_tree_dir = self.repo.working_tree_dir
dirty_files = [join(working_tree_dir, f)
               for f in self.repo.untracked_files]
dirty_files += [join(working_tree_dir, d.a_path)
                for d in self.repo.index.diff(None)]

for exported_file in exported_files:
archive_file = abspath(
    join(self.cache_dir, basename(exported_file)))

# Archive and explicitly stage new export, depending if override
if self.override_dirty or archive_file not in dirty_files:
@@ -469,11 +548,46 @@
if len(exported_files) > 0:

# Support partial success and at least one archive succeeded
partial_success = (self.commit_partial_success
                   and len(archive_success) > 0)

if len(archive_success) == len(exported_files) or partial_success:
self.repo.index.commit(message=self.commit_message)

self._exports_archived = True

return archive_success, archive_fail
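
# Illustrative sketch (not part of this diff): the flag combination guarded by the
# assertion in __init__ above. Paths and branch name are hypothetical, and the
# keyword names are assumed to match the options passed in run_module().
#
#   differ = GitArchiveDiffer("./cache", "./receiving",
#                             branch_name="bot/archive-example",
#                             override_dirty=True,
#                             commit_partial_success=True,
#                             commit_message="Automated archive of exports")
#   # override_dirty=True allows uncommitted changes under cache_dir to be
#   # overwritten and staged; commit_partial_success=True then permits a commit
#   # even when only some of the exports were archived successfully.
#   differ.run()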


if __name__ == "__main__":
parser = ArgumentParser()
parser.add_argument("--archive_type", required=True, type=str,
choices=["git", "s3"],
help="Type of archive differ to use.")
parser.add_argument("--indicator_prefix", type=str, default="",
help="The prefix for S3 keys related to this indicator."
" Required for `archive_type = 's3'")
parser.add_argument("--branch_name", type=str, default="",
help=" Branch to use for `archive_type` = 'git'.")
parser.add_argument("--override_dirty", action="store_true",
help="Whether to allow overwriting of untracked &"
" uncommitted changes for `archive_type` = 'git'")
parser.add_argument("--commit_partial_success", action="store_true",
help="Whether to still commit for `archive_type` = "
"'git' even if some files were not archived and "
"staged due to `override_dirty` = False.")
parser.add_argument("--commit_message", type=str, default="",
help="Commit message for `archive_type` = 'git'")
args = parser.parse_args()
params = read_params()
run_module(args.archive_type,
params["cache_dir"],
params["export_dir"],
aws_credentials=params["aws_credentials"],
branch_name=args.branch_name,
bucket_name=params["bucket_name"],
commit_message=args.commit_message,
commit_partial_success=args.commit_partial_success,
indicator_prefix=args.indicator_prefix,
override_dirty=args.override_dirty
)
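
# Illustrative sketch (not part of this diff): invoking this module from the
# command line, assuming a params.json in the working directory that provides
# cache_dir, export_dir, bucket_name, and aws_credentials. The prefix and branch
# values below are hypothetical.
#
#   python -m delphi_utils.archive --archive_type s3 \
#       --indicator_prefix example_indicator
#
#   python -m delphi_utils.archive --archive_type git \
#       --branch_name bot/archive-example --override_dirty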