Skip to content

Commit ab20e46

Browse files
authored
Merge pull request #122 from krivard/feature/extend-covidcast-to-store-issue-date
Add columns to covidcast for issue and lag.
2 parents 8a52128 + 9753bbf commit ab20e46

File tree

9 files changed

+100
-54
lines changed

9 files changed

+100
-54
lines changed

integrations/acquisition/covidcast/test_covidcast_meta_caching.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,12 @@ def test_caching(self):
6565
self.cur.execute('''
6666
insert into covidcast values
6767
(0, 'src', 'sig', 'day', 'state', 20200422, 'pa',
68-
123, 1, 2, 3, 456, 1)
68+
123, 1, 2, 3, 456, 1, 20200422, 0)
6969
''')
7070
self.cur.execute('''
7171
insert into covidcast values
7272
(100, 'src', 'wip_sig', 'day', 'state', 20200422, 'pa',
73-
456, 4, 5, 6, 789, -1)
73+
456, 4, 5, 6, 789, -1, 20200422, 0)
7474
''')
7575

7676
self.cnx.commit()

integrations/acquisition/covidcast/test_direction_updating.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -61,23 +61,23 @@ def test_uploading(self):
6161
self.cur.execute('''
6262
insert into covidcast values
6363
(0, 'src', 'sig', 'day', 'state', 20200228, 'ca',
64-
123, 2, 0, 0, 0, NULL),
64+
123, 2, 0, 0, 0, NULL, 20200228, 0),
6565
(0, 'src', 'sig', 'day', 'state', 20200229, 'ca',
66-
123, 6, 0, 0, 0, NULL),
66+
123, 6, 0, 0, 0, NULL, 20200229, 0),
6767
(0, 'src', 'sig', 'day', 'state', 20200301, 'ca',
68-
123, 5, 0, 0, 0, NULL),
68+
123, 5, 0, 0, 0, NULL, 20200301, 0),
6969
(0, 'src', 'sig', 'day', 'state', 20200511, 'fl',
70-
123, 1, 0, 0, 0, NULL),
70+
123, 1, 0, 0, 0, NULL, 20200511, 0),
7171
(0, 'src', 'sig', 'day', 'state', 20200512, 'fl',
72-
123, 2, 0, 0, 0, NULL),
72+
123, 2, 0, 0, 0, NULL, 20200512, 0),
7373
(0, 'src', 'sig', 'day', 'state', 20200517, 'fl',
74-
123, 2, 0, 0, 0, NULL),
74+
123, 2, 0, 0, 0, NULL, 20200517, 0),
7575
(0, 'src', 'sig', 'day', 'state', 20200615, 'tx',
76-
123, 9, 0, 0, 456, NULL),
76+
123, 9, 0, 0, 456, NULL, 20200615, 0),
7777
(0, 'src', 'sig', 'day', 'state', 20200616, 'tx',
78-
123, 5, 0, 0, 456, NULL),
78+
123, 5, 0, 0, 456, NULL, 20200616, 0),
7979
(0, 'src', 'sig', 'day', 'state', 20200617, 'tx',
80-
123, 1, 0, 0, 456, 1)
80+
123, 1, 0, 0, 456, 1, 20200617, 0)
8181
''')
8282
self.cnx.commit()
8383

integrations/client/test_delphi_epidata.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ def test_covidcast(self):
4949
self.cur.execute('''
5050
insert into covidcast values
5151
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
52-
123, 1.5, 2.5, 3.5, 456, 4)
52+
123, 1.5, 2.5, 3.5, 456, 4, 20200414, 0)
5353
''')
5454
self.cnx.commit()
5555

@@ -78,7 +78,7 @@ def test_covidcast_meta(self):
7878
self.cur.execute('''
7979
insert into covidcast values
8080
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
81-
123, 1.5, 2.5, 3.5, 456, 4)
81+
123, 1.5, 2.5, 3.5, 456, 4, 20200414, 0)
8282
''')
8383
self.cnx.commit()
8484

integrations/server/test_covidcast.py

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ def test_round_trip(self):
4545
self.cur.execute('''
4646
insert into covidcast values
4747
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
48-
123, 1.5, 2.5, 3.5, 456, 4)
48+
123, 1.5, 2.5, 3.5, 456, 4, 20200414, 0)
4949
''')
5050
self.cnx.commit()
5151

