Commit 45d34ca

Merge pull request #150 from melange396/csv_acquisition
Improvements to acquisition / CSV file loading.
2 parents f4d55dd + 68af3b6

6 files changed, +158 -49 lines changed

integrations/acquisition/covidcast/test_csv_uploading.py

Lines changed: 5 additions & 5 deletions

@@ -75,6 +75,11 @@ def test_uploading(self):
       f.write('nd,20,0.02,200\n')
       f.write('wa,30,0.03,300\n')
 
+    # invalid
+    with open(source_receiving_dir + '/20200419_state_wip_really_long_name_that_will_get_truncated.csv', 'w') as f:
+      f.write('geo_id,val,se,sample_size\n')
+      f.write('pa,100,5.4,624\n')
+
     # invalid
     with open(source_receiving_dir + '/20200420_state_test.csv', 'w') as f:
       f.write('this,header,is,wrong\n')
@@ -83,11 +88,6 @@ def test_uploading(self):
     with open(source_receiving_dir + '/hello.csv', 'w') as f:
       f.write('file name is wrong\n')
 
-    # invalid
-    with open(source_receiving_dir + '/20200419_state_wip_really_long_name_that_will_get_truncated.csv', 'w') as f:
-      f.write('geo_id,val,se,sample_size\n')
-      f.write('pa,100,5.4,624\n')
-
     # upload CSVs
     args = MagicMock(data_dir=data_dir)
     main(args)

src/acquisition/covidcast/csv_importer.py

Lines changed: 2 additions & 1 deletion

@@ -92,7 +92,8 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today()
     issue_value=-1
     lag_value=-1
 
-    for path in glob.glob(os.path.join(scan_dir, '*', '*')):
+    for path in sorted(glob.glob(os.path.join(scan_dir, '*', '*'))):
+
       if not path.lower().endswith('.csv'):
         # safe to ignore this file
         continue
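Since `glob.glob` returns paths in arbitrary filesystem order, wrapping it in `sorted` makes the scan order deterministic across runs and machines. A minimal standalone sketch of the pattern (the `receiving` directory name is illustrative, not from this repo):

import glob
import os

# glob returns entries in arbitrary order; sorting fixes the scan order
# so repeated runs (and test assertions) see files in the same sequence
for path in sorted(glob.glob(os.path.join('receiving', '*', '*'))):
  if not path.lower().endswith('.csv'):
    # non-CSV files are safe to skip, as in find_csv_files above
    continue
  print('would process', path)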

src/acquisition/covidcast/csv_to_database.py

Lines changed: 18 additions & 25 deletions

@@ -6,7 +6,7 @@
 
 # first party
 from delphi.epidata.acquisition.covidcast.csv_importer import CsvImporter
-from delphi.epidata.acquisition.covidcast.database import Database
+from delphi.epidata.acquisition.covidcast.database import Database, CovidcastRow
 from delphi.epidata.acquisition.covidcast.file_archiver import FileArchiver
 
 
@@ -45,7 +45,7 @@ def scan_upload_archive(
 
   # helper to archive a failed file without compression
   def archive_as_failed(path_src, filename, source):
-    print('archiving as failed')
+    print('archiving as failed - '+source)
     path_dst = os.path.join(archive_failed_dir, source)
     compress = False
     file_archiver_impl.archive_file(path_src, path_dst, filename, compress)
@@ -74,29 +74,22 @@ def archive_as_successful(path_src, filename, source):
 
     (source, signal, time_type, geo_type, time_value, issue, lag) = details
 
-    # iterate over rows and upload to the database
-    all_rows_valid = True
-    for row_values in csv_importer_impl.load_csv(path, geo_type):
-      if not row_values:
-        all_rows_valid = False
-        continue
-
-      try:
-        database.insert_or_update(
-          source,
-          signal,
-          time_type,
-          geo_type,
-          time_value,
-          row_values.geo_value,
-          row_values.value,
-          row_values.stderr,
-          row_values.sample_size,
-          issue,
-          lag)
-      except Exception as e:
-        all_rows_valid = False
-        print('exception while inserting row:', e, row_values)
+    csv_rows = csv_importer_impl.load_csv(path, geo_type)
+
+    all_rows_valid = False
+    try:
+      cc_rows = CovidcastRow.fromCsvRows(csv_rows, source, signal, time_type, geo_type, time_value, issue, lag)
+      rows_list = list(cc_rows)
+      if not rows_list:
+        raise ValueError("No data")
+      result = database.insert_or_update_bulk(rows_list)
+      if result is None or result: # else would indicate zero rows inserted
+        database.commit()
+        all_rows_valid = True
+    except Exception as e:
+      all_rows_valid = False
+      print('exception while inserting rows:', e)
+      database.rollback()
 
     # archive the current file based on validation results
     if all_rows_valid:
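The per-row insert_or_update loop becomes a single bulk call per file, bracketed by an explicit commit or rollback so each file is ingested all-or-nothing. A hedged sketch of driving the new path by hand, using only the methods added in this commit (the literal row values are made up):

from delphi.epidata.acquisition.covidcast.database import Database, CovidcastRow

# hypothetical single-row payload; constructor order is
# (source, signal, time_type, geo_type, time_value, geo_value,
#  value, stderr, sample_size, issue, lag)
rows = [CovidcastRow('src', 'sig', 'day', 'state', 20200419,
                     'pa', 100.0, 5.4, 624, 20200420, 1)]

db = Database()
db.connect()
try:
  result = db.insert_or_update_bulk(rows)
  if result is None or result:  # None: connector cannot report affected rows
    db.commit()
except Exception as e:
  print('exception while inserting rows:', e)
  db.rollback()  # leave the table untouched on failure
finally:
  db.disconnect(False)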

src/acquisition/covidcast/database.py

Lines changed: 108 additions & 1 deletion

@@ -10,6 +10,43 @@
 # first party
 import delphi.operations.secrets as secrets
 
+from math import ceil
+
+
+class CovidcastRow():
+  """A container for all the values of a single covidcast row."""
+
+  @staticmethod
+  def fromCsvRowValue(row_value, source, signal, time_type, geo_type, time_value, issue, lag):
+    return CovidcastRow(source, signal, time_type, geo_type, time_value,
+                        row_value.geo_value,
+                        row_value.value,
+                        row_value.stderr,
+                        row_value.sample_size,
+                        issue, lag)
+
+  @staticmethod
+  def fromCsvRows(row_values, source, signal, time_type, geo_type, time_value, issue, lag):
+    # NOTE: returns a generator, as row_values is expected to be a generator
+    return (CovidcastRow.fromCsvRowValue(row_value, source, signal, time_type, geo_type, time_value, issue, lag)
+            for row_value in row_values)
+
+  def __init__(self, source, signal, time_type, geo_type, time_value, geo_value, value, stderr, sample_size, issue, lag):
+    self.id = None
+    self.source = source
+    self.signal = signal
+    self.time_type = time_type
+    self.geo_type = geo_type
+    self.time_value = time_value
+    self.geo_value = geo_value      # from CSV row
+    self.value = value              # ...
+    self.stderr = stderr            # ...
+    self.sample_size = sample_size  # from CSV row
+    self.timestamp2 = 0
+    self.direction = None
+    self.issue = issue
+    self.lag = lag
+
 
 class Database:
   """A collection of covidcast database operations."""
@@ -27,6 +64,12 @@ def connect(self, connector_impl=mysql.connector):
       database=Database.DATABASE_NAME)
     self._cursor = self._connection.cursor()
 
+  def commit(self):
+    self._connection.commit()
+
+  def rollback(self):
+    self._connection.rollback()
+
   def disconnect(self, commit):
     """Close the database connection.
 
@@ -46,6 +89,69 @@ def count_all_rows(self):
     for (num,) in self._cursor:
       return num
 
+  def insert_or_update_bulk(self, cc_rows):
+    return self.insert_or_update_batch(cc_rows)
+
+  def insert_or_update_batch(self, cc_rows, batch_size=0, commit_partial=False):
+    """
+    Insert new rows (or update existing) in the `covidcast` table.
+
+    This has the intentional side effect of updating the primary timestamp.
+    """
+    sql = '''
+      INSERT INTO `covidcast`
+        (`id`, `source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`,
+        `timestamp1`, `value`, `stderr`, `sample_size`,
+        `timestamp2`, `direction`,
+        `issue`, `lag`)
+      VALUES
+        (0, %s, %s, %s, %s, %s, %s,
+        UNIX_TIMESTAMP(NOW()), %s, %s, %s,
+        0, NULL,
+        %s, %s)
+      ON DUPLICATE KEY UPDATE
+        `timestamp1` = VALUES(`timestamp1`),
+        `value` = VALUES(`value`),
+        `stderr` = VALUES(`stderr`),
+        `sample_size` = VALUES(`sample_size`)
+    '''
+    # TODO: ^ do we want to reset `timestamp2` and `direction` in the duplicate key case?
+
+    # TODO: consider handling cc_rows as a generator instead of a list
+    num_rows = len(cc_rows)
+    total = 0
+    if not batch_size:
+      batch_size = num_rows
+    num_batches = ceil(num_rows/batch_size)
+    for batch_num in range(num_batches):
+      start = batch_num * batch_size
+      end = min(num_rows, start + batch_size)
+      length = end - start
+
+      args = [(
+        row.source,
+        row.signal,
+        row.time_type,
+        row.geo_type,
+        row.time_value,
+        row.geo_value,
+        row.value,
+        row.stderr,
+        row.sample_size,
+        row.issue,
+        row.lag
+      ) for row in cc_rows[start:end]]
+
+      result = self._cursor.executemany(sql, args)
+      if result is None:
+        # the SQL connector does not support returning number of rows affected
+        total = None
+      else:
+        total += result
+      if commit_partial:
+        self._connection.commit()
+    return total
+
   def insert_or_update(
       self,
       source,
@@ -429,6 +535,7 @@ def update_timeseries_timestamp2(
     args = (source, signal, time_type, geo_type, geo_value)
     self._cursor.execute(sql, args)
 
+
  def get_covidcast_meta(self):
    """Compute and return metadata on all non-WIP COVIDcast signals."""
 
@@ -491,7 +598,7 @@ def get_covidcast_meta(self):
         t.`signal` ASC,
         t.`time_type` ASC,
         t.`geo_type` ASC
-      '''
+    '''
     self._cursor.execute(sql)
     return list(dict(zip(self._cursor.column_names,x)) for x in self._cursor)
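insert_or_update_bulk simply delegates to insert_or_update_batch with the default batch_size=0, i.e. one executemany over all rows; a non-zero batch_size chunks the insert, and commit_partial commits after each chunk. A sketch, assuming db is a connected Database and rows is a list of CovidcastRow objects:

# chunk a large load into batches of 1024 rows, committing after each
# batch so a mid-load failure keeps the batches already written
total = db.insert_or_update_batch(rows, batch_size=1024, commit_partial=True)
if total is None:
  # the MySQL connector reported no rowcount from executemany()
  print('inserted/updated an unknown number of rows')
else:
  print('inserted/updated %d rows' % total)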

tests/acquisition/covidcast/test_csv_importer.py

Lines changed: 3 additions & 3 deletions

@@ -57,19 +57,19 @@ def test_find_csv_files(self):
     mock_glob = MagicMock()
     mock_glob.glob.return_value = glob_paths
 
-    found = list(CsvImporter.find_csv_files(path_prefix, glob=mock_glob))
+    found = set(CsvImporter.find_csv_files(path_prefix, glob=mock_glob))
 
     expected_issue_day=int(date.today().strftime("%Y%m%d"))
     expected_issue_week=int(str(epi.Week.fromdate(date.today())))
     time_value_day = 20200408
-    expected = [
+    expected = set([
       (glob_paths[0], ('fb_survey', 'cli', 'week', 'county', 202015, expected_issue_week, delta_epiweeks(202015, expected_issue_week))),
       (glob_paths[1], ('ght', 'rawsearch', 'day', 'state', time_value_day, expected_issue_day, (date.today() - date(year=time_value_day // 10000, month=(time_value_day // 100) % 100, day=time_value_day % 100)).days)),
       (glob_paths[2], None),
       (glob_paths[3], None),
       (glob_paths[4], None),
       (glob_paths[5], None),
-    ]
+    ])
     self.assertEqual(found, expected)
 
   def test_is_header_valid_allows_extra_columns(self):
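Comparing sets rather than lists keeps this assertion valid regardless of the order in which find_csv_files yields results, which matters now that it sorts its glob output while the mocked glob returns paths in declaration order. The pattern in isolation (values are hypothetical):

found = ['b.csv', 'a.csv']
expected = ['a.csv', 'b.csv']

assert found != expected            # list equality is order-sensitive
assert set(found) == set(expected)  # set equality ignores order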

tests/acquisition/covidcast/test_csv_to_database.py

Lines changed: 22 additions & 14 deletions

@@ -39,6 +39,8 @@ def load_csv_impl(path, *args):
         yield make_row('b1')
         yield None
         yield make_row('b3')
+      elif path == 'path/d.csv':
+        yield make_row('d1')
       else:
         # fail the test for any other path
         raise Exception('unexpected path')
@@ -47,10 +49,14 @@ def load_csv_impl(path, *args):
     mock_database = MagicMock()
     mock_csv_importer = MagicMock()
     mock_csv_importer.find_csv_files.return_value = [
+      # a good file
       ('path/a.csv', ('src_a', 'sig_a', 'day', 'hrr', 20200419, 20200420, 1)),
+      # a file with a data error
       ('path/b.csv', ('src_b', 'sig_b', 'week', 'msa', 202016, 202017, 1)),
       # emulate a file that's named incorrectly
       ('path/c.csv', None),
+      # another good file
+      ('path/d.csv', ('src_d', 'sig_d', 'week', 'msa', 202016, 202017, 1)),
     ]
     mock_csv_importer.load_csv = load_csv_impl
     mock_file_archiver = MagicMock()
@@ -61,27 +67,29 @@ def load_csv_impl(path, *args):
       csv_importer_impl=mock_csv_importer,
       file_archiver_impl=mock_file_archiver)
 
-    # verify that five rows were added to the database
-    self.assertEqual(mock_database.insert_or_update.call_count, 5)
-    call_args_list = mock_database.insert_or_update.call_args_list
-    actual_args = [args for (args, kwargs) in call_args_list]
+    # verify that appropriate rows were added to the database
+    self.assertEqual(mock_database.insert_or_update_bulk.call_count, 2)
+    call_args_list = mock_database.insert_or_update_bulk.call_args_list
+    actual_args = [[(a.source, a.signal, a.time_type, a.geo_type, a.time_value,
+                     a.geo_value, a.value, a.stderr, a.sample_size, a.issue, a.lag)
+                    for a in call.args[0]] for call in call_args_list]
     expected_args = [
-      ('src_a', 'sig_a', 'day', 'hrr', 20200419, 'a1', 'a1', 'a1', 'a1', 20200420, 1),
-      ('src_a', 'sig_a', 'day', 'hrr', 20200419, 'a2', 'a2', 'a2', 'a2', 20200420, 1),
-      ('src_a', 'sig_a', 'day', 'hrr', 20200419, 'a3', 'a3', 'a3', 'a3', 20200420, 1),
-      ('src_b', 'sig_b', 'week', 'msa', 202016, 'b1', 'b1', 'b1', 'b1', 202017, 1),
-      ('src_b', 'sig_b', 'week', 'msa', 202016, 'b3', 'b3', 'b3', 'b3', 202017, 1),
+      [('src_a', 'sig_a', 'day', 'hrr', 20200419, 'a1', 'a1', 'a1', 'a1', 20200420, 1),
+       ('src_a', 'sig_a', 'day', 'hrr', 20200419, 'a2', 'a2', 'a2', 'a2', 20200420, 1),
+       ('src_a', 'sig_a', 'day', 'hrr', 20200419, 'a3', 'a3', 'a3', 'a3', 20200420, 1)],
+      [('src_d', 'sig_d', 'week', 'msa', 202016, 'd1', 'd1', 'd1', 'd1', 202017, 1)]
     ]
     self.assertEqual(actual_args, expected_args)
 
-    # verify that one file was successful (a) and two failed (b, c)
-    self.assertEqual(mock_file_archiver.archive_file.call_count, 3)
+    # verify that two files were successful (a, d) and two failed (b, c)
+    self.assertEqual(mock_file_archiver.archive_file.call_count, 4)
     call_args_list = mock_file_archiver.archive_file.call_args_list
     actual_args = [args for (args, kwargs) in call_args_list]
     expected_args = [
       ('path', 'data_dir/archive/successful/src_a', 'a.csv', True),
       ('path', 'data_dir/archive/failed/src_b', 'b.csv', False),
       ('path', 'data_dir/archive/failed/unknown', 'c.csv', False),
+      ('path', 'data_dir/archive/successful/src_d', 'd.csv', True),
     ]
     self.assertEqual(actual_args, expected_args)
 
@@ -133,7 +141,7 @@ def test_database_exception_is_handled(self):
 
     data_dir = 'data_dir'
     mock_database = MagicMock()
-    mock_database.insert_or_update.side_effect = Exception('testing')
+    mock_database.insert_or_update_bulk.side_effect = Exception('testing')
     mock_csv_importer = MagicMock()
     mock_csv_importer.find_csv_files.return_value = [
       ('path/file.csv', ('src', 'sig', 'day', 'hrr', 20200423, 20200424, 1)),
@@ -149,8 +157,8 @@ def test_database_exception_is_handled(self):
       csv_importer_impl=mock_csv_importer,
       file_archiver_impl=mock_file_archiver)
 
-    # verify that a row insertion was attempted
-    self.assertTrue(mock_database.insert_or_update.called)
+    # verify that insertions were attempted
+    self.assertTrue(mock_database.insert_or_update_bulk.called)
 
     # verify that the file was archived as having failed
     self.assertTrue(mock_file_archiver.archive_file.called)
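The reworked assertions recover each CovidcastRow handed to the mocked insert_or_update_bulk through call.args[0]. A self-contained sketch of that unittest.mock pattern (the Row class is a stand-in for CovidcastRow, and call.args requires Python 3.8+):

from unittest.mock import MagicMock

class Row:
  def __init__(self, geo_value):
    self.geo_value = geo_value

db = MagicMock()
db.insert_or_update_bulk([Row('pa'), Row('nd')])
db.insert_or_update_bulk([Row('wa')])

# each call's positional arguments live on call.args; args[0] is the row list
actual = [[r.geo_value for r in call.args[0]]
          for call in db.insert_or_update_bulk.call_args_list]
assert actual == [['pa', 'nd'], ['wa']]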
