Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ def test_caching(self):
self.cur.execute('''
insert into covidcast values
(0, 'src', 'sig', 'day', 'state', 20200422, 'pa',
123, 1, 2, 3, 456, 1)
123, 1, 2, 3, 456, 1, 20200422, 0)
''')
self.cur.execute('''
insert into covidcast values
(100, 'src', 'wip_sig', 'day', 'state', 20200422, 'pa',
456, 4, 5, 6, 789, -1)
456, 4, 5, 6, 789, -1, 20200422, 0)
''')

self.cnx.commit()
Expand Down
18 changes: 9 additions & 9 deletions integrations/acquisition/covidcast/test_direction_updating.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,23 +61,23 @@ def test_uploading(self):
self.cur.execute('''
insert into covidcast values
(0, 'src', 'sig', 'day', 'state', 20200228, 'ca',
123, 2, 0, 0, 0, NULL),
123, 2, 0, 0, 0, NULL, 20200228, 0),
(0, 'src', 'sig', 'day', 'state', 20200229, 'ca',
123, 6, 0, 0, 0, NULL),
123, 6, 0, 0, 0, NULL, 20200229, 0),
(0, 'src', 'sig', 'day', 'state', 20200301, 'ca',
123, 5, 0, 0, 0, NULL),
123, 5, 0, 0, 0, NULL, 20200301, 0),
(0, 'src', 'sig', 'day', 'state', 20200511, 'fl',
123, 1, 0, 0, 0, NULL),
123, 1, 0, 0, 0, NULL, 20200511, 0),
(0, 'src', 'sig', 'day', 'state', 20200512, 'fl',
123, 2, 0, 0, 0, NULL),
123, 2, 0, 0, 0, NULL, 20200512, 0),
(0, 'src', 'sig', 'day', 'state', 20200517, 'fl',
123, 2, 0, 0, 0, NULL),
123, 2, 0, 0, 0, NULL, 20200517, 0),
(0, 'src', 'sig', 'day', 'state', 20200615, 'tx',
123, 9, 0, 0, 456, NULL),
123, 9, 0, 0, 456, NULL, 20200615, 0),
(0, 'src', 'sig', 'day', 'state', 20200616, 'tx',
123, 5, 0, 0, 456, NULL),
123, 5, 0, 0, 456, NULL, 20200616, 0),
(0, 'src', 'sig', 'day', 'state', 20200617, 'tx',
123, 1, 0, 0, 456, 1)
123, 1, 0, 0, 456, 1, 20200617, 0)
''')
self.cnx.commit()

Expand Down
4 changes: 2 additions & 2 deletions integrations/client/test_delphi_epidata.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def test_covidcast(self):
self.cur.execute('''
insert into covidcast values
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
123, 1.5, 2.5, 3.5, 456, 4)
123, 1.5, 2.5, 3.5, 456, 4, 20200414, 0)
''')
self.cnx.commit()

Expand Down Expand Up @@ -78,7 +78,7 @@ def test_covidcast_meta(self):
self.cur.execute('''
insert into covidcast values
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
123, 1.5, 2.5, 3.5, 456, 4)
123, 1.5, 2.5, 3.5, 456, 4, 20200414, 0)
''')
self.cnx.commit()

Expand Down
49 changes: 28 additions & 21 deletions integrations/server/test_covidcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def test_round_trip(self):
self.cur.execute('''
insert into covidcast values
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
123, 1.5, 2.5, 3.5, 456, 4)
123, 1.5, 2.5, 3.5, 456, 4, 20200414, 0)
''')
self.cnx.commit()