@@ -83,17 +83,17 @@ def test_location_wildcard(self):
8383
self.cur.execute('''
8484
insert into covidcast values
8585
(0, 'src', 'sig', 'day', 'county', 20200414, '11111',
86-
123, 10, 11, 12, 456, 13),
86+
123, 10, 11, 12, 456, 13, 20200414, 0),
8787
(0, 'src', 'sig', 'day', 'county', 20200414, '22222',
88-
123, 20, 21, 22, 456, 23),
88+
123, 20, 21, 22, 456, 23, 20200414, 0),
8989
(0, 'src', 'sig', 'day', 'county', 20200414, '33333',
90-
123, 30, 31, 32, 456, 33),
90+
123, 30, 31, 32, 456, 33, 20200414, 0),
9191
(0, 'src', 'sig', 'day', 'msa', 20200414, '11111',
92-
123, 40, 41, 42, 456, 43),
92+
123, 40, 41, 42, 456, 43, 20200414, 0),
9393
(0, 'src', 'sig', 'day', 'msa', 20200414, '22222',
94-
123, 50, 51, 52, 456, 53),
94+
123, 50, 51, 52, 456, 53, 20200414, 0),
9595
(0, 'src', 'sig', 'day', 'msa', 20200414, '33333',
96-
123, 60, 61, 62, 456, 634)
96+
123, 60, 61, 62, 456, 634, 20200414, 0)
9797
''')
9898
self.cnx.commit()
9999

@@ -147,17 +147,17 @@ def test_location_timeline(self):
147147
self.cur.execute('''
148148
insert into covidcast values
149149
(0, 'src', 'sig', 'day', 'county', 20200411, '01234',
150-
123, 10, 11, 12, 456, 13),
150+
123, 10, 11, 12, 456, 13, 20200413, 2),
151151
(0, 'src', 'sig', 'day', 'county', 20200412, '01234',
152-
123, 20, 21, 22, 456, 23),
152+
123, 20, 21, 22, 456, 23, 20200413, 1),
153153
(0, 'src', 'sig', 'day', 'county', 20200413, '01234',
154-
123, 30, 31, 32, 456, 33),
154+
123, 30, 31, 32, 456, 33, 20200413, 0),
155155
(0, 'src', 'sig', 'day', 'county', 20200411, '11111',
156-
123, 40, 41, 42, 456, 43),
156+
123, 40, 41, 42, 456, 43, 20200413, 2),
157157
(0, 'src', 'sig', 'day', 'county', 20200412, '22222',
158-
123, 50, 51, 52, 456, 53),
158+
123, 50, 51, 52, 456, 53, 20200413, 1),
159159
(0, 'src', 'sig', 'day', 'county', 20200413, '33333',
160-
123, 60, 61, 62, 456, 63)
160+
123, 60, 61, 62, 456, 63, 20200413, 0)
161161
''')
162162
self.cnx.commit()
163163

@@ -211,7 +211,7 @@ def test_unique_key_constraint(self):
211211
self.cur.execute('''
212212
insert into covidcast values
213213
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
214-
0, 0, 0, 0, 0, 0)
214+
0, 0, 0, 0, 0, 0, 20200414, 0)
215215
''')
216216
self.cnx.commit()
217217

@@ -220,17 +220,24 @@ def test_unique_key_constraint(self):
220220
self.cur.execute('''
221221
insert into covidcast values
222222
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
223-
1, 1, 1, 1, 1, 1)
223+
1, 1, 1, 1, 1, 1, 20200414, 0)
224224
''')
225225

226+
# succeed to insert different dummy data under a different issue
227+
self.cur.execute('''
228+
insert into covidcast values
229+
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
230+
1, 1, 1, 1, 1, 1, 20200415, 1)
231+
''')
232+
226233
def test_nullable_columns(self):
227234
"""Missing values should be surfaced as null."""
228235

229236
# insert dummy data
230237
self.cur.execute('''
231238
insert into covidcast values
232239
(0, 'src', 'sig', 'day', 'county', 20200414, '01234',
233-
123, 0.123, NULL, NULL, 456, NULL)
240+
123, 0.123, NULL, NULL, 456, NULL, 20200414, 0)
234241
''')
235242
self.cnx.commit()
236243

@@ -268,15 +275,15 @@ def test_temporal_partitioning(self):
268275
self.cur.execute('''
269276
insert into covidcast values
270277
(0, 'src', 'sig', 'hour', 'state', 2020041714, 'vi',
271-
123, 10, 11, 12, 456, 13),
278+
123, 10, 11, 12, 456, 13, 2020041714, 0),
272279
(0, 'src', 'sig', 'day', 'state', 20200417, 'vi',
273-
123, 20, 21, 22, 456, 23),
280+
123, 20, 21, 22, 456, 23, 20200417, 00),
274281
(0, 'src', 'sig', 'week', 'state', 202016, 'vi',
275-
123, 30, 31, 32, 456, 33),
282+
123, 30, 31, 32, 456, 33, 202016, 0),
276283
(0, 'src', 'sig', 'month', 'state', 202004, 'vi',
277-
123, 40, 41, 42, 456, 43),
284+
123, 40, 41, 42, 456, 43, 202004, 0),
278285
(0, 'src', 'sig', 'year', 'state', 2020, 'vi',
279-
123, 50, 51, 52, 456, 53)
286+
123, 50, 51, 52, 456, 53, 2020, 0)
280287
''')
281288
self.cnx.commit()
282289

