Merged
13 changes: 6 additions & 7 deletions _delphi_utils_python/delphi_utils/__init__.py
@@ -4,15 +4,14 @@
from __future__ import absolute_import

from .archive import ArchiveDiffer, GitArchiveDiffer, S3ArchiveDiffer
- from .export import create_export_csv
- from .utils import read_params
-
- from .slack_notifier import SlackNotifier
- from .logger import get_structured_logger
+ from .export import create_backup_csv, create_export_csv
from .geomap import GeoMapper
- from .smooth import Smoother
- from .signal import add_prefix
+ from .logger import get_structured_logger
from .nancodes import Nans
+ from .signal import add_prefix
+ from .slack_notifier import SlackNotifier
+ from .smooth import Smoother
+ from .utils import read_params
from .weekday import Weekday

__version__ = "0.3.25"
75 changes: 72 additions & 3 deletions _delphi_utils_python/delphi_utils/export.py
@@ -1,16 +1,18 @@
"""Export data in the format expected by the Delphi API."""
# -*- coding: utf-8 -*-
+ import gzip
+ import logging
from datetime import datetime
- from os.path import join
+ from os.path import getsize, join
from typing import Optional
- import logging

- from epiweeks import Week
import numpy as np
import pandas as pd
+ from epiweeks import Week

from .nancodes import Nans


def filter_contradicting_missing_codes(df, sensor, metric, date, logger=None):
"""Find values with contradictory missingness codes, filter them, and log."""
columns = ["val", "se", "sample_size"]
@@ -132,3 +134,70 @@ def create_export_csv(
export_df = export_df.sort_values(by="geo_id")
export_df.to_csv(export_file, index=False, na_rep="NA")
return dates


def create_backup_csv(
df: pd.DataFrame,
backup_dir: str,
custom_run: bool,
issue: Optional[str] = None,
geo_res: Optional[str] = None,
sensor: Optional[str] = None,
metric: Optional[str] = None,
logger: Optional[logging.Logger] = None,
):
"""Save data for use as a backup.

This function is meant to save raw data fetched from data sources.
Therefore, it avoids manipulating the data as much as possible to
preserve the input.

When only required arguments are passed, data will be saved to a file of
the format `<backup_dir>/<today's date as YYYYMMDD>.csv.gz`. Optional arguments
should be passed if the source data is fetched from different tables or
in batches by signal, geo, etc.

Parameters
----------
df: pd.DataFrame
Columns: geo_id, timestamp, val, se, sample_size
backup_dir: str
Backup directory
custom_run: bool
Flag indicating if the current run is a patch, or other run where
backups aren't needed. If so, don't save any data to disk
issue: Optional[str]
The date the data was fetched, in YYYYMMDD format. Defaults to "today"
if not provided
geo_res: Optional[str]
Geographic resolution of the data
sensor: Optional[str]
Sensor that has been calculated (cumulative_counts vs new_counts)
metric: Optional[str]
Metric we are considering, if any.
logger: Optional[logging.Logger]
Pass a logger object here to log the name and size of the backup file.
"""
if not custom_run:
# Label the file with today's date (the date the data was fetched).
if not issue:
issue = datetime.today().strftime("%Y%m%d")

backup_filename = [issue, geo_res, metric, sensor]
backup_filename = "_".join(filter(None, backup_filename)) + ".csv.gz"
backup_file = join(backup_dir, backup_filename)

with gzip.open(backup_file, "wt", newline="") as f:
df.to_csv(f, index=False, na_rep="NA")

if logger:
logger.info(
"Backup file created",
backup_file=backup_file,
backup_size=getsize(backup_file),
)

Review thread on the `if logger:` line:

@aysim319 (Contributor), Oct 22, 2024:
why is logger optional? we want to keep track of whether the backup was created or not, right?

Contributor Author:
This behavior is also copied from create_export_csv.

Contributor:
Also, originally the pull method doesn't take logger as a variable at all, so I wasn't sure if we should force it within the scope of this PR.
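For orientation, a minimal usage sketch of the new helper follows. It is not taken from the PR: the DataFrame contents, the ./raw_data_backups directory, and the geo/metric/sensor labels are illustrative, and only the filename layout and the gzip handling follow from the function body above.

"""Sketch: backing up a raw pull with create_backup_csv (illustrative values only)."""
import os
from datetime import datetime

import pandas as pd

from delphi_utils import create_backup_csv

os.makedirs("./raw_data_backups", exist_ok=True)

# Stand-in for a raw API payload; real callers pass the unmodified source data.
raw = pd.DataFrame(
    {"geo_id": ["pa", "ny"], "timestamp": ["2024-10-22", "2024-10-22"], "val": [1.0, 2.0]}
)

# Writes ./raw_data_backups/<YYYYMMDD>_state_deaths_new_counts.csv.gz. No logger is
# passed here; if one is, it should be the structured (structlog-style) logger from
# delphi_utils, since the keyword-argument logger.info call above assumes that interface.
create_backup_csv(
    raw,
    backup_dir="./raw_data_backups",
    custom_run=False,
    geo_res="state",
    metric="deaths",
    sensor="new_counts",
)

# pandas infers gzip compression from the .csv.gz suffix, so the backup can be
# read back without explicit decompression.
issue = datetime.today().strftime("%Y%m%d")
restored = pd.read_csv(f"./raw_data_backups/{issue}_state_deaths_new_counts.csv.gz")
print(restored.head())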
1 change: 1 addition & 0 deletions ansible/templates/nchs_mortality-params-prod.json.j2
@@ -1,6 +1,7 @@
{
"common": {
"daily_export_dir": "./daily_receiving",
"backup_dir": "./raw_data_backups",
"log_filename": "/var/log/indicators/nchs_mortality.log",
"weekly_export_dir": "/common/covidcast/receiving/nchs-mortality"
},
22 changes: 19 additions & 3 deletions nchs_mortality/delphi_nchs_mortality/pull.py
@@ -1,15 +1,17 @@
# -*- coding: utf-8 -*-
"""Functions for pulling NCHS mortality data API."""

+ import logging
from typing import Optional

import numpy as np
import pandas as pd
+ from delphi_utils import create_backup_csv
+ from delphi_utils.geomap import GeoMapper
from sodapy import Socrata

- from delphi_utils.geomap import GeoMapper
+ from .constants import METRICS, NEWLINE, RENAME

- from .constants import METRICS, RENAME, NEWLINE

def standardize_columns(df):
"""Rename columns to comply with a standard set.
@@ -22,7 +24,13 @@ def standardize_columns(df):
return df.rename(columns=dict(rename_pairs))


- def pull_nchs_mortality_data(socrata_token: str, test_file: Optional[str] = None):
+ def pull_nchs_mortality_data(
+ socrata_token: str,
+ backup_dir: str,
+ custom_run: bool,
+ logger: Optional[logging.Logger] = None,
+ test_file: Optional[str] = None,
+ ):
"""Pull the latest NCHS Mortality data, and conforms it into a dataset.

The output dataset has:
@@ -40,6 +48,10 @@ def pull_nchs_mortality_data(socrata_token: str, test_file: Optional[str] = None
----------
socrata_token: str
My App Token for pulling the NCHS mortality data
backup_dir: str
Directory to which to save raw backup data
custom_run: bool
Flag indicating if the current run is a patch. If so, don't save any data to disk
test_file: Optional[str]
When not null, name of file from which to read test data

@@ -60,6 +72,10 @@ def pull_nchs_mortality_data(socrata_token: str, test_file: Optional[str] = None
client = Socrata("data.cdc.gov", socrata_token)
results = client.get("r8kw-7aab", limit=10**10)
df = pd.DataFrame.from_records(results)

create_backup_csv(df, backup_dir, custom_run=custom_run, logger=logger)

if not test_file:
# drop "By Total" rows
df = df[df["group"].transform(str.lower) == "by week"]

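One motivation for these backups is reprocessing: a later patch run could start from the saved raw file instead of re-querying Socrata. The PR does not add such a loader; the sketch below is a hypothetical illustration that only assumes the <issue>.csv.gz naming create_backup_csv uses when no optional labels are passed.

"""Hypothetical loader for a saved NCHS backup (not part of this PR)."""
from os.path import join

import pandas as pd


def load_nchs_backup(backup_dir: str, issue: str) -> pd.DataFrame:
    """Read the raw backup that create_backup_csv wrote for a given issue date.

    With only required arguments, the backup is named <issue>.csv.gz; pandas
    decompresses it transparently based on the suffix.
    """
    return pd.read_csv(join(backup_dir, f"{issue}.csv.gz"))


# e.g. reuse the 2024-10-22 pull instead of hitting data.cdc.gov again:
# df = load_nchs_backup("./raw_data_backups", "20241022")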
6 changes: 5 additions & 1 deletion nchs_mortality/delphi_nchs_mortality/run.py
@@ -59,6 +59,8 @@ def run_module(params: Dict[str, Any]):
days=date.today().weekday() + 2)
export_start_date = export_start_date.strftime('%Y-%m-%d')
daily_export_dir = params["common"]["daily_export_dir"]
backup_dir = params["common"]["backup_dir"]
custom_run = params["common"].get("custom_run", False)
socrata_token = params["indicator"]["socrata_token"]
test_file = params["indicator"].get("test_file", None)

@@ -70,7 +72,9 @@ def run_module(params: Dict[str, Any]):
daily_arch_diff.update_cache()

stats = []
- df_pull = pull_nchs_mortality_data(socrata_token, test_file)
+ df_pull = pull_nchs_mortality_data(
+ socrata_token, backup_dir, custom_run=custom_run, test_file=test_file, logger=logger
+ )
for metric in METRICS:
for geo in ["state", "nation"]:
if metric == 'percent_of_expected_deaths':
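Note the default here: the production params in the ansible template above define no "custom_run" key, so params["common"].get("custom_run", False) evaluates to False and scheduled runs always write a backup; only explicitly configured runs skip it, as in this assumed patch-run fragment (not part of this PR):

# Setting the flag makes create_backup_csv return without writing anything.
params = {"common": {"backup_dir": "./raw_data_backups", "custom_run": True}}
skip_backups = params["common"].get("custom_run", False)  # True for patch runs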
3 changes: 2 additions & 1 deletion nchs_mortality/params.json.template
@@ -2,7 +2,8 @@
"common": {
"daily_export_dir": "./daily_receiving",
"weekly_export_dir": "./receiving",
"log_filename": "/var/log/indicators/nchs_mortality.log",
"backup_dir": "./raw_data_backups",
"log_filename": "./nchs_mortality.log",
"log_exceptions": false
},
"indicator": {
Expand Down
120 changes: 120 additions & 0 deletions nchs_mortality/raw_data_backups/.gitignore
@@ -0,0 +1,120 @@
# You should hard commit a prototype for this file, but we
# want to avoid accidental adding of API tokens and other
# private data parameters
params.json

# Do not commit output files
receiving/*.csv

# Remove macOS files
.DS_Store

# virtual environment
dview/

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
coverage.xml
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/

# Translations
*.mo
*.pot

# Django stuff:
*.log
.static_storage/
.media/
local_settings.py

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
target/

# Jupyter Notebook
.ipynb_checkpoints

# pyenv
.python-version

# celery beat schedule file
celerybeat-schedule

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
4 changes: 3 additions & 1 deletion nchs_mortality/tests/conftest.py
@@ -14,8 +14,10 @@

PARAMS = {
"common": {
"custom_run": True,
"daily_export_dir": "./daily_receiving",
"weekly_export_dir": "./receiving"
"weekly_export_dir": "./receiving",
"backup_dir": "./raw_data_backups"
},
"indicator": {
"export_start_date": "2020-04-11",
2 changes: 2 additions & 0 deletions nchs_mortality/tests/raw_data_backups/.gitignore
@@ -0,0 +1,2 @@
*.csv
*.gz
18 changes: 15 additions & 3 deletions nchs_mortality/tests/test_pull.py
@@ -1,3 +1,4 @@
import os
import pytest

import pandas as pd
@@ -34,7 +35,7 @@ def test_standardize_columns(self):
pd.testing.assert_frame_equal(expected, df)

def test_good_file(self):
- df = pull_nchs_mortality_data(SOCRATA_TOKEN, "test_data.csv")
+ df = pull_nchs_mortality_data(SOCRATA_TOKEN, backup_dir="", custom_run=True, test_file="test_data.csv")

# Test columns
assert (
@@ -90,9 +91,20 @@ def test_good_file(self):
def test_bad_file_with_inconsistent_time_col(self):
with pytest.raises(ValueError):
pull_nchs_mortality_data(
SOCRATA_TOKEN, "bad_data_with_inconsistent_time_col.csv"
SOCRATA_TOKEN, backup_dir = "", custom_run = True, test_file = "bad_data_with_inconsistent_time_col.csv"
)

def test_bad_file_with_missing_cols(self):
with pytest.raises(ValueError):
- pull_nchs_mortality_data(SOCRATA_TOKEN, "bad_data_with_missing_cols.csv")
+ pull_nchs_mortality_data(SOCRATA_TOKEN, backup_dir="", custom_run=True, test_file="bad_data_with_missing_cols.csv")

+ def test_backup_today_data(self):
+ today = pd.Timestamp.today().strftime("%Y%m%d")
+ backup_dir = "./raw_data_backups"
+ pull_nchs_mortality_data(SOCRATA_TOKEN, backup_dir=backup_dir, custom_run=False, test_file="test_data.csv")
+ backup_file = f"{backup_dir}/{today}.csv.gz"
+ backup_df = pd.read_csv(backup_file)
+ source_df = pd.read_csv("test_data/test_data.csv")
+ pd.testing.assert_frame_equal(source_df, backup_df)
+ if os.path.exists(backup_file):
+ os.remove(backup_file)
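Because the new test does a real file round trip, it can be run on its own with pytest's name filter, e.g. pytest nchs_mortality/tests/test_pull.py -k test_backup_today_data from the repository root (assuming the indicator's test dependencies are installed). The test removes the generated .csv.gz once the comparison passes, and any stray files are also covered by the tests/raw_data_backups/.gitignore added above.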