Commit 50e1483

Merge pull request #415 from benjaminysmith/ingestion_logging
Add logging to CSV ingestion
2 parents 9554cc6 + b461884

3 files changed: 43 additions, 5 deletions
integrations/acquisition/covidcast/test_csv_uploading.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,9 @@ def test_uploading(self):
6060
# make some fake data files
6161
data_dir = 'covid/data'
6262
source_receiving_dir = data_dir + '/receiving/src-name'
63+
log_file_directory = "/var/log/"
6364
os.makedirs(source_receiving_dir, exist_ok=True)
65+
os.makedirs(log_file_directory, exist_ok=True)
6466

6567
# valid
6668
with open(source_receiving_dir + '/20200419_state_test.csv', 'w') as f:
@@ -96,7 +98,13 @@ def test_uploading(self):
9698

9799
# upload CSVs
98100
# TODO: use an actual argparse object for the args instead of a MagicMock
99-
args = MagicMock(data_dir=data_dir, is_wip_override=False, not_wip_override=False, specific_issue_date=False)
101+
args = MagicMock(
102+
log_file=log_file_directory +
103+
"output.log",
104+
data_dir=data_dir,
105+
is_wip_override=False,
106+
not_wip_override=False,
107+
specific_issue_date=False)
100108
main(args)
101109

102110
# request CSV data from the API

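The test drives main() with a MagicMock standing in for the parsed arguments, and the TODO notes that a real argparse object would be preferable. As an illustrative sketch only (not part of this commit), argparse.Namespace could serve that purpose, since main() only reads attributes from args:

# Illustrative sketch, not part of this commit: argparse.Namespace gives the
# same attribute access that main() expects from the parsed arguments.
from argparse import Namespace

data_dir = 'covid/data'
log_file_directory = "/var/log/"
args = Namespace(
  log_file=log_file_directory + "output.log",
  data_dir=data_dir,
  is_wip_override=False,
  not_wip_override=False,
  specific_issue_date=False)
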
src/acquisition/covidcast/csv_to_database.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@
33
# standard library
44
import argparse
55
import os
6+
import time
67

78
# first party
89
from delphi.epidata.acquisition.covidcast.csv_importer import CsvImporter
910
from delphi.epidata.acquisition.covidcast.database import Database, CovidcastRow
1011
from delphi.epidata.acquisition.covidcast.file_archiver import FileArchiver
12+
from delphi.epidata.acquisition.covidcast.logger import get_structured_logger
1113

1214

1315
def get_argument_parser():
@@ -30,6 +32,9 @@ def get_argument_parser():
3032
'--not_wip_override',
3133
action='store_true',
3234
help='overrides all signals to mark them as *not* WIP. NOTE: specify neither or only one of --is_wip_override and --not_wip_override.')
35+
parser.add_argument(
36+
'--log_file',
37+
help="filename for log output (defaults to stdout)")
3338
return parser
3439

3540
def collect_files(data_dir, specific_issue_date, csv_importer_impl=CsvImporter):
@@ -77,6 +82,7 @@ def upload_archive(
7782
path_details,
7883
database,
7984
handlers,
85+
logger,
8086
is_wip_override=None,
8187
csv_importer_impl=CsvImporter):
8288
"""Upload CSVs to the database and archive them using the specified handlers.
@@ -123,6 +129,15 @@ def upload_archive(
123129
try:
124130
result = database.insert_or_update_bulk(rows_list)
125131
print(f"insert_or_update_bulk {filename} returned {result}")
132+
logger.info(
133+
"Inserted database rows",
134+
row_count = result,
135+
source = source,
136+
signal = signal,
137+
geo_type = geo_type,
138+
time_value = time_value,
139+
issue = issue,
140+
lag = lag)
126141
if result is None or result: # else would indicate zero rows inserted
127142
database.commit()
128143
except Exception as e:
@@ -143,6 +158,9 @@ def main(
143158
upload_archive_impl=upload_archive):
144159
"""Find, parse, and upload covidcast signals."""
145160

161+
logger = get_structured_logger("csv_ingestion", filename=args.log_file)
162+
start_time = time.time()
163+
146164
if args.is_wip_override and args.not_wip_override:
147165
print('conflicting overrides for forcing WIP option! exiting...')
148166
return
@@ -157,7 +175,9 @@ def main(
157175
if not path_details:
158176
print('nothing to do; exiting...')
159177
return
160-
178+
179+
logger.info("Ingesting CSVs", csv_count = len(path_details))
180+
161181
database = database_impl()
162182
database.connect()
163183
num_starting_rows = database.count_all_rows()
@@ -167,16 +187,22 @@ def main(
167187
path_details,
168188
database,
169189
make_handlers(args.data_dir, args.specific_issue_date),
190+
logger,
170191
is_wip_override=wip_override)
171192
finally:
172193
# no catch block so that an exception above will cause the program to fail
173194
# after the following cleanup
174195
try:
175196
num_inserted_rows = database.count_all_rows() - num_starting_rows
197+
logger.info("Finished inserting database rows", row_count = num_inserted_rows)
176198
print('inserted/updated %d rows' % num_inserted_rows)
177199
finally:
178200
# unconditionally commit database changes since CSVs have been archived
179201
database.disconnect(True)
202+
203+
logger.info(
204+
"Ingested CSVs into database",
205+
total_runtime_in_seconds=round(time.time() - start_time, 2))
180206

181207
if __name__ == '__main__':
182208
main(get_argument_parser().parse_args())

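get_structured_logger is imported from delphi.epidata.acquisition.covidcast.logger, which is not touched by this commit. Judging only from how it is called here (a logger name, an optional filename, and keyword arguments on each logger.info call), a minimal structlog-based factory with that signature might look like the sketch below; this is an assumption for illustration, not the project's actual implementation:

# Hypothetical sketch of a structured-logger factory with the call signature
# used above; the real delphi.epidata.acquisition.covidcast.logger module is
# not shown in this diff and may differ.
import logging
import sys
import structlog

def get_structured_logger(name, filename=None):
  """Return a structlog logger writing to `filename`, or stdout if None."""
  handler = logging.FileHandler(filename) if filename else logging.StreamHandler(sys.stdout)
  logging.basicConfig(format="%(message)s", level=logging.INFO, handlers=[handler])
  structlog.configure(
    processors=[
      structlog.processors.TimeStamper(fmt="iso"),
      structlog.processors.KeyValueRenderer(key_order=["event"]),
    ],
    logger_factory=structlog.stdlib.LoggerFactory(),
  )
  return structlog.get_logger(name)

# Usage mirrors the calls added in csv_to_database.py:
#   logger = get_structured_logger("csv_ingestion", filename="/var/log/output.log")
#   logger.info("Ingesting CSVs", csv_count=3)
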
tests/acquisition/covidcast/test_csv_to_database.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,12 +76,14 @@ def load_csv_impl(path, *args):
7676
mock_csv_importer = MagicMock()
7777
mock_csv_importer.load_csv = load_csv_impl
7878
mock_file_archiver = MagicMock()
79+
mock_logger = MagicMock()
7980

8081
upload_archive(
8182
self._path_details(),
8283
mock_database,
8384
make_handlers(data_dir, False,
8485
file_archiver_impl=mock_file_archiver),
86+
mock_logger,
8587
csv_importer_impl=mock_csv_importer)
8688

8789
# verify that appropriate rows were added to the database
@@ -114,7 +116,7 @@ def test_main_successful(self):
114116
"""Run the main program successfully, then commit changes."""
115117

116118
# TODO: use an actual argparse object for the args instead of a MagicMock
117-
args = MagicMock(data_dir='data', is_wip_override=False, not_wip_override=False, specific_issue_date=False)
119+
args = MagicMock(log_file=None, data_dir='data', is_wip_override=False, not_wip_override=False, specific_issue_date=False)
118120
mock_database = MagicMock()
119121
mock_database.count_all_rows.return_value = 0
120122
fake_database_impl = lambda: mock_database
@@ -142,7 +144,7 @@ def test_main_unsuccessful(self):
142144
"""Run the main program with failure, then commit changes."""
143145

144146
# TODO: use an actual argparse object for the args instead of a MagicMock
145-
args = MagicMock(data_dir='data', is_wip_override=False, not_wip_override=False, specific_issue_date=False)
147+
args = MagicMock(log_file=None, data_dir='data', is_wip_override=False, not_wip_override=False, specific_issue_date=False)
146148
mock_database = MagicMock()
147149
mock_database.count_all_rows.return_value = 0
148150
fake_database_impl = lambda: mock_database
@@ -168,7 +170,7 @@ def test_main_early_exit(self):
168170
"""Run the main program with an empty receiving directory."""
169171

170172
# TODO: use an actual argparse object for the args instead of a MagicMock
171-
args = MagicMock(data_dir='data', is_wip_override=False, not_wip_override=False, specific_issue_date=False)
173+
args = MagicMock(log_file=None, data_dir='data', is_wip_override=False, not_wip_override=False, specific_issue_date=False)
172174
mock_database = MagicMock()
173175
mock_database.count_all_rows.return_value = 0
174176
fake_database_impl = lambda: mock_database
@@ -204,11 +206,13 @@ def test_database_exception_is_handled(self):
204206
MagicMock(geo_value='geo', value=1, stderr=1, sample_size=1),
205207
]
206208
mock_file_archiver = MagicMock()
209+
mock_logger = MagicMock()
207210

208211
upload_archive(
209212
collect_files(data_dir, False, csv_importer_impl=mock_csv_importer),
210213
mock_database,
211214
make_handlers(data_dir, False, file_archiver_impl=mock_file_archiver),
215+
mock_logger,
212216
csv_importer_impl=mock_csv_importer,
213217
)
214218

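The tests above pass a MagicMock wherever upload_archive now expects a logger. Because MagicMock records every call it receives, a test could also assert on what was logged; a short illustrative example, not part of this commit:

# Illustrative only: verifying calls made on a mocked logger.
from unittest.mock import MagicMock

mock_logger = MagicMock()
mock_logger.info("Ingesting CSVs", csv_count=2)

mock_logger.info.assert_called_once_with("Ingesting CSVs", csv_count=2)
assert mock_logger.info.call_count == 1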