integrations/server/test_covidcast_meta.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ def test_round_trip(self):
4444
# insert dummy data and accumulate expected results (in sort order)
4545
template = '''
4646
insert into covidcast values
47-
(0, "%s", "%s", "%s", "%s", %d, "%s", 123, %d, 0, 0, 456, 0)
47+
(0, "%s", "%s", "%s", "%s", %d, "%s", 123, %d, 0, 0, 456, 0, %d, 0)
4848
'''
4949
expected = []
5050
for src in ('src1', 'src2'):
@@ -67,7 +67,7 @@ def test_round_trip(self):
6767
})
6868
for tv in (1, 2):
6969
for gv, v in zip(('geo1', 'geo2'), (10, 20)):
70-
self.cur.execute(template % (src, sig, tt, gt, tv, gv, v))
70+
self.cur.execute(template % (src, sig, tt, gt, tv, gv, v, tv))
7171
self.cnx.commit()
7272

7373
# make the request
@@ -88,7 +88,7 @@ def test_suppress_work_in_progress(self):
8888
# insert dummy data and accumulate expected results (in sort order)
8989
template = '''
9090
insert into covidcast values
91-
(0, "%s", "%s", "%s", "%s", %d, "%s", 123, %d, 0, 0, 456, 0)
91+
(0, "%s", "%s", "%s", "%s", %d, "%s", 123, %d, 0, 0, 456, 0, %d, 0)
9292
'''
9393
expected = []
9494
for src in ('src1', 'src2'):
@@ -112,7 +112,7 @@ def test_suppress_work_in_progress(self):
112112
})
113113
for tv in (1, 2):
114114
for gv, v in zip(('geo1', 'geo2'), (10, 20)):
115-
self.cur.execute(template % (src, sig, tt, gt, tv, gv, v))
115+
self.cur.execute(template % (src, sig, tt, gt, tv, gv, v, tv))
116116
self.cnx.commit()
117117

118118
# make the request

src/acquisition/covidcast/csv_importer.py

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
"""Collects and reads covidcast data from a set of local CSV files."""
22

33
# standard library
4-
import math
4+
from datetime import date
55
import glob
6+
import math
67
import os
78
import re
89

910
# third party
1011
import pandas
1112

13+
# first party
14+
from delphi.utils.epiweek import delta_epiweeks
1215

1316
class CsvImporter:
1417
"""Finds and parses covidcast CSV files."""
@@ -46,38 +49,52 @@ def __init__(self, geo_value, value, stderr, sample_size):
4649

4750
@staticmethod
4851
def is_sane_day(value):
49-
"""Return whether `value` is a sane (maybe not valid) YYYYMMDD date."""
52+
"""Return whether `value` is a sane (maybe not valid) YYYYMMDD date.
53+
54+
Truthy return is is a datetime.date object representing `value`."""
5055

5156
year, month, day = value // 10000, (value % 10000) // 100, value % 100
5257

5358
nearby_year = CsvImporter.MIN_YEAR <= year <= CsvImporter.MAX_YEAR
5459
valid_month = 1 <= month <= 12
5560
sensible_day = 1 <= day <= 31
5661

57-
return nearby_year and valid_month and sensible_day
62+
if not (nearby_year and valid_month and sensible_day):
63+
return False
64+
return date(year=year,month=month,day=day)
5865

5966
@staticmethod
6067
def is_sane_week(value):
61-
"""Return whether `value` is a sane (maybe not valid) YYYYWW epiweek."""
68+
"""Return whether `value` is a sane (maybe not valid) YYYYWW epiweek.
69+
70+
Truthy return is `value`."""
6271

6372
year, week = value // 100, value % 100
6473

6574
nearby_year = CsvImporter.MIN_YEAR <= year <= CsvImporter.MAX_YEAR
6675
sensible_week = 1 <= week <= 53
6776

68-
return nearby_year and sensible_week
77+
if not nearby_year and sensible_week:
78+
return False
79+
return value
6980

