Diffing and archiving utilities #185
Conversation
```python
# Report failures: someone should probably look at them
for exported_file in fails:
    print(f"Failed to archive '{exported_file}'")
```
I believe these will show in the Delphi Automation log, but we should think about how to alert a human about them.
```python
To be implemented by specific archiving backends.
Should set self._cache_updated = True after verifying cache is updated.
"""
raise NotImplementedError
```
I have some basic architecture questions about this codebase. In terms of the process here, it is:
- first we update the cache; just to make sure I understand this, does this mean downloading a past version of the data from an S3 bucket or something?
- then we use covidcast to export the most recent snapshot of the data into CSVs in the folder `./receiving`
- finally we diff the latest data and our cache snapshot; this allows us to upload only the changes onto the cache
Do I have this right?
If so, then is this update_cache implementation waiting for a decision on which service to use for the caching?
(If these questions seem basic, that's probably because they are. I am totally new to this codebase and want to make sure the model in my head is correct 🙂).
Conveniently, we have a record of the CSV files that represent the most recent issue for each day available in the API, in `archive/successful` -- we can use those to prepopulate the cache folder.
- Yeah that is right. Just in case the local cache folder gets emptied out for some reason, this lets us re-download whatever we have stored in S3 up to the previous run into the cache
- Also right. The receiving folder's contents will eventually get ingested as new data in the API that is issued today, whether new or back-filled.
- We do diff the latest data and the cache snapshot; however, we retain only the changes in the `./receiving` folder for ingesting. Before we retain only the changes, the cache snapshot in `./cache` is updated with the just-generated output in `./receiving`.
I left `update_cache()` and `archive_exports()` unimplemented because I was using `ArchiveDiffer` as a base class that contains the common operations of diffing and replacing/filtering files, independent of the archiving backend. So in terms of usage, only the more specific child classes `S3ArchiveDiffer` and `GitArchiveDiffer` should be used. I think we are mainly using S3, but I happened to already have the git code written, so I thought not to let it go to waste haha.
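For reference, a minimal sketch of the class layout described above; only the method names `update_cache` and `archive_exports` and the use of `self._cache_updated` are taken from this PR, while the constructor arguments and docstrings are assumptions:

```python
class ArchiveDiffer:
    """Common diffing and file replace/filter logic, backend-independent."""

    def __init__(self, cache_dir, export_dir):
        self.cache_dir = cache_dir
        self.export_dir = export_dir
        self._cache_updated = False

    def update_cache(self):
        """To be implemented by specific archiving backends."""
        raise NotImplementedError

    def archive_exports(self, exported_files):
        """To be implemented by specific archiving backends."""
        raise NotImplementedError


class S3ArchiveDiffer(ArchiveDiffer):
    """Backend that archives exports to, and restores the cache from, S3."""


class GitArchiveDiffer(ArchiveDiffer):
    """Backend that archives exports as commits in a git repository."""
```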
```python
deleted_idx = before_df.index.difference(after_df.index)
common_idx = before_df.index.intersection(after_df.index)
added_idx = after_df.index.difference(before_df.index)
```
So as I understand it, the diff between two CSV files is being measured in terms of the difference in the index, where the index is `geo_id`. If I understand correctly, this means that we can detect when new geographic locations are added and old ones removed. What is the anticipated way of handling a changed value in an already-existing index? Say `"ca"` is already present in both cache and export, but the source updates the value?
Ideally we want to retain all new regions and all regions with updated values.
We want to identify deleted regions but how we handle them is TBD.
So regarding these index operations, I use `common_idx` to do an index-aligned comparison in the next few lines between the before and after dataframes. In particular, `same_mask` later on tells us (as a boolean mask) which values changed among the common indices; I then use this mask to return a dataframe of only the changed values (with the after values).
I see, the `same_mask` code below handles the value comparisons and returns the changed-rows dataframe. Got it.
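As an illustration, here is a minimal, self-contained sketch of the diffing logic described in this thread; the toy data, column names, and the NaN-tolerant comparison are assumptions, not the PR's exact code:

```python
import pandas as pd

# Toy before/after snapshots indexed by geo_id (hypothetical data).
before_df = pd.DataFrame({"val": [1.0, 2.0, 3.0]}, index=["ak", "ca", "ny"])
after_df = pd.DataFrame({"val": [1.0, 2.5, 3.0, 4.0]}, index=["ak", "ca", "ny", "tx"])

deleted_idx = before_df.index.difference(after_df.index)
common_idx = before_df.index.intersection(after_df.index)
added_idx = after_df.index.difference(before_df.index)

# Index-aligned comparison over the common rows; treating NaN == NaN as
# "same" keeps missing values from showing up as spurious changes.
before_common, after_common = before_df.loc[common_idx], after_df.loc[common_idx]
same_mask = (before_common == after_common) | (before_common.isna() & after_common.isna())

# Keep rows where any column changed, plus the newly added rows.
changed_df = after_common.loc[~same_mask.all(axis=1)]
diff_df = pd.concat([changed_df, after_df.loc[added_idx]])
print(diff_df)  # "ca" (changed value) and "tx" (added region)
```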
```python
print(f"Updating cache with {cached_file}")
obj.Object().download_file(cached_file)
# ...
self._cache_updated = True
```
Oh! So this is implemented! Makes sense. So I believe this is the spot that needs a change to handle the case when the bucket is empty.
Yeah, that is true: if the bucket is empty, nothing will happen now, and all output will be treated as "new".
From what I gathered during the meeting, we can determine which snapshot of the output (from a separate `archive/successful` folder on the server) we want to initially populate the bucket with if we don't want this empty-cache behavior.
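A minimal sketch of what an S3-backed `update_cache()` might look like with a guard for the empty-bucket case; only the `print`/`download_file` calls are quoted from this PR, and attribute names like `bucket_name`, `indicator_prefix`, and `cache_dir` are assumptions:

```python
import os
import boto3

def update_cache(self):
    """Download the previous run's CSVs from S3 into the local cache folder."""
    bucket = boto3.resource("s3").Bucket(self.bucket_name)
    objects = list(bucket.objects.filter(Prefix=self.indicator_prefix))
    if not objects:
        # Empty bucket: the cache stays empty and every export gets
        # treated as "new" -- the case discussed above.
        print("Bucket is empty; cache not pre-populated")
    for obj in objects:
        cached_file = os.path.join(self.cache_dir, os.path.basename(obj.key))
        print(f"Updating cache with {cached_file}")
        obj.Object().download_file(cached_file)
    self._cache_updated = True
```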
| "csv1": pd.DataFrame({ | ||
| "geo_id": ["1", "2", "4"], | ||
| "val": [1.0, 2.1, 4.0], | ||
| "se": [0.1, 0.21, 0.4], |
Might be a good idea to add a couple of NaNs to `csv1` to make sure that part works.
Yeah, that makes sense, thank you! I have added some NaNs to the test case.
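For instance, the updated fixture might look something like this (hypothetical values, not the exact test data):

```python
import numpy as np
import pandas as pd

# csv1 with a couple of NaNs mixed in, so the tests exercise the
# diffing logic's handling of missing values.
csv1 = pd.DataFrame({
    "geo_id": ["1", "2", "4"],
    "val": [1.0, np.nan, 4.0],
    "se": [0.1, 0.21, np.nan],
})
```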
```python
# Replace exports where diff file was generated
else:
    replace(diff_file, exported_file)
```
So there must be a piece of code that knows how to handle a diff `.csv` file when uploading to the API?
So each `.csv.diff` file that is created is actually a regular CSV too, in the same format as the original `.csv`; it just contains only the new and modified rows of the original. This step replaces an original `.csv` with its `.csv.diff`, so at the end there should only be `.csv` files left to ingest.
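A minimal sketch of that replace step; `os.replace(diff_file, exported_file)` is the call quoted above, while the function name, the list argument, and the `.diff` suffix convention are illustrative assumptions:

```python
from os import replace
from os.path import exists

def filter_exports(exported_files):
    """Keep only new/changed rows in each export by swapping in its diff."""
    for exported_file in exported_files:
        diff_file = exported_file + ".diff"
        # No diff file means the export is entirely new: keep it as-is.
        if not exists(diff_file):
            continue
        # Replace exports where a diff file was generated, so the .csv
        # that remains contains only the new and modified rows.
        replace(diff_file, exported_file)
```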
dshemetov left a comment
Looks good to me! The tests run on my machine. Just needs the update to handle filling the cache when the bucket is empty.
Utilities to address #166