From 2a43f9a4addc76eb2c758bf84fd18679460b870f Mon Sep 17 00:00:00 2001
From: Nat DeFries <42820733+nmdefries@users.noreply.github.com>
Date: Thu, 10 Oct 2024 16:52:14 -0400
Subject: [PATCH 01/15] add helper fn in utils to save backup data to csv

---
 _delphi_utils_python/delphi_utils/export.py | 59 +++++++++++++++++++++
 1 file changed, 59 insertions(+)

diff --git a/_delphi_utils_python/delphi_utils/export.py b/_delphi_utils_python/delphi_utils/export.py
index 8ac5de48e..217a03260 100644
--- a/_delphi_utils_python/delphi_utils/export.py
+++ b/_delphi_utils_python/delphi_utils/export.py
@@ -132,3 +132,62 @@ def create_export_csv(
         export_df = export_df.sort_values(by="geo_id")
         export_df.to_csv(export_file, index=False, na_rep="NA")
     return dates
+
+def create_backup_csv(
+    df: pd.DataFrame,
+    backup_dir: str,
+    custom_run: bool,
+    issue: Optional[str] = None,
+    table_name: Optional[str] = None,
+    geo_res: Optional[str] = None,
+    sensor: Optional[str] = None,
+    metric: Optional[str] = None
+):
+    """Save data for use as a backup
+
+    This function is meant to save raw data fetched from data sources.
+    Therefore, it avoids manipulating the data as much as possible to
+    preserve the input.
+
+    When only required arguments are passed, data will be saved to a file of
+    the format `<backup_dir>/<issue>.csv`. Optional arguments should be
+    passed if the source data is fetched from different tables or in batches
+    by signal, geo, etc.
+
+    Parameters
+    ----------
+    df: pd.DataFrame
+        Columns: geo_id, timestamp, val, se, sample_size
+    backup_dir: str
+        Backup directory
+    custom_run: bool
+        Flag indicating if the current run is a patch, or other run where
+        backups aren't needed. If so, don't save any data to disk
+    issue: Optional[str]
+        The date the data was fetched, in YYYYMMDD format. Defaults to "today"
+        if not provided
+    geo_res: Optional[str]
+        Geographic resolution of the data
+    sensor: Optional[str]
+        Sensor that has been calculated (cumulative_counts vs new_counts)
+    metric: Optional[str]
+        Metric we are considering, if any.
+
+    Returns
+    ---------
+    dates: pd.Series[datetime]
+        Series of dates for which CSV files were exported.
+    """
+    if not custom_run:
+        df = df.copy()
+
+        # Label the file with today's date (the date the data was fetched).
+        if not issue:
+            issue = datetime.today().strftime('%Y%m%d')
+        backup_filename = [issue, geo_res, table_name, metric, sensor]
+
+        # Drop empty elements
+        backup_filename = "_".join(filter(None, backup_filename)) + ".csv"
+
+        backup_file = join(backup_dir, backup_filename)
+        backup_df.to_csv(backup_file, index=False, na_rep="NA")
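A quick illustration (not part of the patch) of how the filename assembly above behaves: `filter(None, ...)` drops every optional part that was not supplied, so only the pieces actually passed end up in the name. The values here are hypothetical.

    # [issue, geo_res, table_name, metric, sensor], with hypothetical values:
    parts = ["20241010", "state", None, None, "covid_deaths"]
    print("_".join(filter(None, parts)) + ".csv")  # 20241010_state_covid_deaths.csv

    # With only required arguments, everything except the issue date is None:
    print("_".join(filter(None, ["20241010", None, None, None, None])) + ".csv")  # 20241010.csv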
From 3c22ff13b9e77d7f88223d925b4bf899ec6f9982 Mon Sep 17 00:00:00 2001
From: Nat DeFries <42820733+nmdefries@users.noreply.github.com>
Date: Thu, 10 Oct 2024 16:52:47 -0400
Subject: [PATCH 02/15] use helper to save nchs data to disk right after pulling

---
 nchs_mortality/delphi_nchs_mortality/pull.py | 9 ++++++++-
 nchs_mortality/delphi_nchs_mortality/run.py  | 5 ++++-
 nchs_mortality/tests/test_pull.py            | 6 +++---
 3 files changed, 15 insertions(+), 5 deletions(-)

diff --git a/nchs_mortality/delphi_nchs_mortality/pull.py b/nchs_mortality/delphi_nchs_mortality/pull.py
index 18bbfd59a..048622e7d 100644
--- a/nchs_mortality/delphi_nchs_mortality/pull.py
+++ b/nchs_mortality/delphi_nchs_mortality/pull.py
@@ -22,7 +22,7 @@ def standardize_columns(df):
     return df.rename(columns=dict(rename_pairs))
 
 
-def pull_nchs_mortality_data(socrata_token: str, test_file: Optional[str] = None):
+def pull_nchs_mortality_data(socrata_token: str, backup_dir: str, custom_run: bool, test_file: Optional[str] = None):
     """Pull the latest NCHS Mortality data, and conforms it into a dataset.
 
     The output dataset has:
@@ -40,6 +40,10 @@
     ----------
     socrata_token: str
         My App Token for pulling the NCHS mortality data
+    backup_dir: str
+        Directory to which to save raw backup data
+    custom_run: bool
+        Flag indicating if the current run is a patch. If so, don't save any data to disk
     test_file: Optional[str]
         When not null, name of file from which to read test data
 
@@ -60,6 +64,9 @@
         client = Socrata("data.cdc.gov", socrata_token)
         results = client.get("r8kw-7aab", limit=10**10)
         df = pd.DataFrame.from_records(results)
+
+        create_backup_csv(df, backup_dir, custom_run = custom_run)
+
     # drop "By Total" rows
     df = df[df["group"].transform(str.lower) == "by week"]
 
diff --git a/nchs_mortality/delphi_nchs_mortality/run.py b/nchs_mortality/delphi_nchs_mortality/run.py
index 50ce46cfb..6454a009b 100644
--- a/nchs_mortality/delphi_nchs_mortality/run.py
+++ b/nchs_mortality/delphi_nchs_mortality/run.py
@@ -59,6 +59,8 @@ def run_module(params: Dict[str, Any]):
             days=date.today().weekday() + 2)
     export_start_date = export_start_date.strftime('%Y-%m-%d')
     daily_export_dir = params["common"]["daily_export_dir"]
+    backup_dir = params["common"]["backup_dir"]
+    custom_run = params["common"].get("custom_run", False)
     socrata_token = params["indicator"]["socrata_token"]
     test_file = params["indicator"].get("test_file", None)
 
@@ -70,7 +72,8 @@ def run_module(params: Dict[str, Any]):
         daily_arch_diff.update_cache()
 
     stats = []
-    df_pull = pull_nchs_mortality_data(socrata_token, test_file)
+    df_pull = pull_nchs_mortality_data(socrata_token, backup_dir,
+                                       is_patch = custom_run, test_file)
     for metric in METRICS:
         for geo in ["state", "nation"]:
             if metric == 'percent_of_expected_deaths':
diff --git a/nchs_mortality/tests/test_pull.py b/nchs_mortality/tests/test_pull.py
index fa58b04a5..e99311ba1 100644
--- a/nchs_mortality/tests/test_pull.py
+++ b/nchs_mortality/tests/test_pull.py
@@ -34,7 +34,7 @@ def test_standardize_columns(self):
         pd.testing.assert_frame_equal(expected, df)
 
     def test_good_file(self):
-        df = pull_nchs_mortality_data(SOCRATA_TOKEN, "test_data.csv")
+        df = pull_nchs_mortality_data(SOCRATA_TOKEN, backup_dir = "", custom_run = True, "test_data.csv")
 
         # Test columns
         assert (
@@ -90,9 +90,9 @@ def test_good_file(self):
     def test_bad_file_with_inconsistent_time_col(self):
         with pytest.raises(ValueError):
             pull_nchs_mortality_data(
-                SOCRATA_TOKEN, "bad_data_with_inconsistent_time_col.csv"
+                SOCRATA_TOKEN, backup_dir = "", custom_run = True, "bad_data_with_inconsistent_time_col.csv"
             )
 
     def test_bad_file_with_missing_cols(self):
         with pytest.raises(ValueError):
-            pull_nchs_mortality_data(SOCRATA_TOKEN, "bad_data_with_missing_cols.csv")
+            pull_nchs_mortality_data(SOCRATA_TOKEN, backup_dir = "", custom_run = True, "bad_data_with_missing_cols.csv")
From 02845f1a1ea9b11f6cd34350ae6191db9f246a16 Mon Sep 17 00:00:00 2001
From: Nat DeFries <42820733+nmdefries@users.noreply.github.com>
Date: Thu, 10 Oct 2024 16:59:15 -0400
Subject: [PATCH 03/15] add backup dir param

---
 .../nchs_mortality-params-prod.json.j2     |   1 +
 nchs_mortality/params.json.template        |   1 +
 nchs_mortality/raw_data_backups/.gitignore | 120 ++++++++++++++++++
 3 files changed, 122 insertions(+)
 create mode 100644 nchs_mortality/raw_data_backups/.gitignore

diff --git a/ansible/templates/nchs_mortality-params-prod.json.j2 b/ansible/templates/nchs_mortality-params-prod.json.j2
index dbd39598b..4b0d0c4f7 100644
--- a/ansible/templates/nchs_mortality-params-prod.json.j2
+++ b/ansible/templates/nchs_mortality-params-prod.json.j2
@@ -1,6 +1,7 @@
 {
   "common": {
     "daily_export_dir": "./daily_receiving",
+    "backup_dir": "./raw_data_backups",
     "log_filename": "/var/log/indicators/nchs_mortality.log",
     "weekly_export_dir": "/common/covidcast/receiving/nchs-mortality"
   },
diff --git a/nchs_mortality/params.json.template b/nchs_mortality/params.json.template
index ed16c620c..630a2d7ad 100644
--- a/nchs_mortality/params.json.template
+++ b/nchs_mortality/params.json.template
@@ -2,6 +2,7 @@
   "common": {
     "daily_export_dir": "./daily_receiving",
     "weekly_export_dir": "./receiving",
+    "backup_dir": "./raw_data_backups",
     "log_filename": "/var/log/indicators/nchs_mortality.log",
     "log_exceptions": false
   },
diff --git a/nchs_mortality/raw_data_backups/.gitignore b/nchs_mortality/raw_data_backups/.gitignore
new file mode 100644
index 000000000..552154e09
--- /dev/null
+++ b/nchs_mortality/raw_data_backups/.gitignore
@@ -0,0 +1,120 @@
+# You should hard commit a prototype for this file, but we
+# want to avoid accidental adding of API tokens and other
+# private data parameters
+params.json
+
+# Do not commit output files
+receiving/*.csv
+
+# Remove macOS files
+.DS_Store
+
+# virtual environment
+dview/
+
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+coverage.xml
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# PyInstaller
+# Usually these files are written by a python script from a template
+# before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*.cover
+.hypothesis/
+.pytest_cache/
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+.static_storage/
+.media/
+local_settings.py
+
+# Flask stuff:
+instance/
+.webassets-cache
+
+# Scrapy stuff:
+.scrapy
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+target/
+
+# Jupyter Notebook
+.ipynb_checkpoints
+
+# pyenv
+.python-version
+
+# celery beat schedule file
+celerybeat-schedule
+
+# SageMath parsed files
+*.sage.py
+
+# Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Spyder project settings
+.spyderproject
+.spyproject
+
+# Rope project settings
+.ropeproject
+
+# mkdocs documentation
+/site
+
+# mypy
+.mypy_cache/
From a55be530d095073ac742bb533ff4ed88295e19bd Mon Sep 17 00:00:00 2001
From: Nat DeFries <42820733+nmdefries@users.noreply.github.com>
Date: Thu, 10 Oct 2024 17:02:59 -0400
Subject: [PATCH 04/15] import backup utility

---
 nchs_mortality/delphi_nchs_mortality/pull.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/nchs_mortality/delphi_nchs_mortality/pull.py b/nchs_mortality/delphi_nchs_mortality/pull.py
index 048622e7d..39c4a59b7 100644
--- a/nchs_mortality/delphi_nchs_mortality/pull.py
+++ b/nchs_mortality/delphi_nchs_mortality/pull.py
@@ -8,6 +8,7 @@
 from sodapy import Socrata
 
 from delphi_utils.geomap import GeoMapper
+from delphi_utils import create_backup_csv
 
 from .constants import METRICS, RENAME, NEWLINE
 
From 9cbde48fae0bc890789a82c2026084c37e9f041e Mon Sep 17 00:00:00 2001
From: Nat DeFries <42820733+nmdefries@users.noreply.github.com>
Date: Thu, 10 Oct 2024 17:28:49 -0400
Subject: [PATCH 05/15] update arg name

---
 nchs_mortality/delphi_nchs_mortality/run.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/nchs_mortality/delphi_nchs_mortality/run.py b/nchs_mortality/delphi_nchs_mortality/run.py
index 6454a009b..ec30066b1 100644
--- a/nchs_mortality/delphi_nchs_mortality/run.py
+++ b/nchs_mortality/delphi_nchs_mortality/run.py
@@ -73,7 +73,7 @@ def run_module(params: Dict[str, Any]):
 
     stats = []
     df_pull = pull_nchs_mortality_data(socrata_token, backup_dir,
-                                       is_patch = custom_run, test_file)
+                                       custom_run = custom_run, test_file = test_file)
     for metric in METRICS:
         for geo in ["state", "nation"]:
             if metric == 'percent_of_expected_deaths':
From 4ac82403bbe355b77283a8d40c577cf2f36907d5 Mon Sep 17 00:00:00 2001
From: minhkhul
Date: Fri, 18 Oct 2024 19:02:30 -0400
Subject: [PATCH 06/15] add gzip + fix old json template log + remove table_name

---
 _delphi_utils_python/delphi_utils/__init__.py |  2 +-
 _delphi_utils_python/delphi_utils/export.py   | 10 +++++-----
 nchs_mortality/params.json.template           |  2 +-
 3 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/_delphi_utils_python/delphi_utils/__init__.py b/_delphi_utils_python/delphi_utils/__init__.py
index 7a418551d..99a4d4ab5 100644
--- a/_delphi_utils_python/delphi_utils/__init__.py
+++ b/_delphi_utils_python/delphi_utils/__init__.py
@@ -4,7 +4,7 @@
 from __future__ import absolute_import
 
 from .archive import ArchiveDiffer, GitArchiveDiffer, S3ArchiveDiffer
-from .export import create_export_csv
+from .export import create_export_csv, create_backup_csv
 from .utils import read_params
 
 from .slack_notifier import SlackNotifier
diff --git a/_delphi_utils_python/delphi_utils/export.py b/_delphi_utils_python/delphi_utils/export.py
index 217a03260..4a2fa76b5 100644
--- a/_delphi_utils_python/delphi_utils/export.py
+++ b/_delphi_utils_python/delphi_utils/export.py
@@ -1,6 +1,7 @@
 """Export data in the format expected by the Delphi API."""
 # -*- coding: utf-8 -*-
 from datetime import datetime
+import gzip
 from os.path import join
 from typing import Optional
 import logging
@@ -138,7 +139,6 @@ def create_backup_csv(
     backup_dir: str,
     custom_run: bool,
     issue: Optional[str] = None,
-    table_name: Optional[str] = None,
     geo_res: Optional[str] = None,
     sensor: Optional[str] = None,
     metric: Optional[str] = None
@@ -184,10 +184,10 @@ def create_backup_csv(
         # Label the file with today's date (the date the data was fetched).
         if not issue:
             issue = datetime.today().strftime('%Y%m%d')
-        backup_filename = [issue, geo_res, table_name, metric, sensor]
+        backup_filename = [issue, geo_res, metric, sensor]
 
-        # Drop empty elements
-        backup_filename = "_".join(filter(None, backup_filename)) + ".csv"
+        backup_filename = "_".join(filter(None, backup_filename)) + ".csv.gz"
 
         backup_file = join(backup_dir, backup_filename)
-        backup_df.to_csv(backup_file, index=False, na_rep="NA")
+        with gzip.open(backup_file, 'wt', newline='') as f:
+            df.to_csv(f, index=False, na_rep="NA")
diff --git a/nchs_mortality/params.json.template b/nchs_mortality/params.json.template
index 630a2d7ad..2e829de24 100644
--- a/nchs_mortality/params.json.template
+++ b/nchs_mortality/params.json.template
@@ -3,7 +3,7 @@
     "daily_export_dir": "./daily_receiving",
     "weekly_export_dir": "./receiving",
     "backup_dir": "./raw_data_backups",
-    "log_filename": "/var/log/indicators/nchs_mortality.log",
+    "log_filename": "./nchs_mortality.log",
     "log_exceptions": false
   },
   "indicator": {
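A side note on the switch to gzip in this commit: the backup is still a plain CSV inside the archive, and pandas can read it back without an explicit gzip step because `read_csv` infers the compression from the `.csv.gz` extension. A minimal round-trip sketch (the file name is hypothetical):

    import gzip
    import pandas as pd

    df = pd.DataFrame({"geo_id": ["ak", "al"], "val": [1.0, 2.0]})
    # Same call pattern as the patch above:
    with gzip.open("20241018.csv.gz", "wt", newline="") as f:
        df.to_csv(f, index=False, na_rep="NA")

    restored = pd.read_csv("20241018.csv.gz")  # compression inferred from extension
    pd.testing.assert_frame_equal(df, restored)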
From 6fcd2ba3cfa6683e22b92232f2d2832b77475417 Mon Sep 17 00:00:00 2001
From: minhkhul
Date: Sun, 20 Oct 2024 19:32:24 -0400
Subject: [PATCH 07/15] fix current tests to take backup dirs and custom_run flag into account

---
 _delphi_utils_python/delphi_utils/export.py      | 4 ++--
 nchs_mortality/tests/conftest.py                 | 3 ++-
 nchs_mortality/tests/raw_data_backups/.gitignore | 2 ++
 nchs_mortality/tests/test_pull.py                | 6 +++---
 4 files changed, 9 insertions(+), 6 deletions(-)
 create mode 100644 nchs_mortality/tests/raw_data_backups/.gitignore

diff --git a/_delphi_utils_python/delphi_utils/export.py b/_delphi_utils_python/delphi_utils/export.py
index 4a2fa76b5..9168a574d 100644
--- a/_delphi_utils_python/delphi_utils/export.py
+++ b/_delphi_utils_python/delphi_utils/export.py
@@ -184,10 +184,10 @@ def create_backup_csv(
         # Label the file with today's date (the date the data was fetched).
         if not issue:
             issue = datetime.today().strftime('%Y%m%d')
-        backup_filename = [issue, geo_res, metric, sensor]
-
+        backup_filename = [issue, geo_res, metric, sensor]
         backup_filename = "_".join(filter(None, backup_filename)) + ".csv.gz"
 
         backup_file = join(backup_dir, backup_filename)
+
         with gzip.open(backup_file, 'wt', newline='') as f:
             df.to_csv(f, index=False, na_rep="NA")
diff --git a/nchs_mortality/tests/conftest.py b/nchs_mortality/tests/conftest.py
index 6ad0f9c59..f9b112b0e 100644
--- a/nchs_mortality/tests/conftest.py
+++ b/nchs_mortality/tests/conftest.py
@@ -15,7 +15,8 @@
 PARAMS = {
     "common": {
         "daily_export_dir": "./daily_receiving",
-        "weekly_export_dir": "./receiving"
+        "weekly_export_dir": "./receiving",
+        "backup_dir": "./raw_data_backups"
     },
     "indicator": {
         "export_start_date": "2020-04-11",
diff --git a/nchs_mortality/tests/raw_data_backups/.gitignore b/nchs_mortality/tests/raw_data_backups/.gitignore
new file mode 100644
index 000000000..2b7efbb36
--- /dev/null
+++ b/nchs_mortality/tests/raw_data_backups/.gitignore
@@ -0,0 +1,2 @@
+*.csv
+*.gz
\ No newline at end of file
diff --git a/nchs_mortality/tests/test_pull.py b/nchs_mortality/tests/test_pull.py
index e99311ba1..c442f2853 100644
--- a/nchs_mortality/tests/test_pull.py
+++ b/nchs_mortality/tests/test_pull.py
@@ -34,7 +34,7 @@ def test_standardize_columns(self):
         pd.testing.assert_frame_equal(expected, df)
 
     def test_good_file(self):
-        df = pull_nchs_mortality_data(SOCRATA_TOKEN, backup_dir = "", custom_run = True, "test_data.csv")
+        df = pull_nchs_mortality_data(SOCRATA_TOKEN, backup_dir = "", custom_run = True, test_file = "test_data.csv")
 
         # Test columns
         assert (
@@ -90,9 +90,9 @@ def test_good_file(self):
     def test_bad_file_with_inconsistent_time_col(self):
         with pytest.raises(ValueError):
             pull_nchs_mortality_data(
-                SOCRATA_TOKEN, backup_dir = "", custom_run = True, "bad_data_with_inconsistent_time_col.csv"
+                SOCRATA_TOKEN, backup_dir = "", custom_run = True, test_file = "bad_data_with_inconsistent_time_col.csv"
             )
 
     def test_bad_file_with_missing_cols(self):
         with pytest.raises(ValueError):
-            pull_nchs_mortality_data(SOCRATA_TOKEN, backup_dir = "", custom_run = True, "bad_data_with_missing_cols.csv")
+            pull_nchs_mortality_data(SOCRATA_TOKEN, backup_dir = "", custom_run = True, test_file = "bad_data_with_missing_cols.csv")
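Since `custom_run=True` turns `create_backup_csv` into a no-op, a hypothetical companion test (not part of this series) could assert that nothing is written to disk, which is the behavior the `backup_dir = ""` calls above rely on:

    import pandas as pd
    from delphi_utils import create_backup_csv

    def test_custom_run_skips_backup(tmp_path):
        create_backup_csv(pd.DataFrame({"geo_id": ["ak"]}), str(tmp_path), custom_run=True)
        assert not list(tmp_path.iterdir())  # no backup file should be created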
From 49383c5bc37daba336bf4772ef31f6a2842e6cd3 Mon Sep 17 00:00:00 2001
From: minhkhul
Date: Sun, 20 Oct 2024 20:04:21 -0400
Subject: [PATCH 08/15] add logging

---
 _delphi_utils_python/delphi_utils/export.py  | 12 +++++++++++-
 nchs_mortality/delphi_nchs_mortality/pull.py | 11 +++++++++--
 nchs_mortality/delphi_nchs_mortality/run.py  |  2 +-
 3 files changed, 21 insertions(+), 4 deletions(-)

diff --git a/_delphi_utils_python/delphi_utils/export.py b/_delphi_utils_python/delphi_utils/export.py
index 9168a574d..7ec1dd982 100644
--- a/_delphi_utils_python/delphi_utils/export.py
+++ b/_delphi_utils_python/delphi_utils/export.py
@@ -141,7 +141,8 @@ def create_backup_csv(
     issue: Optional[str] = None,
     geo_res: Optional[str] = None,
     sensor: Optional[str] = None,
-    metric: Optional[str] = None
+    metric: Optional[str] = None,
+    logger: Optional[logging.Logger] = None
 ):
     """Save data for use as a backup
 
@@ -172,6 +173,8 @@ def create_backup_csv(
         Sensor that has been calculated (cumulative_counts vs new_counts)
     metric: Optional[str]
         Metric we are considering, if any.
+    logger: Optional[logging.Logger]
+        Pass a logger object here to log information about name and size of the backup file.
 
     Returns
     ---------
     dates: pd.Series[datetime]
         Series of dates for which CSV files were exported.
     """
@@ -191,3 +194,10 @@ def create_backup_csv(
 
         with gzip.open(backup_file, 'wt', newline='') as f:
             df.to_csv(f, index=False, na_rep="NA")
+
+        if logger:
+            logger.info(
+                "Backup file created",
+                backup_file=backup_file,
+                backup_size=path.getsize(backup_file)
+            )
diff --git a/nchs_mortality/delphi_nchs_mortality/pull.py b/nchs_mortality/delphi_nchs_mortality/pull.py
index 39c4a59b7..254a044d4 100644
--- a/nchs_mortality/delphi_nchs_mortality/pull.py
+++ b/nchs_mortality/delphi_nchs_mortality/pull.py
@@ -2,6 +2,7 @@
 """Functions for pulling NCHS mortality data API."""
 
 from typing import Optional
+import logging
 
 import numpy as np
 import pandas as pd
@@ -23,7 +24,13 @@ def standardize_columns(df):
     return df.rename(columns=dict(rename_pairs))
 
 
-def pull_nchs_mortality_data(socrata_token: str, backup_dir: str, custom_run: bool, test_file: Optional[str] = None):
+def pull_nchs_mortality_data(
+    socrata_token: str,
+    backup_dir: str,
+    custom_run: bool,
+    logger: Optional[logging.Logger] = None,
+    test_file: Optional[str] = None
+):
     """Pull the latest NCHS Mortality data, and conforms it into a dataset.
 
     The output dataset has:
@@ -66,7 +73,7 @@ def pull_nchs_mortality_data(
         results = client.get("r8kw-7aab", limit=10**10)
         df = pd.DataFrame.from_records(results)
 
-        create_backup_csv(df, backup_dir, custom_run = custom_run)
+        create_backup_csv(df, backup_dir, custom_run = custom_run, logger = logger)
 
     # drop "By Total" rows
     df = df[df["group"].transform(str.lower) == "by week"]
diff --git a/nchs_mortality/delphi_nchs_mortality/run.py b/nchs_mortality/delphi_nchs_mortality/run.py
index ec30066b1..f191781aa 100644
--- a/nchs_mortality/delphi_nchs_mortality/run.py
+++ b/nchs_mortality/delphi_nchs_mortality/run.py
@@ -73,7 +73,7 @@ def run_module(params: Dict[str, Any]):
 
     stats = []
     df_pull = pull_nchs_mortality_data(socrata_token, backup_dir,
-                                       custom_run = custom_run, test_file = test_file)
+                                       custom_run = custom_run, test_file = test_file, logger = logger)
     for metric in METRICS:
         for geo in ["state", "nation"]:
             if metric == 'percent_of_expected_deaths':
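One detail worth flagging in this commit: `logger.info("Backup file created", backup_file=..., backup_size=...)` passes arbitrary keyword arguments, which the stdlib `logging.Logger.info` does not accept. The call style matches the structlog-based logger returned by `delphi_utils.get_structured_logger`, so the `Optional[logging.Logger]` annotation is looser than what the code actually requires. A sketch of the intended pattern, assuming the delphi_utils structured logger:

    from delphi_utils import get_structured_logger

    logger = get_structured_logger(__name__)
    # Keyword arguments become structured fields on the log event:
    logger.info("Backup file created", backup_file="20241020.csv.gz", backup_size=1024)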
From 6159a728b79cefa444f6ca65159c02391cff7c5e Mon Sep 17 00:00:00 2001
From: minhkhul
Date: Sun, 20 Oct 2024 20:15:16 -0400
Subject: [PATCH 09/15] fix log getsize of backup file

---
 _delphi_utils_python/delphi_utils/export.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/_delphi_utils_python/delphi_utils/export.py b/_delphi_utils_python/delphi_utils/export.py
index 7ec1dd982..99546660e 100644
--- a/_delphi_utils_python/delphi_utils/export.py
+++ b/_delphi_utils_python/delphi_utils/export.py
@@ -2,7 +2,7 @@
 # -*- coding: utf-8 -*-
 from datetime import datetime
 import gzip
-from os.path import join
+from os.path import join, getsize
 from typing import Optional
 import logging
 
@@ -199,5 +199,5 @@ def create_backup_csv(
             logger.info(
                 "Backup file created",
                 backup_file=backup_file,
-                backup_size=path.getsize(backup_file)
+                backup_size=getsize(backup_file)
             )
From 6a193ebb4a281003f3e3b5da34c0fba89028b680 Mon Sep 17 00:00:00 2001
From: minhkhul
Date: Sun, 20 Oct 2024 22:45:44 -0400
Subject: [PATCH 10/15] lint

---
 _delphi_utils_python/delphi_utils/export.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/_delphi_utils_python/delphi_utils/export.py b/_delphi_utils_python/delphi_utils/export.py
index 99546660e..f526671ae 100644
--- a/_delphi_utils_python/delphi_utils/export.py
+++ b/_delphi_utils_python/delphi_utils/export.py
@@ -144,7 +144,7 @@ def create_backup_csv(
     metric: Optional[str] = None,
     logger: Optional[logging.Logger] = None
 ):
-    """Save data for use as a backup
+    """Save data for use as a backup.
 
     This function is meant to save raw data fetched from data sources.
     Therefore, it avoids manipulating the data as much as possible to
From 3115595972070bc289c46fd868d25dda592afb04 Mon Sep 17 00:00:00 2001
From: minhkhul
Date: Sun, 20 Oct 2024 23:00:25 -0400
Subject: [PATCH 11/15] lint

---
 _delphi_utils_python/delphi_utils/__init__.py | 13 ++++++-------
 _delphi_utils_python/delphi_utils/export.py   | 17 +++++++++--------
 2 files changed, 15 insertions(+), 15 deletions(-)

diff --git a/_delphi_utils_python/delphi_utils/__init__.py b/_delphi_utils_python/delphi_utils/__init__.py
index 99a4d4ab5..ca5693eaf 100644
--- a/_delphi_utils_python/delphi_utils/__init__.py
+++ b/_delphi_utils_python/delphi_utils/__init__.py
@@ -4,15 +4,14 @@
 from __future__ import absolute_import
 
 from .archive import ArchiveDiffer, GitArchiveDiffer, S3ArchiveDiffer
-from .export import create_export_csv, create_backup_csv
-from .utils import read_params
-
-from .slack_notifier import SlackNotifier
-from .logger import get_structured_logger
+from .export import create_backup_csv, create_export_csv
 from .geomap import GeoMapper
-from .smooth import Smoother
-from .signal import add_prefix
+from .logger import get_structured_logger
 from .nancodes import Nans
+from .signal import add_prefix
+from .slack_notifier import SlackNotifier
+from .smooth import Smoother
+from .utils import read_params
 from .weekday import Weekday
 
 __version__ = "0.3.25"
diff --git a/_delphi_utils_python/delphi_utils/export.py b/_delphi_utils_python/delphi_utils/export.py
index f526671ae..373262af4 100644
--- a/_delphi_utils_python/delphi_utils/export.py
+++ b/_delphi_utils_python/delphi_utils/export.py
@@ -1,17 +1,18 @@
 """Export data in the format expected by the Delphi API."""
 # -*- coding: utf-8 -*-
-from datetime import datetime
 import gzip
-from os.path import join, getsize
-from typing import Optional
 import logging
+from datetime import datetime
+from os.path import getsize, join
+from typing import Optional
 
-from epiweeks import Week
 import numpy as np
 import pandas as pd
+from epiweeks import Week
 
 from .nancodes import Nans
 
+
 def filter_contradicting_missing_codes(df, sensor, metric, date, logger=None):
     """Find values with contradictory missingness codes, filter them, and log."""
     columns = ["val", "se", "sample_size"]
@@ -187,18 +188,18 @@ def create_backup_csv(
         # Label the file with today's date (the date the data was fetched).
         if not issue:
-            issue = datetime.today().strftime('%Y%m%d')
+            issue = datetime.today().strftime("%Y%m%d")
         backup_filename = [issue, geo_res, metric, sensor]
         backup_filename = "_".join(filter(None, backup_filename)) + ".csv.gz"
 
         backup_file = join(backup_dir, backup_filename)
 
-        with gzip.open(backup_file, 'wt', newline='') as f:
+        with gzip.open(backup_file, "wt", newline="") as f:
             df.to_csv(f, index=False, na_rep="NA")
 
         if logger:
             logger.info(
                 "Backup file created",
                 backup_file=backup_file,
-                backup_size=getsize(backup_file)
+                backup_size=getsize(backup_file),
             )
From b000a18f9bccfb86926a0ab4d9f54797704f5002 Mon Sep 17 00:00:00 2001
From: minhkhul
Date: Sun, 20 Oct 2024 23:08:42 -0400
Subject: [PATCH 12/15] lint

---
 _delphi_utils_python/delphi_utils/export.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/_delphi_utils_python/delphi_utils/export.py b/_delphi_utils_python/delphi_utils/export.py
index 373262af4..d9a1b46d4 100644
--- a/_delphi_utils_python/delphi_utils/export.py
+++ b/_delphi_utils_python/delphi_utils/export.py
@@ -135,6 +135,7 @@ def create_export_csv(
         export_df.to_csv(export_file, index=False, na_rep="NA")
     return dates
 
+
 def create_backup_csv(
     df: pd.DataFrame,
     backup_dir: str,
From 10d8a77039b69ade05285e18fdf7e84efd8ef473 Mon Sep 17 00:00:00 2001
From: minhkhul
Date: Sun, 20 Oct 2024 23:15:54 -0400
Subject: [PATCH 13/15] lint

---
 nchs_mortality/delphi_nchs_mortality/pull.py | 12 ++++++------
 nchs_mortality/delphi_nchs_mortality/run.py  |  5 +++--
 2 files changed, 9 insertions(+), 8 deletions(-)

diff --git a/nchs_mortality/delphi_nchs_mortality/pull.py b/nchs_mortality/delphi_nchs_mortality/pull.py
index 254a044d4..7abfd4b00 100644
--- a/nchs_mortality/delphi_nchs_mortality/pull.py
+++ b/nchs_mortality/delphi_nchs_mortality/pull.py
@@ -1,17 +1,17 @@
 # -*- coding: utf-8 -*-
 """Functions for pulling NCHS mortality data API."""
 
-from typing import Optional
 import logging
+from typing import Optional
 
 import numpy as np
 import pandas as pd
+from delphi_utils import create_backup_csv
+from delphi_utils.geomap import GeoMapper
 from sodapy import Socrata
 
-from delphi_utils.geomap import GeoMapper
-from delphi_utils import create_backup_csv
+from .constants import METRICS, NEWLINE, RENAME
 
-from .constants import METRICS, RENAME, NEWLINE
 
 def standardize_columns(df):
     """Rename columns to comply with a standard set.
@@ -29,7 +29,7 @@ def pull_nchs_mortality_data(
     backup_dir: str,
     custom_run: bool,
     logger: Optional[logging.Logger] = None,
-    test_file: Optional[str] = None
+    test_file: Optional[str] = None,
 ):
     """Pull the latest NCHS Mortality data, and conforms it into a dataset.
 
@@ -73,7 +73,7 @@ def pull_nchs_mortality_data(
         results = client.get("r8kw-7aab", limit=10**10)
         df = pd.DataFrame.from_records(results)
 
-        create_backup_csv(df, backup_dir, custom_run = custom_run, logger = logger)
+        create_backup_csv(df, backup_dir, custom_run=custom_run, logger=logger)
 
     # drop "By Total" rows
     df = df[df["group"].transform(str.lower) == "by week"]
diff --git a/nchs_mortality/delphi_nchs_mortality/run.py b/nchs_mortality/delphi_nchs_mortality/run.py
index f191781aa..4e88e9d61 100644
--- a/nchs_mortality/delphi_nchs_mortality/run.py
+++ b/nchs_mortality/delphi_nchs_mortality/run.py
@@ -72,8 +72,9 @@ def run_module(params: Dict[str, Any]):
         daily_arch_diff.update_cache()
 
     stats = []
-    df_pull = pull_nchs_mortality_data(socrata_token, backup_dir,
-                                       custom_run = custom_run, test_file = test_file, logger = logger)
+    df_pull = pull_nchs_mortality_data(
+        socrata_token, backup_dir, custom_run=custom_run, test_file=test_file, logger=logger
+    )
     for metric in METRICS:
         for geo in ["state", "nation"]:
             if metric == 'percent_of_expected_deaths':
From 0d300baa775c4daccaef83630748bf52d059b083 Mon Sep 17 00:00:00 2001
From: minhkhul
Date: Mon, 21 Oct 2024 17:17:22 -0400
Subject: [PATCH 14/15] add backup test

---
 nchs_mortality/delphi_nchs_mortality/pull.py |  3 ++-
 nchs_mortality/tests/conftest.py             |  1 +
 nchs_mortality/tests/test_pull.py            | 12 ++++++++++++
 3 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/nchs_mortality/delphi_nchs_mortality/pull.py b/nchs_mortality/delphi_nchs_mortality/pull.py
index 7abfd4b00..ad54e457a 100644
--- a/nchs_mortality/delphi_nchs_mortality/pull.py
+++ b/nchs_mortality/delphi_nchs_mortality/pull.py
@@ -73,8 +73,9 @@ def pull_nchs_mortality_data(
         results = client.get("r8kw-7aab", limit=10**10)
         df = pd.DataFrame.from_records(results)
 
-        create_backup_csv(df, backup_dir, custom_run=custom_run, logger=logger)
+    create_backup_csv(df, backup_dir, custom_run=custom_run, logger=logger)
 
+    if not test_file:
     # drop "By Total" rows
     df = df[df["group"].transform(str.lower) == "by week"]
 
diff --git a/nchs_mortality/tests/conftest.py b/nchs_mortality/tests/conftest.py
index f9b112b0e..383d1c782 100644
--- a/nchs_mortality/tests/conftest.py
+++ b/nchs_mortality/tests/conftest.py
@@ -14,6 +14,7 @@
 
 PARAMS = {
     "common": {
+        "custom_run": True,
         "daily_export_dir": "./daily_receiving",
         "weekly_export_dir": "./receiving",
         "backup_dir": "./raw_data_backups"
diff --git a/nchs_mortality/tests/test_pull.py b/nchs_mortality/tests/test_pull.py
index c442f2853..4f18210f6 100644
--- a/nchs_mortality/tests/test_pull.py
+++ b/nchs_mortality/tests/test_pull.py
@@ -1,3 +1,4 @@
+import os
 import pytest
 
 import pandas as pd
@@ -96,3 +97,14 @@ def test_bad_file_with_inconsistent_time_col(self):
     def test_bad_file_with_missing_cols(self):
         with pytest.raises(ValueError):
             pull_nchs_mortality_data(SOCRATA_TOKEN, backup_dir = "", custom_run = True, test_file = "bad_data_with_missing_cols.csv")
+
+    def test_backup_today_data(self):
+        today = pd.Timestamp.today().strftime("%Y%m%d")
+        backup_dir = "./raw_data_backups"
+        pull_nchs_mortality_data(SOCRATA_TOKEN, backup_dir = backup_dir, custom_run = False, test_file = "test_data.csv")
+        backup_file = f"{backup_dir}/{today}.csv.gz"
+        backup_df = pd.read_csv(backup_file)
+        source_df = pd.read_csv("test_data/test_data.csv")
+        pd.testing.assert_frame_equal(source_df, backup_df)
+        if os.path.exists(backup_file):
+            os.remove(backup_file)
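A robustness note on `test_backup_today_data` above: if `assert_frame_equal` raises, the `os.remove` cleanup never runs and the stale backup file can interfere with later runs. A hedged alternative sketch using try/finally:

    def test_backup_today_data(self):
        today = pd.Timestamp.today().strftime("%Y%m%d")
        backup_dir = "./raw_data_backups"
        pull_nchs_mortality_data(SOCRATA_TOKEN, backup_dir=backup_dir, custom_run=False,
                                 test_file="test_data.csv")
        backup_file = f"{backup_dir}/{today}.csv.gz"
        try:
            pd.testing.assert_frame_equal(pd.read_csv("test_data/test_data.csv"),
                                          pd.read_csv(backup_file))
        finally:
            if os.path.exists(backup_file):
                os.remove(backup_file)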
From f4645695b006a3729d3faa9091b696aff41e22cd Mon Sep 17 00:00:00 2001
From: minhkhul <118945681+minhkhul@users.noreply.github.com>
Date: Tue, 22 Oct 2024 12:00:57 -0400
Subject: [PATCH 15/15] remove deep copy

---
 _delphi_utils_python/delphi_utils/export.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/_delphi_utils_python/delphi_utils/export.py b/_delphi_utils_python/delphi_utils/export.py
index d9a1b46d4..82493032e 100644
--- a/_delphi_utils_python/delphi_utils/export.py
+++ b/_delphi_utils_python/delphi_utils/export.py
@@ -184,8 +184,6 @@ def create_backup_csv(
         Series of dates for which CSV files were exported.
     """
     if not custom_run:
-        df = df.copy()
-
         # Label the file with today's date (the date the data was fetched).
        if not issue:
             issue = datetime.today().strftime("%Y%m%d")
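Taken together, the series leaves `create_backup_csv` with the signature (df, backup_dir, custom_run, issue, geo_res, sensor, metric, logger). A minimal end-to-end sketch of the final behavior, assuming a params file containing the `common.backup_dir` and `common.custom_run` entries added earlier in the series and an existing backup directory:

    import pandas as pd
    from delphi_utils import create_backup_csv, get_structured_logger

    params = {"common": {"backup_dir": "./raw_data_backups", "custom_run": False}}
    logger = get_structured_logger(__name__)

    raw = pd.DataFrame({"geo_id": ["ak"], "timestamp": ["2024-10-22"], "val": [1.0]})
    create_backup_csv(
        raw,
        params["common"]["backup_dir"],
        custom_run=params["common"].get("custom_run", False),
        geo_res="state",
        sensor="new_counts",
        logger=logger,
    )
    # Writes ./raw_data_backups/<today>_state_new_counts.csv.gz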