2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
[bumpversion]
-current_version = 0.3.56
+current_version = 0.3.57
commit = True
message = chore: bump covidcast-indicators to {new_version}
tag = False
2 changes: 1 addition & 1 deletion .github/workflows/create-release.yml
@@ -61,7 +61,7 @@ jobs:
          bump2version --list ${{ github.event.inputs.versionName }} | grep ^new_version | sed -r s,"^.*=",,
      - name: Copy version to indicator directory
        run: |
-          indicator_list=("changehc" "claims_hosp" "doctor_visits" "google_symptoms" "hhs_hosp" "nchs_mortality" "quidel_covidtest" "sir_complainsalot")
+          indicator_list=("changehc" "claims_hosp" "doctor_visits" "google_symptoms" "hhs_hosp" "nchs_mortality" "nssp" "quidel_covidtest" "sir_complainsalot")
          for path in ${indicator_list[@]}; do
            echo "current_version = ${{ steps.indicators.outputs.version }}" > $path/version.cfg
          done
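Each listed indicator then carries a one-line `version.cfg` in its directory, of the form below (illustrative contents, using the 0.3.57 version bumped in this PR):

```
current_version = 0.3.57
```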
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/python-ci.yml
@@ -39,6 +39,8 @@ jobs:
            dir: "delphi_quidel_covidtest"
          - package: "sir_complainsalot"
            dir: "delphi_sir_complainsalot"
+          - package: "nhsn"
+            dir: "delphi_nhsn"
    defaults:
      run:
        working-directory: ${{ matrix.package }}
6 changes: 5 additions & 1 deletion Jenkinsfile
@@ -10,14 +10,18 @@
- TODO: #527 Get this list automatically from python-ci.yml at runtime.
*/

-def indicator_list = ['backfill_corrections', 'changehc', 'claims_hosp', 'google_symptoms', 'hhs_hosp', 'nchs_mortality', 'quidel_covidtest', 'sir_complainsalot', 'doctor_visits', 'nwss_wastewater', 'nssp']
+def indicator_list = ['backfill_corrections', 'changehc', 'claims_hosp', 'google_symptoms', 'hhs_hosp', 'nchs_mortality', 'quidel_covidtest', 'sir_complainsalot', 'doctor_visits', 'nwss_wastewater', 'nssp', 'nhsn']
def build_package_main = [:]
def build_package_prod = [:]
def deploy_staging = [:]
def deploy_production = [:]

pipeline {
    agent any
+    environment {
+        // Set the PATH variable to include the pyenv shims directory.
+        PATH = "/var/lib/jenkins/.pyenv/shims:${env.PATH}"
+    }
    stages {
        stage('Build dev/feature branch') {
            when {
2 changes: 1 addition & 1 deletion _delphi_utils_python/.bumpversion.cfg
@@ -1,5 +1,5 @@
[bumpversion]
-current_version = 0.3.25
+current_version = 0.3.26
commit = True
message = chore: bump delphi_utils to {new_version}
tag = False
11 changes: 10 additions & 1 deletion _delphi_utils_python/README.md
@@ -22,13 +22,22 @@ Source code can be found here:

## Logger Usage

+To make our structured logging as useful as it can be, particularly within the context of how we use logs in Elastic, the `event` argument (typically the first unnamed arg) should be a static string (to make filtering easier), and each dynamic/varying value should be specified in an individual meaningfully- and consistently-named argument to the logger call (for use in filtering, thresholding, grouping, visualization, etc).
+
+### Commonly used argument names:
+- data_source
+- geo_type
+- signal
+- issue_date
+- filename

Single-thread usage.

```py
from delphi_utils.logger import get_structured_logger

logger = get_structured_logger('my_logger')
-logger.info('Hello, world!')
+logger.info('Hello', name='World')
```
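To illustrate the convention above (an editorial sketch, not part of this diff — the event string and field values are hypothetical):

```py
from delphi_utils.logger import get_structured_logger

logger = get_structured_logger('my_logger')

# One static event string; each varying value gets its own consistently
# named argument so Elastic can filter, group, and graph on it.
logger.info(
    "Fetched source file",
    data_source="nssp",
    geo_type="state",
    signal="pct_ed_visits_covid",
    issue_date="2024-11-19",
    filename="nssp_20241119.csv",
)
```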

Multi-thread usage.
15 changes: 7 additions & 8 deletions _delphi_utils_python/delphi_utils/__init__.py
@@ -4,15 +4,14 @@
from __future__ import absolute_import

from .archive import ArchiveDiffer, GitArchiveDiffer, S3ArchiveDiffer
-from .export import create_export_csv
-from .utils import read_params
-
-from .slack_notifier import SlackNotifier
-from .logger import get_structured_logger
+from .export import create_backup_csv, create_export_csv
from .geomap import GeoMapper
-from .smooth import Smoother
-from .signal import add_prefix
+from .logger import get_structured_logger
from .nancodes import Nans
+from .signal import add_prefix
+from .slack_notifier import SlackNotifier
+from .smooth import Smoother
+from .utils import read_params
from .weekday import Weekday

-__version__ = "0.3.25"
+__version__ = "0.3.26"
86 changes: 81 additions & 5 deletions _delphi_utils_python/delphi_utils/export.py
@@ -1,16 +1,17 @@
"""Export data in the format expected by the Delphi API."""
# -*- coding: utf-8 -*-
+import logging
from datetime import datetime
-from os.path import join
+from os.path import getsize, join
from typing import Optional
-import logging

-from epiweeks import Week
import numpy as np
import pandas as pd
+from epiweeks import Week

from .nancodes import Nans


def filter_contradicting_missing_codes(df, sensor, metric, date, logger=None):
    """Find values with contradictory missingness codes, filter them, and log."""
    columns = ["val", "se", "sample_size"]
@@ -22,8 +23,10 @@ def filter_contradicting_missing_codes(df, sensor, metric, date, logger=None):
    for mask in masks:
        if not logger is None and df.loc[mask].size > 0:
            logger.info(
-                "Filtering contradictory missing code in " +
-                "{0}_{1}_{2}.".format(sensor, metric, date.strftime(format="%Y-%m-%d"))
+                "Filtering contradictory missing code",
+                sensor=sensor,
+                metric=metric,
+                date=date.strftime(format="%Y-%m-%d"),
            )
            df = df.loc[~mask]
        elif logger is None and df.loc[mask].size > 0:
@@ -130,3 +133,76 @@ def create_export_csv(
    export_df = export_df.sort_values(by="geo_id")
    export_df.to_csv(export_file, index=False, na_rep="NA")
    return dates


+def create_backup_csv(
+    df: pd.DataFrame,
+    backup_dir: str,
+    custom_run: bool,
+    issue: Optional[str] = None,
+    geo_res: Optional[str] = None,
+    sensor: Optional[str] = None,
+    metric: Optional[str] = None,
+    logger: Optional[logging.Logger] = None,
+):
+    """Save data for use as a backup.
+
+    This function is meant to save raw data fetched from data sources.
+    Therefore, it avoids manipulating the data as much as possible to
+    preserve the input.
+
+    When only required arguments are passed, data will be saved to files of
+    the form `<backup_dir>/<today's date as YYYYMMDD>.csv.gz` and
+    `<backup_dir>/<today's date as YYYYMMDD>.parquet`. Optional arguments
+    should be passed if the source data is fetched from different tables or
+    in batches by signal, geo, etc.
+
+    Parameters
+    ----------
+    df: pd.DataFrame
+        Columns: geo_id, timestamp, val, se, sample_size
+    backup_dir: str
+        Backup directory
+    custom_run: bool
+        Flag indicating if the current run is a patch, or other run where
+        backups aren't needed. If so, don't save any data to disk.
+    issue: Optional[str]
+        The date the data was fetched, in YYYYMMDD format. Defaults to "today"
+        if not provided.
+    geo_res: Optional[str]
+        Geographic resolution of the data.
+    sensor: Optional[str]
+        Sensor that has been calculated (cumulative_counts vs new_counts).
+    metric: Optional[str]
+        Metric we are considering, if any.
+    logger: Optional[logging.Logger]
+        Pass a logger object here to log information about the name and size
+        of the backup file.
+    """
+    if not custom_run:
+        # Label the file with today's date (the date the data was fetched).
+        if not issue:
+            issue = datetime.today().strftime("%Y%m%d")
+
+        backup_filename = [issue, geo_res, metric, sensor]
+        backup_filename = "_".join(filter(None, backup_filename))
+        backup_file = join(backup_dir, backup_filename)
+        try:
+            # The de facto data format is CSV, but parquet preserves data
+            # types (keeping both as intermediary measures).
+            df.to_csv(
+                f"{backup_file}.csv.gz", index=False, na_rep="NA", compression="gzip"
+            )
+            df.to_parquet(f"{backup_file}.parquet", index=False)
+
+            if logger:
+                logger.info(
+                    "Backup file created",
+                    backup_file=backup_file,
+                    backup_size=getsize(f"{backup_file}.csv.gz"),
+                )
+        # pylint: disable=W0703
+        except Exception as e:
+            if logger:
+                logger.info("Backup file creation failed", msg=e)
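A minimal usage sketch for the new helper (editorial, not part of the diff; the indicator name, directory, and data values are hypothetical):

```py
import pandas as pd

from delphi_utils import create_backup_csv, get_structured_logger

logger = get_structured_logger("my_indicator")

# A small frame in the expected geo_id/timestamp/val/se/sample_size shape.
df = pd.DataFrame({
    "geo_id": ["ca", "ny"],
    "timestamp": pd.to_datetime(["2024-11-18", "2024-11-18"]),
    "val": [1.0, 2.0],
    "se": [0.1, 0.2],
    "sample_size": [100, 200],
})

# With issue=None this writes ./raw_data_backups/<YYYYMMDD>_state.csv.gz plus
# a matching .parquet file, and logs "Backup file created" with the file size.
create_backup_csv(df, backup_dir="./raw_data_backups", custom_run=False,
                  geo_res="state", logger=logger)
```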
3 changes: 1 addition & 2 deletions _delphi_utils_python/delphi_utils/flash_eval/eval_day.py
@@ -153,8 +153,7 @@ def output(evd_ranking, day, lag, signal, logger):
            p_text += f"\t{start_link}|*{index}*, {'{:.2f}'.format(value)}>\n"
        else:
            break
-    name = f"Signal: {signal} Lag: {lag}"
-    logger.info(name, payload=p_text)
+    logger.info("FLaSH: worth inspecting", signal=signal, lag=lag, payload=p_text)


def evd_ranking_fn(ts_streams, EVD_max, EVD_min):
10 changes: 5 additions & 5 deletions _delphi_utils_python/delphi_utils/logger.py
@@ -1,11 +1,11 @@
"""Structured logger utility for creating JSON logs.

-See the delphi_utils README.md for usage examples.
+To make our structured logging as useful as it can be, particularly within the context of how we use logs in Elastic,
+the `event` argument (typically the first unnamed arg) should be a static string (to make filtering easier),
+and each dynamic/varying value should be specified in an individual meaningfully- and consistently-named argument
+to the logger call (for use in filtering, thresholding, grouping, visualization, etc)

The Delphi group uses two ~identical versions of this file.
Try to keep them in sync with edits, for sanity.
https://github.com/cmu-delphi/covidcast-indicators/blob/main/_delphi_utils_python/delphi_utils/logger.py
https://github.com/cmu-delphi/delphi-epidata/blob/dev/src/common/logger.py
+See the delphi_utils README.md for usage examples.
"""

import contextlib
19 changes: 14 additions & 5 deletions _delphi_utils_python/delphi_utils/runner.py
@@ -51,6 +51,7 @@ def run_indicator_pipeline(indicator_fn: Callable[[Params], None],

    #Get version and indicator name for startup
    ind_name = indicator_fn.__module__.replace(".run", "")
+
    #Check for version.cfg in indicator directory
    if os.path.exists("version.cfg"):
        with open("version.cfg") as ver_file:
@@ -59,9 +60,15 @@
            for line in ver_file:
                if "current_version" in line:
                    current_version = str.strip(line)
                    current_version = current_version.replace("current_version = ", "")
-        #Logging - Starting Indicator
-        logger.info(f"Started {ind_name} with covidcast-indicators version {current_version}")
-    else: logger.info(f"Started {ind_name} without version.cfg")
+        logger.info(
+            "Started a covidcast-indicator",
+            indicator_name=ind_name,
+            current_version=current_version,
+        )
+    else:
+        logger.info(
+            "Started a covidcast-indicator without version.cfg", indicator_name=ind_name
+        )

    indicator_fn(params)
    validator = validator_fn(params)
@@ -77,8 +84,10 @@
                break
            time.sleep(1)
        else:
-            logger.error(f"Flash step timed out ({timer} s), terminating",
-                elapsed_time_in_seconds = round(time.time() - start, 2))
+            logger.error(
+                "Flash step timed out, terminating",
+                elapsed_time_in_seconds=round(time.time() - start, 2),
+            )
            t1.terminate()
            t1.join()
    if validator:
5 changes: 3 additions & 2 deletions _delphi_utils_python/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "delphi-utils"
-version = "0.3.25"
+version = "0.3.26"
description = "Shared Utility Functions for Indicators"
readme = "README.md"
requires-python = "== 3.8.*"
@@ -13,7 +13,7 @@ classifiers = [
    "Development Status :: 5 - Production/Stable",
    "Intended Audience :: Developers",
    "Programming Language :: Python :: 3.8",
-    "License :: MIT",
+    "License :: OSI Approved :: MIT License",
]
dependencies = [
    "boto3",
@@ -23,6 +23,7 @@ dependencies = [
    "gitpython",
    "importlib_resources>=1.3",
    "numpy",
+    "pyarrow",
    "pandas>=1.1.0",
    "requests",
    "slackclient",
34 changes: 28 additions & 6 deletions _delphi_utils_python/tests/test_export.py
@@ -1,15 +1,16 @@
"""Tests for exporting CSV files."""
from datetime import datetime
+import logging
from os import listdir
from os.path import join
-from typing import Any, Dict, List
+from typing import Any, Dict

import mock
import numpy as np
import pandas as pd
from pandas.testing import assert_frame_equal

-from delphi_utils import create_export_csv, Nans
+from delphi_utils import create_export_csv, Nans, create_backup_csv, get_structured_logger


def _set_df_dtypes(df: pd.DataFrame, dtypes: Dict[str, Any]) -> pd.DataFrame:
@@ -323,12 +324,13 @@ def test_export_df_with_missingness(self, tmp_path):

    @mock.patch("delphi_utils.logger")
    def test_export_df_with_contradictory_missingness(self, mock_logger, tmp_path):
-
+        sensor = "test"
+        geo_res = "state"
        create_export_csv(
            df=self.DF3.copy(),
            export_dir=tmp_path,
-            geo_res="state",
-            sensor="test",
+            sensor=sensor,
+            geo_res=geo_res,
            logger=mock_logger
        )
        assert set(listdir(tmp_path)) == set(
@@ -339,8 +341,9 @@ def test_export_df_with_contradictory_missingness(self, mock_logger, tmp_path):
            ]
        )
        assert pd.read_csv(join(tmp_path, "20200315_state_test.csv")).size > 0
+        date_str = datetime.strftime(self.TIMES[0], "%Y-%m-%d")
        mock_logger.info.assert_called_once_with(
-            "Filtering contradictory missing code in test_None_2020-02-15."
+            "Filtering contradictory missing code", sensor=sensor, metric=None, date=date_str
        )

    def test_export_sort(self, tmp_path):
@@ -384,3 +387,22 @@ def test_export_sort(self, tmp_path):
        })
        sorted_csv = _set_df_dtypes(pd.read_csv(join(tmp_path, "20200215_county_test.csv")), dtypes={"geo_id": str})
        assert_frame_equal(sorted_csv, expected_df)

+    def test_create_backup_regular(self, caplog, tmp_path):
+        caplog.set_level(logging.INFO)
+        logger = get_structured_logger()
+        today = datetime.strftime(datetime.today(), "%Y%m%d")
+        dtypes = self.DF.dtypes.to_dict()
+        del dtypes["timestamp"]
+        geo_res = "county"
+        metric = "test"
+        sensor = "deaths"
+        create_backup_csv(df=self.DF, backup_dir=tmp_path, custom_run=False, issue=None, geo_res=geo_res, metric=metric, sensor=sensor, logger=logger)
+        assert "Backup file created" in caplog.text
+
+        actual = pd.read_csv(join(tmp_path, f"{today}_{geo_res}_{metric}_{sensor}.csv.gz"), dtype=dtypes, parse_dates=["timestamp"])
+        assert self.DF.equals(actual)
+
+        actual_parquet = pd.read_parquet(join(tmp_path, f"{today}_{geo_res}_{metric}_{sensor}.parquet"))
+        assert actual_parquet.equals(actual)

2 changes: 1 addition & 1 deletion _template_python/pyproject.toml
@@ -13,7 +13,7 @@ classifiers = [
    "Development Status :: 5 - Production/Stable",
    "Intended Audience :: Developers",
    "Programming Language :: Python :: 3.8",
-    "License :: MIT",
+    "License :: OSI Approved :: MIT License",
]
dependencies = [
    "delphi-utils",
1 change: 1 addition & 0 deletions ansible/templates/nchs_mortality-params-prod.json.j2
@@ -1,6 +1,7 @@
{
  "common": {
    "daily_export_dir": "./daily_receiving",
+    "backup_dir": "./raw_data_backups",
    "log_filename": "/var/log/indicators/nchs_mortality.log",
    "weekly_export_dir": "/common/covidcast/receiving/nchs-mortality"
  },