Expand Down Expand Up @@ -83,17 +83,17 @@ def test_location_wildcard(self):
self.cur.execute('''
insert into covidcast values
(0, 'src', 'sig', 'day', 'county', 20200414, '11111',
123, 10, 11, 12, 456, 13),
123, 10, 11, 12, 456, 13, 20200414, 0),
(0, 'src', 'sig', 'day', 'county', 20200414, '22222',
123, 20, 21, 22, 456, 23),
123, 20, 21, 22, 456, 23, 20200414, 0),
(0, 'src', 'sig', 'day', 'county', 20200414, '33333',
123, 30, 31, 32, 456, 33),
123, 30, 31, 32, 456, 33, 20200414, 0),
(0, 'src', 'sig', 'day', 'msa', 20200414, '11111',
123, 40, 41, 42, 456, 43),
123, 40, 41, 42, 456, 43, 20200414, 0),
(0, 'src', 'sig', 'day', 'msa', 20200414, '22222',
123, 50, 51, 52, 456, 53),
123, 50, 51, 52, 456, 53, 20200414, 0),
(0, 'src', 'sig', 'day', 'msa', 20200414, '33333',
123, 60, 61, 62, 456, 634)
123, 60, 61, 62, 456, 634, 20200414, 0)
''')
self.cnx.commit()

Expand Down Expand Up @@ -147,17 +147,17 @@ def test_location_timeline(self):
self.cur.execute('''
insert into covidcast values
(0, 'src', 'sig', 'day', 'county', 20200411, '01234',
123, 10, 11, 12, 456, 13),
123, 10, 11, 12, 456, 13, 20200413, 2),
(0, 'src', 'sig', 'day', 'county', 20200412, '01234',
123, 20, 21, 22, 456, 23),
123, 20, 21, 22, 456, 23, 20200413, 1),
(0, 'src', 'sig', 'day', 'county', 20200413, '01234',
123, 30, 31, 32, 456, 33),
123, 30, 31, 32, 456, 33, 20200413, 0),
(0, 'src', 'sig', 'day', 'county', 20200411, '11111',
123, 40, 41, 42, 456, 43),
123, 40, 41, 42, 456, 43, 20200413, 2),
(0, 'src', 'sig', 'day', 'county', 20200412, '22222',
123, 50, 51, 52, 456, 53),
123, 50, 51, 52, 456, 53, 20200413, 1),
(0, 'src', 'sig', 'day', 'county', 20200413, '33333',
123, 60, 61, 62, 456, 63)
123, 60, 61, 62, 456, 63, 20200413, 0)
''')
self.cnx.commit()

Expand Down Expand Up @@ -211,7 +211,7 @@ def test_unique_key_constraint(self):
self.cur.execute('''
insert into covidcast values
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
0, 0, 0, 0, 0, 0)
0, 0, 0, 0, 0, 0, 20200414, 0)
''')
self.cnx.commit()

Expand All @@ -220,17 +220,24 @@ def test_unique_key_constraint(self):
self.cur.execute('''
insert into covidcast values
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
1, 1, 1, 1, 1, 1)
1, 1, 1, 1, 1, 1, 20200414, 0)
''')

# succeed to insert different dummy data under a different issue
self.cur.execute('''
insert into covidcast values
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
1, 1, 1, 1, 1, 1, 20200415, 1)
''')

def test_nullable_columns(self):
"""Missing values should be surfaced as null."""

# insert dummy data
self.cur.execute('''
insert into covidcast values
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
123, 0.123, NULL, NULL, 456, NULL)
123, 0.123, NULL, NULL, 456, NULL, 20200414, 0)
''')
self.cnx.commit()

Expand Down Expand Up @@ -268,15 +275,15 @@ def test_temporal_partitioning(self):
self.cur.execute('''
insert into covidcast values
(0, 'src', 'sig', 'hour', 'state', 2020041714, 'vi',
123, 10, 11, 12, 456, 13),
123, 10, 11, 12, 456, 13, 2020041714, 0),
(0, 'src', 'sig', 'day', 'state', 20200417, 'vi',
123, 20, 21, 22, 456, 23),
123, 20, 21, 22, 456, 23, 20200417, 00),
(0, 'src', 'sig', 'week', 'state', 202016, 'vi',
123, 30, 31, 32, 456, 33),
123, 30, 31, 32, 456, 33, 202016, 0),
(0, 'src', 'sig', 'month', 'state', 202004, 'vi',
123, 40, 41, 42, 456, 43),
123, 40, 41, 42, 456, 43, 202004, 0),
(0, 'src', 'sig', 'year', 'state', 2020, 'vi',
123, 50, 51, 52, 456, 53)
123, 50, 51, 52, 456, 53, 2020, 0)
''')
self.cnx.commit()

Expand Down
8 changes: 4 additions & 4 deletions integrations/server/test_covidcast_meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def test_round_trip(self):
# insert dummy data and accumulate expected results (in sort order)
template = '''
insert into covidcast values
(0, "%s", "%s", "%s", "%s", %d, "%s", 123, %d, 0, 0, 456, 0)
(0, "%s", "%s", "%s", "%s", %d, "%s", 123, %d, 0, 0, 456, 0, %d, 0)
'''
expected = []
for src in ('src1', 'src2'):
Expand All @@ -67,7 +67,7 @@ def test_round_trip(self):
})
for tv in (1, 2):
for gv, v in zip(('geo1', 'geo2'), (10, 20)):
self.cur.execute(template % (src, sig, tt, gt, tv, gv, v))
self.cur.execute(template % (src, sig, tt, gt, tv, gv, v, tv))
self.cnx.commit()

# make the request
Expand All @@ -88,7 +88,7 @@ def test_suppress_work_in_progress(self):
# insert dummy data and accumulate expected results (in sort order)
template = '''
insert into covidcast values
(0, "%s", "%s", "%s", "%s", %d, "%s", 123, %d, 0, 0, 456, 0)
(0, "%s", "%s", "%s", "%s", %d, "%s", 123, %d, 0, 0, 456, 0, %d, 0)
'''
expected = []
for src in ('src1', 'src2'):
Expand All @@ -112,7 +112,7 @@ def test_suppress_work_in_progress(self):
})
for tv in (1, 2):
for gv, v in zip(('geo1', 'geo2'), (10, 20)):
self.cur.execute(template % (src, sig, tt, gt, tv, gv, v))
self.cur.execute(template % (src, sig, tt, gt, tv, gv, v, tv))
self.cnx.commit()

# make the request
Expand Down
43 changes: 33 additions & 10 deletions src/acquisition/covidcast/csv_importer.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
"""Collects and reads covidcast data from a set of local CSV files."""

# standard library
import math
from datetime import date
import glob
import math
import os
import re

# third party
import pandas

# first party
from delphi.utils.epiweek import delta_epiweeks

class CsvImporter:
"""Finds and parses covidcast CSV files."""
Expand Down Expand Up @@ -46,38 +49,52 @@ def __init__(self, geo_value, value, stderr, sample_size):

@staticmethod
def is_sane_day(value):
    """Return whether `value` is a sane YYYYMMDD date.

    value: an integer of the form YYYYMMDD

    On success the return value is a `datetime.date` representing `value`
    (always truthy). On failure the return value is False. A value fails
    when its year lies outside [MIN_YEAR, MAX_YEAR], when its month or day
    is out of range, or when the combination is not a real calendar date
    (e.g. 20200231).
    """

    year, month, day = value // 10000, (value % 10000) // 100, value % 100

    nearby_year = CsvImporter.MIN_YEAR <= year <= CsvImporter.MAX_YEAR
    valid_month = 1 <= month <= 12
    sensible_day = 1 <= day <= 31

    if not (nearby_year and valid_month and sensible_day):
        return False

    try:
        return date(year=year, month=month, day=day)
    except ValueError:
        # The loose 1..31 day check above admits impossible dates such as
        # February 30th; `date()` rejects those with ValueError. Report them
        # as "not sane" so the caller can skip the file instead of crashing.
        return False

@staticmethod
def is_sane_week(value):
    """Return whether `value` is a sane (maybe not valid) YYYYWW epiweek.

    value: an integer of the form YYYYWW

    On success the return value is `value` itself (truthy); otherwise the
    return value is False. A value is sane when its year lies within
    [MIN_YEAR, MAX_YEAR] and its week lies within 1..53.
    """

    year, week = value // 100, value % 100

    nearby_year = CsvImporter.MIN_YEAR <= year <= CsvImporter.MAX_YEAR
    sensible_week = 1 <= week <= 53

    # Parentheses are required: `not a and b` parses as `(not a) and b`,
    # which would accept out-of-range weeks whenever the year is valid.
    if not (nearby_year and sensible_week):
        return False
    return value

@staticmethod
def find_csv_files(scan_dir, glob=glob):
def find_csv_files(scan_dir, issue=(date.today(), -1), glob=glob):
"""Recursively search for and yield covidcast-format CSV files.

scan_dir: the directory to scan (recursively)

The return value is a tuple of (path, details), where, if the path was
valid, details is a tuple of (source, signal, time_type, geo_type,
time_value) (otherwise None).
time_value, issue, lag) (otherwise None).
"""

issue_day,issue_epiweek=issue
issue_day_value=int(issue_day.strftime("%Y%m%d"))
issue_epiweek_value=issue_epiweek # TODO
issue_value=-1
lag_value=-1

for path in glob.glob(os.path.join(scan_dir, '*', '*')):
if not path.lower().endswith('.csv'):
# safe to ignore this file
Expand All @@ -98,18 +115,24 @@ def find_csv_files(scan_dir, glob=glob):
time_type = 'day'
time_value = int(daily_match.group(2))
match = daily_match
if not CsvImporter.is_sane_day(time_value):
time_value_day = CsvImporter.is_sane_day(time_value)
if not time_value_day:
print(' invalid filename day', time_value)
yield (path, None)
continue
issue_value=issue_day_value
lag_value=(issue_day-time_value_day).days
else:
time_type = 'week'
time_value = int(weekly_match.group(2))
match = weekly_match
if not CsvImporter.is_sane_week(time_value):
time_value_week=CsvImporter.is_sane_week(time_value)
if not time_value_week:
print(' invalid filename week', time_value)
yield (path, None)
continue
# use the epiweek-flavored issue unpacked from `issue` at the top of this
# function; `issue_week_value` / `issue_week` are not defined anywhere
issue_value=issue_epiweek_value
lag_value=delta_epiweeks(time_value_week, issue_epiweek_value)

# # extract and validate geographic resolution
geo_type = match.group(3).lower()
Expand All @@ -126,7 +149,7 @@ def find_csv_files(scan_dir, glob=glob):
yield (path, None)
continue

yield (path, (source, signal, time_type, geo_type, time_value))
yield (path, (source, signal, time_type, geo_type, time_value, issue_value, lag_value))

@staticmethod
def is_header_valid(columns):
Expand Down
7 changes: 5 additions & 2 deletions src/acquisition/covidcast/csv_to_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def archive_as_successful(path_src, filename, source):
compress = True
file_archiver_impl.archive_file(path_src, path_dst, filename, compress)


# collect files
results = list(csv_importer_impl.find_csv_files(receiving_dir))
print('found %d files' % len(results))
Expand All @@ -71,7 +72,7 @@ def archive_as_successful(path_src, filename, source):
archive_as_failed(path_src, filename, 'unknown')
continue

(source, signal, time_type, geo_type, time_value) = details
(source, signal, time_type, geo_type, time_value, issue, lag) = details

# iterate over rows and upload to the database
all_rows_valid = True
Expand All @@ -90,7 +91,9 @@ def archive_as_successful(path_src, filename, source):
row_values.geo_value,
row_values.value,
row_values.stderr,
row_values.sample_size)
row_values.sample_size,
issue,
lag)
except Exception as e:
all_rows_valid = False
print('exception while inserting row:', e, row_values)
Expand Down
8 changes: 6 additions & 2 deletions src/acquisition/covidcast/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ def insert_or_update(
geo_value,
value,
stderr,
sample_size):
sample_size,
issue,
lag):
"""
Insert a new row, or update an existing row, in the `covidcast` table.

Expand All @@ -64,7 +66,7 @@ def insert_or_update(

sql = '''
INSERT INTO `covidcast` VALUES
(0, %s, %s, %s, %s, %s, %s, UNIX_TIMESTAMP(NOW()), %s, %s, %s, 0, NULL)
(0, %s, %s, %s, %s, %s, %s, UNIX_TIMESTAMP(NOW()), %s, %s, %s, 0, NULL, %s, %s)
ON DUPLICATE KEY UPDATE
`timestamp1` = VALUES(`timestamp1`),
`value` = VALUES(`value`),
Expand All @@ -82,6 +84,8 @@ def insert_or_update(
value,
stderr,
sample_size,
issue,
lag
)

self._cursor.execute(sql, args)
Expand Down
Loading