7081
@staticmethod
71-
def find_csv_files(scan_dir, glob=glob):
82+
def find_csv_files(scan_dir, issue=(date.today(), -1), glob=glob):
7283
"""Recursively search for and yield covidcast-format CSV files.
7384
7485
scan_dir: the directory to scan (recursively)
7586
7687
The return value is a tuple of (path, details), where, if the path was
7788
valid, details is a tuple of (source, signal, time_type, geo_type,
78-
time_value) (otherwise None).
89+
time_value, issue, lag) (otherwise None).
7990
"""
8091

92+
issue_day,issue_epiweek=issue
93+
issue_day_value=int(issue_day.strftime("%Y%m%d"))
94+
issue_epiweek_value=issue_epiweek # TODO
95+
issue_value=-1
96+
lag_value=-1
97+
8198
for path in glob.glob(os.path.join(scan_dir, '*', '*')):
8299
if not path.lower().endswith('.csv'):
83100
# safe to ignore this file
@@ -98,18 +115,24 @@ def find_csv_files(scan_dir, glob=glob):
98115
time_type = 'day'
99116
time_value = int(daily_match.group(2))
100117
match = daily_match
101-
if not CsvImporter.is_sane_day(time_value):
118+
time_value_day = CsvImporter.is_sane_day(time_value)
119+
if not time_value_day:
102120
print(' invalid filename day', time_value)
103121
yield (path, None)
104122
continue
123+
issue_value=issue_day_value
124+
lag_value=(issue_day-time_value_day).days
105125
else:
106126
time_type = 'week'
107127
time_value = int(weekly_match.group(2))
108128
match = weekly_match
109-
if not CsvImporter.is_sane_week(time_value):
129+
time_value_week=CsvImporter.is_sane_week(time_value)
130+
if not time_value_week:
110131
print(' invalid filename week', time_value)
111132
yield (path, None)
112133
continue
134+
issue_value=issue_week_value
135+
lag_value=delta_epiweeks(time_value_week, issue_week)
113136

114137
# # extract and validate geographic resolution
115138
geo_type = match.group(3).lower()
@@ -126,7 +149,7 @@ def find_csv_files(scan_dir, glob=glob):
126149
yield (path, None)
127150
continue
128151

129-
yield (path, (source, signal, time_type, geo_type, time_value))
152+
yield (path, (source, signal, time_type, geo_type, time_value, issue_value, lag_value))
130153

131154
@staticmethod
132155
def is_header_valid(columns):

src/acquisition/covidcast/csv_to_database.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ def archive_as_successful(path_src, filename, source):
5757
compress = True
5858
file_archiver_impl.archive_file(path_src, path_dst, filename, compress)
5959

60+
6061
# collect files
6162
results = list(csv_importer_impl.find_csv_files(receiving_dir))
6263
print('found %d files' % len(results))
@@ -71,7 +72,7 @@ def archive_as_successful(path_src, filename, source):
7172
archive_as_failed(path_src, filename, 'unknown')
7273
continue
7374

74-
(source, signal, time_type, geo_type, time_value) = details
75+
(source, signal, time_type, geo_type, time_value, issue, lag) = details
7576

7677
# iterate over rows and upload to the database
7778
all_rows_valid = True
@@ -90,7 +91,9 @@ def archive_as_successful(path_src, filename, source):
9091
row_values.geo_value,
9192
row_values.value,
9293
row_values.stderr,
93-
row_values.sample_size)
94+
row_values.sample_size,
95+
issue,
96+
lag)
9497
except Exception as e:
9598
all_rows_valid = False
9699
print('exception while inserting row:', e, row_values)

src/acquisition/covidcast/database.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,9 @@ def insert_or_update(
5555
geo_value,
5656
value,
5757
stderr,
58-
sample_size):
58+
sample_size,
59+
issue,
60+
lag):
5961
"""
6062
Insert a new row, or update an existing row, in the `covidcast` table.
6163
@@ -64,7 +66,7 @@ def insert_or_update(
6466

6567
sql = '''
6668
INSERT INTO `covidcast` VALUES
67-
(0, %s, %s, %s, %s, %s, %s, UNIX_TIMESTAMP(NOW()), %s, %s, %s, 0, NULL)
69+
(0, %s, %s, %s, %s, %s, %s, UNIX_TIMESTAMP(NOW()), %s, %s, %s, 0, NULL, %s, %s)
6870
ON DUPLICATE KEY UPDATE
6971
`timestamp1` = VALUES(`timestamp1`),
7072
`value` = VALUES(`value`),
@@ -82,6 +84,8 @@ def insert_or_update(
8284
value,
8385
stderr,
8486
sample_size,
87+
issue,
88+
lag
8589
)
8690

8791
self._cursor.execute(sql, args)

0 commit comments

Comments
 (0)