Changes from all commits (28 commits)
911c0e0 Added archiving diffing utility (eujing, Aug 12, 2020)
5d3b8ec Updated unit tests (eujing, Aug 12, 2020)
2d3fb80 Updated ansible template (eujing, Aug 12, 2020)
f1cace3 Merge pull request #235 from cmu-delphi/main (krivard, Aug 27, 2020)
d8c7019 Merge pull request #234 from cmu-delphi/diff-uploads-usafacts (krivard, Aug 31, 2020)
4debc31 fix broken usafacts tests to read from the proper directories (sgsmob, Oct 15, 2020)
06e4b12 replace deprecated functions (chinandrew, Oct 16, 2020)
f6e7b78 Remove uppercase (chinandrew, Oct 16, 2020)
a164e25 Remove print (chinandrew, Oct 16, 2020)
021cbb4 Remove zips static file (chinandrew, Oct 16, 2020)
707535e Fix argument (chinandrew, Oct 17, 2020)
8091221 Add a gap detector to Sir Complainsalot (capnrefsmmat, Oct 17, 2020)
849263c Formatting fix (capnrefsmmat, Oct 17, 2020)
39df546 Add hospital admissions, USAFacts to Sir Complainsalot (capnrefsmmat, Oct 17, 2020)
51c0c03 make new receiving directory in test directory (sgsmob, Oct 19, 2020)
0783ebd Merge pull request #327 from cmu-delphi/sir-gapdetector (krivard, Oct 19, 2020)
bb8e17e Merge pull request #331 from cmu-delphi/deploy-safegraph (krivard, Oct 20, 2020)
0115538 Issue template for feature release (krivard, Oct 20, 2020)
44be069 Change auto-assign of release tasks (krivard, Oct 20, 2020)
a1a1b50 Temporarily skip linting in Jenkins (korlaxxalrok, Oct 20, 2020)
b0bb289 Merge pull request #314 from sgsmob/fix_usa_tests (krivard, Oct 20, 2020)
317a4d5 Merge pull request #334 from cmu-delphi/deploy-usafacts (krivard, Oct 20, 2020)
b21252f Merge pull request #322 from cmu-delphi/geo_refactor_claimshosp (krivard, Oct 20, 2020)
6b96a1a migrate safegraph.run back onto using functools.partial (sgsmob, Oct 21, 2020)
933c6b4 fix output file format from YYYY-MM-DD to YYYYMMDD (sgsmob, Oct 21, 2020)
3d0e5c5 remove wildcard imports from test (chinandrew, Oct 21, 2020)
12aac02 Merge pull request #349 from cmu-delphi/fix-test-imports (krivard, Oct 21, 2020)
117bc9f Merge branch 'main' of github.com:cmu-delphi/covidcast-indicators int… (sgsmob, Oct 21, 2020)
30 changes: 30 additions & 0 deletions .github/ISSUE_TEMPLATE/feature_release.md
@@ -0,0 +1,30 @@
---
name: Feature release
about: Begin the finishing work for features ready to be included in a release
title: 'Release NEW_THING'
labels: 'release'
assignees: 'benjaminysmith'
---

- [Link to issue]()
- [Link to PR]()
- Proposed release version: <!-- eg 1.12 -->

<!-- Additional information about the feature: -->


<!-- relevant for most work -->

- [ ] API [documentation](https://github.com/cmu-delphi/delphi-epidata/tree/main/docs/api) and/or [changelog](https://github.com/cmu-delphi/delphi-epidata/blob/main/docs/api/covidcast_changelog.md)
- [ ] API mailing list notification

<!-- relevant for new signals -->

- [ ] Statistical review (usually [correlations](https://github.com/cmu-delphi/covidcast/tree/main/docs/R-notebooks))
- [ ] Signal / source name review (usually [Roni](https://docs.google.com/document/d/10hGd4Evce4lJ4VkWaQEKFQxvmw2P4xyYGtIAWF52Sf8/edit?usp=sharing))

<!-- relevant for new map signals -->

- [ ] Visual review
- [ ] [Signal description pop-up text](https://docs.google.com/document/d/1kDqRg8EaI4WQXMaUUbbCGPlsUqEql8kgXCNt6AvMA9I/edit?usp=sharing) review
- [ ] [Map release notes](https://docs.google.com/document/d/1BpxGgIma_Lkd2kxtwEo2DBdHQ3zk6dHRz-leUIRlOIA/edit?usp=sharing)
7 changes: 0 additions & 7 deletions ansible/files/usafacts-params-prod.json

This file was deleted.

12 changes: 12 additions & 0 deletions ansible/templates/usafacts-params-prod.json.j2
@@ -0,0 +1,12 @@
{
"export_start_date": "latest",
"static_file_dir": "./static",
"export_dir": "/common/covidcast/receiving/usa-facts",
"cache_dir": "./cache",
"base_url": "https://usafactsstatic.blob.core.windows.net/public/data/covid-19/covid_{metric}_usafacts.csv",
"aws_credentials": {
"aws_access_key_id": "{{ delphi_aws_access_key_id }}",
"aws_secret_access_key": "{{ delphi_aws_secret_access_key }}"
},
"bucket_name": "delphi-covidcast-indicator-output"
}
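
This template replaces the static ansible/files/usafacts-params-prod.json deleted above, so the AWS credentials are injected from Ansible variables at deploy time instead of being committed to the repo. A minimal sketch of what the rendering step produces, using the standalone jinja2 library with placeholder credentials (the real playbook supplies these from its own variable store):

import json
from jinja2 import Template

with open("ansible/templates/usafacts-params-prod.json.j2") as f:
    template = Template(f.read())

# Placeholder values; in production these come from Ansible's vaulted variables.
rendered = template.render(
    delphi_aws_access_key_id="AKIA-EXAMPLE",
    delphi_aws_secret_access_key="EXAMPLE-SECRET",
)

params = json.loads(rendered)  # the rendered template is plain JSON
assert params["bucket_name"] == "delphi-covidcast-indicator-output"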
2 changes: 1 addition & 1 deletion cdc_covidnet/tests/test_handle_wip_signal.py
@@ -1,6 +1,6 @@
import unittest
from delphi_cdc_covidnet.update_sensor import add_prefix
from delphi_cdc_covidnet.constants import *
from delphi_cdc_covidnet.constants import SIGNALS

def test_handle_wip_signal():
# Test wip_signal = True, add prefix to all signals
19 changes: 13 additions & 6 deletions claims_hosp/delphi_claims_hosp/update_indicator.py
@@ -96,13 +96,21 @@ def geo_reindex(self, data):
"""
geo_map = GeoMapper()
if self.geo == "county":
data_frame = geo_map.county_to_megacounty(
data, Config.MIN_DEN, Config.MAX_BACKWARDS_PAD_LENGTH,
thr_col="den", mega_col=self.geo)
data_frame = geo_map.fips_to_megacounty(data,
Config.MIN_DEN,
Config.MAX_BACKWARDS_PAD_LENGTH,
thr_col="den",
mega_col=self.geo)
elif self.geo == "state":
data_frame = geo_map.county_to_state(data, state_id_col=self.geo)
data_frame = geo_map.replace_geocode(data,
from_code="fips",
new_col=self.geo,
new_code="state_id")
data_frame[self.geo] = data_frame[self.geo]
elif self.geo == "msa":
data_frame = geo_map.county_to_msa(data, msa_col=self.geo)
data_frame = geo_map.replace_geocode(data,
from_code="fips",
new_code=self.geo)
elif self.geo == "hrr":
data_frame = data # data is already adjusted in aggregation step above
else:
@@ -119,7 +127,6 @@ def geo_reindex(self, data):
assert (
len(multiindex) <= (GeoConstants.MAX_GEO[self.geo] * len(self.fit_dates))
), "more loc-date pairs than maximum number of geographies x number of dates"

# fill dataframe with missing dates using 0
data_frame = data_frame.reindex(multiindex, fill_value=0)
data_frame.fillna(0, inplace=True)
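
The geo_reindex changes migrate off the deprecated county_to_* helpers onto the current GeoMapper API: fips_to_megacounty pools low-count counties into state-level megacounties, and replace_geocode handles the generic FIPS-to-state and FIPS-to-MSA crosswalks. A rough sketch of the replace_geocode calls on a toy frame; the column names, values, and the date_col keyword are illustrative and should be checked against the delphi_utils version pinned here:

import pandas as pd
from delphi_utils import GeoMapper

gm = GeoMapper()
df = pd.DataFrame({
    "fips": ["42003", "42005", "06037"],            # county FIPS codes
    "timestamp": pd.to_datetime(["2020-10-01"] * 3),
    "num": [1, 2, 3],
    "den": [10, 12, 8],
})

# County -> state: aggregates the numeric columns up to state_id ("pa", "ca", ...).
state_df = gm.replace_geocode(df, from_code="fips", new_code="state_id",
                              date_col="timestamp")

# County -> MSA: same pattern with a different target geography.
msa_df = gm.replace_geocode(df, from_code="fips", new_code="msa",
                            date_col="timestamp")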
33,100 changes: 0 additions & 33,100 deletions claims_hosp/static/02_20_uszips.csv

This file was deleted.

2 changes: 1 addition & 1 deletion claims_hosp/tests/test_load_data.py
@@ -4,7 +4,7 @@

# first party
from delphi_claims_hosp.config import Config, GeoConstants
from delphi_claims_hosp.load_data import *
from delphi_claims_hosp.load_data import load_data, load_claims_data
from delphi_utils import read_params

CONFIG = Config()
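
Swapping the wildcard import for an explicit list keeps the test namespace predictable and lets pylint flag unused or undefined names; the same cleanup is applied to the other test modules in this PR. The pattern, in brief:

# Before: every public name in load_data leaks into the test module.
from delphi_claims_hosp.load_data import *

# After: only what the tests actually use, and each name is greppable.
from delphi_claims_hosp.load_data import load_data, load_claims_data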
1 change: 0 additions & 1 deletion claims_hosp/tests/test_update_indicator.py
@@ -12,7 +12,6 @@

# first party
from delphi_claims_hosp.config import Config, GeoConstants
from delphi_claims_hosp.load_data import *
from delphi_claims_hosp.update_indicator import ClaimsHospIndicatorUpdater

CONFIG = Config()
4 changes: 3 additions & 1 deletion jenkins/usafacts-jenkins-test.sh
@@ -15,7 +15,9 @@ local_indicator="usafacts"
cd "${WORKSPACE}/${local_indicator}" || exit

# Linter
env/bin/pylint delphi_"${local_indicator}"
#env/bin/pylint delphi_"${local_indicator}"
echo "Skip linting because we have weird breakage :( \
TODO: https://github.com/cmu-delphi/covidcast-indicators/issues/333"

# Unit tests and code coverage
cd tests || exit && \
31 changes: 16 additions & 15 deletions safegraph/delphi_safegraph/process.py
@@ -15,6 +15,7 @@
# Base file name for raw data CSVs.
CSV_NAME = 'social-distancing.csv.gz'


def validate(df):
"""Confirms that a data frame has only one date."""
timestamps = df['date_range_start'].apply(date_from_timestamp)
@@ -235,13 +236,13 @@ def process_window(df_list: List[pd.DataFrame],
f'{signal}_se': 'se',
f'{signal}_n': 'sample_size',
}, axis=1)
df_export.to_csv(f'{export_dir}/{date}_{geo_res}_{signal}.csv',
date_str = date.strftime('%Y%m%d')
df_export.to_csv(f'{export_dir}/{date_str}_{geo_res}_{signal}.csv',
na_rep='NA',
index=False, )


def process(current_filename: str,
previous_filenames: List[str],
def process(filenames: List[str],
signal_names: List[str],
wip_signal,
geo_resolutions: List[str],
@@ -250,11 +251,11 @@ def process(current_filename: str,
as averaged over the previous week.
Parameters
----------
current_filename: str
path to file holding the target date's data.
previous_filenames: List[str]
paths to files holding data from each day in the week preceding the
target date.
filenames: List[str]
paths to files holding data.
The first entry of the list should correspond to the target date, while
the remaining entries should correspond to each day in the week
preceding the target date.
signal_names: List[str]
signal names to be processed for a single date.
A second version of each such signal named {SIGNAL}_7d_avg will be
@@ -274,8 +275,8 @@
one for the data averaged over the previous week to
{export_dir}/{date}_{resolution}_{signal}_7d_avg.csv.
"""
past_week = [pd.read_csv(current_filename)]
for fname in previous_filenames:
past_week = []
for fname in filenames:
if os.path.exists(fname):
past_week.append(pd.read_csv(fname))

Expand All @@ -286,8 +287,8 @@ def process(current_filename: str,
export_dir)
# ...then as part of the whole window.
process_window(past_week,
add_prefix(add_suffix(signal_names, '_7d_avg'),
wip_signal,
'wip_'),
geo_resolutions,
export_dir)
add_prefix(add_suffix(signal_names, '_7d_avg'),
wip_signal,
'wip_'),
geo_resolutions,
export_dir)
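
The process_window change also fixes the export filename: interpolating a date object directly into an f-string yields the ISO form 2020-02-14, while the receiving pipeline expects the compact YYYYMMDD prefix used by the other indicators. A quick illustration:

from datetime import date

d = date(2020, 2, 14)
str(d)                # '2020-02-14' (the old, rejected filename prefix)
d.strftime('%Y%m%d')  # '20200214'  (the corrected prefix)
print(f"{d.strftime('%Y%m%d')}_county_completely_home_prop.csv")
# 20200214_county_completely_home_prop.csv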
32 changes: 15 additions & 17 deletions safegraph/delphi_safegraph/run.py
@@ -3,6 +3,7 @@
when the module is run with `python -m MODULE_NAME`.
"""
import glob
import functools
import multiprocessing as mp
import subprocess

@@ -24,22 +25,13 @@ def run_module():
aws_endpoint = params["aws_endpoint"]
wip_signal = params["wip_signal"]

def process_file(current_filename):
"""Wrapper around `process()` that only takes a single argument.

A single argument function is necessary to use `pool.map()` below.
Because each call to `process()` has two arguments that are dependent
on the input file name (`current_filename` and `previous_filenames`),
we choose to use this wrapper rather than something like
`functools.partial()`.
"""
return process(current_filename,
files_in_past_week(current_filename),
signal_names=SIGNALS,
wip_signal=wip_signal,
geo_resolutions=GEO_RESOLUTIONS,
export_dir=export_dir,
)
single_arg_process = functools.partial(
process,
signal_names=SIGNALS,
wip_signal=wip_signal,
geo_resolutions=GEO_RESOLUTIONS,
export_dir=export_dir,
)

# Update raw data
# Why call subprocess rather than using a native Python client, e.g. boto3?
@@ -60,5 +52,11 @@
files = glob.glob(f'{raw_data_dir}/social-distancing/**/*.csv.gz',
recursive=True)

files_with_previous_weeks = []
for fname in files:
previous_week = [fname]
previous_week.extend(files_in_past_week(fname))
files_with_previous_weeks.append(previous_week)

with mp.Pool(n_core) as pool:
pool.map(process_file, files)
pool.map(single_arg_process, files_with_previous_weeks)
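
Because Pool.map hands each worker exactly one positional argument, the refactor binds all of process()'s fixed settings with functools.partial and packs the per-task filenames into a single list: target file first, then the files from the preceding week. This also avoids the pickling pitfall of the old nested process_file wrapper, since Pool.map pickles the callable it dispatches, and a partial over a module-level function pickles cleanly where a locally defined closure generally does not. A toy sketch of the pattern, with made-up filenames and settings:

import functools
import multiprocessing as mp

def process(filenames, signal_names, wip_signal, geo_resolutions, export_dir):
    """Stand-in for delphi_safegraph.process.process."""
    print(filenames[0], "averaged with", filenames[1:])

single_arg_process = functools.partial(
    process,
    signal_names=["completely_home_prop"],
    wip_signal=False,
    geo_resolutions=["county"],
    export_dir="./receiving",
)

# Each work item is [target_file, *previous_week_files].
work_items = [
    ["day8.csv", "day7.csv", "day6.csv"],
    ["day9.csv", "day8.csv", "day7.csv"],
]

if __name__ == "__main__":
    with mp.Pool(2) as pool:
        pool.map(single_arg_process, work_items)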
12 changes: 6 additions & 6 deletions safegraph/tests/test_process.py
@@ -128,7 +128,7 @@ def test_process_window(self, tmp_path):
'sample_size': [2, 2]
})
actual = pd.read_csv(
export_dir / '2020-02-14_county_completely_home_prop.csv')
export_dir / '20200214_county_completely_home_prop.csv')
pd.testing.assert_frame_equal(expected, actual)

def test_process(self, tmp_path):
@@ -137,11 +137,11 @@ def test_process(self, tmp_path):
export_dir = tmp_path / 'export'
export_dir.mkdir()

process('raw_data/small_raw_data_0.csv',
# File 2 does not exist.
['raw_data/small_raw_data_1.csv',
process(['raw_data/small_raw_data_0.csv',
'raw_data/small_raw_data_1.csv',
# File 2 does not exist.
'raw_data/small_raw_data_2.csv',
'raw_data/small_raw_data_3.csv', ],
'raw_data/small_raw_data_3.csv'],
SIGNALS,
['median_home_dwell_time',
'completely_home_prop_7d_avg'],
@@ -199,7 +199,7 @@ def test_process(self, tmp_path):
})
}
actual = {signal: pd.read_csv(
export_dir / f'2020-06-12_state_{signal}.csv')
export_dir / f'20200612_state_{signal}.csv')
for signal in expected}
for signal in expected:
pd.testing.assert_frame_equal(expected[signal], actual[signal])
68 changes: 62 additions & 6 deletions sir_complainsalot/delphi_sir_complainsalot/check_source.py
@@ -1,6 +1,8 @@
from dataclasses import dataclass
from typing import List

import covidcast
import numpy as np
import pandas as pd

@dataclass
@@ -27,33 +29,87 @@ def to_md(self):
message=self.message, updated=self.last_updated.strftime("%Y-%m-%d"))

def check_source(data_source, meta, params, grace):
"""Iterate over all signals from a source and check if they exceed max age."""
"""Iterate over all signals from a source and check for problems.

Possible problems:

- Newest available data exceeds max age.
- Gap between subsequent data points exceeds max gap.

For example, consider a source with a max age of 5 days and max gap of 1
day. If today is 2020-10-15, and the latest available data is from
2020-10-09, the max age is exceeded. If there is no data available on
2020-10-07, but there is on 2020-10-06 and 2020-10-08, there is a gap of 2
days and the max gap is exceeded.

The gap window controls how much data we check for gaps -- a gap window of
10 days means we check the most recent 10 days of data. Defaults to 7.

"""

source_config = params[data_source]
gap_window = pd.Timedelta(days=source_config.get("gap_window", 7))
max_allowed_gap = source_config.get("max_gap", 1)

signals = meta[meta.data_source == data_source]

now = pd.Timestamp.now()

complaints = {}
age_complaints = {}
gap_complaints = {}

for _, row in signals.iterrows():
if "retired-signals" in source_config and \
row["signal"] in source_config["retired-signals"]:
continue

# Check max age
age = (now - row["max_time"]).days

if age > source_config["max_age"] + grace:
if row["signal"] not in complaints:
complaints[row["signal"]] = Complaint(
if row["signal"] not in age_complaints:
age_complaints[row["signal"]] = Complaint(
"is more than {age} days old".format(age=age),
data_source,
row["signal"],
[row["geo_type"]],
row["max_time"],
source_config["maintainers"])
else:
complaints[row["signal"]].geo_types.append(row["geo_type"])
age_complaints[row["signal"]].geo_types.append(row["geo_type"])

# Check max gap
if max_allowed_gap == -1:
# No gap detection for this source
continue

latest_data = covidcast.signal(
data_source, row["signal"],
start_day=row["max_time"] - gap_window,
end_day=row["max_time"],
geo_type=row["geo_type"]
)

# convert numpy datetime values to pandas datetimes and then to
# datetime.date, so we can work with timedeltas after
unique_dates = [pd.to_datetime(val).date()
for val in latest_data["time_value"].unique()]

gap_days = [(day - prev_day).days
for day, prev_day in zip(unique_dates[1:], unique_dates[:-1])]
gap = max(gap_days)

if gap > max_allowed_gap:
if row["signal"] not in gap_complaints:
gap_complaints[row["signal"]] = Complaint(
"has a {gap}-day gap of missing data in its most recent "
"{gap_window} days of data".format(gap=gap, gap_window=gap_window.days),
data_source,
row["signal"],
[row["geo_type"]],
row["max_time"],
source_config["maintainers"])
else:
gap_complaints[row["signal"]].geo_types.append(row["geo_type"])

return list(complaints.values())
return list(age_complaints.values()) + list(gap_complaints.values())
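
The gap check reduces the fetched window to its unique dates and takes consecutive differences; any difference larger than max_gap days files a complaint. A standalone sketch of the arithmetic on a toy series (sorting defensively here, since the production code relies on covidcast.signal returning time values in order, and note that max() would raise if fewer than two unique dates came back):

import pandas as pd

# 2020-10-07 is missing from this series.
time_values = pd.Series(pd.to_datetime(
    ["2020-10-06", "2020-10-06", "2020-10-08", "2020-10-09"]))

unique_dates = sorted(pd.to_datetime(v).date() for v in time_values.unique())
gap_days = [(day - prev_day).days
            for day, prev_day in zip(unique_dates[1:], unique_dates[:-1])]

print(max(gap_days))  # 2 -> exceeds a max_gap of 1, so a complaint is filed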
2 changes: 1 addition & 1 deletion sir_complainsalot/delphi_sir_complainsalot/run.py
@@ -22,7 +22,7 @@ def run_module():

complaints = []
for data_source in params["sources"].keys():
complaints.extend(check_source(data_source, meta, params["sources"], params.get("grace",0)))
complaints.extend(check_source(data_source, meta, params["sources"], params.get("grace", 0)))

if len(complaints) > 0:
for complaint in complaints: