diff --git a/integrations/acquisition/covidcast/test_csv_uploading.py b/integrations/acquisition/covidcast/test_csv_uploading.py
index 397dc0a73..bb65e10d8 100644
--- a/integrations/acquisition/covidcast/test_csv_uploading.py
+++ b/integrations/acquisition/covidcast/test_csv_uploading.py
@@ -8,6 +8,8 @@
 # third party
 import mysql.connector
+import pandas as pd
+import numpy as np
 
 # first party
 from delphi_utils import Nans
@@ -52,6 +54,26 @@ def tearDown(self):
     self.cur.close()
     self.cnx.close()
 
+  @staticmethod
+  def apply_lag(expected_epidata):
+    expected_issue_day = date.today()
+    expected_issue = expected_issue_day.strftime("%Y%m%d")
+    for dct in expected_epidata:
+      dct['issue'] = int(expected_issue)
+      time_value_day = date(year=dct['time_value'] // 10000,
+                            month=dct['time_value'] % 10000 // 100,
+                            day=dct['time_value'] % 100)
+      expected_lag = (expected_issue_day - time_value_day).days
+      dct['lag'] = expected_lag
+    return expected_epidata
+
+  def verify_timestamps_and_defaults(self):
+    self.cur.execute('select value_updated_timestamp, direction_updated_timestamp, direction from covidcast')
+    for value_updated_timestamp, direction_updated_timestamp, direction in self.cur:
+      self.assertGreater(value_updated_timestamp, 0)
+      self.assertEqual(direction_updated_timestamp, 0)
+      self.assertIsNone(direction)
+
   def test_uploading(self):
     """Scan, parse, upload, archive, serve, and fetch a covidcast signal."""
 
@@ -64,299 +86,250 @@ def test_uploading(self):
     log_file_directory = "/var/log/"
     os.makedirs(source_receiving_dir, exist_ok=True)
     os.makedirs(log_file_directory, exist_ok=True)
-
-    # valid
-    with open(source_receiving_dir + '/20200419_state_test.csv', 'w') as f:
-      f.write('geo_id,val,se,sample_size,missing_val,missing_se,missing_sample_size\n')
-      f.write(f'ca,1,0.1,10,{Nans.NOT_MISSING},{Nans.NOT_MISSING},{Nans.NOT_MISSING}\n')
-      f.write(f'tx,2,0.2,20,{Nans.NOT_MISSING},{Nans.NOT_MISSING},{Nans.NOT_MISSING}\n')
-      f.write(f'fl,3,0.3,30,{Nans.NOT_MISSING},{Nans.NOT_MISSING},{Nans.NOT_MISSING}\n')
-
-    # valid, old style no missing cols should have intelligent defaults
-    with open(source_receiving_dir + '/20200419_state_test_no_missing.csv', 'w') as f:
-      f.write('geo_id,val,se,sample_size\n')
-      f.write('ca,1,0.1,10\n')
-      f.write('tx,NA,0.2,20\n')
-      f.write('wa,3,0.3,30\n')
-
-    # invalid, missing with an inf value
-    with open(source_receiving_dir + '/20200419_state_test_missing1.csv', 'w') as f:
-      f.write('geo_id,val,se,sample_size,missing_val,missing_se,missing_sample_size\n')
-      f.write(f'fl,inf,0.3,30,{Nans.OTHER},{Nans.NOT_MISSING},{Nans.NOT_MISSING}\n')
-
-    # invalid, missing with an incorrect missing code
-    with open(source_receiving_dir + '/20200419_state_test_missing2.csv', 'w') as f:
-      f.write('geo_id,val,se,sample_size,missing_val,missing_se,missing_sample_size\n')
-      f.write(f'tx,NA,0.2,20,{Nans.NOT_MISSING},{Nans.NOT_MISSING},{Nans.NOT_MISSING}\n')
-
-    # invalid, no missing with an incorrect missing code
-    with open(source_receiving_dir + '/20200419_state_test_missing3.csv', 'w') as f:
-      f.write('geo_id,val,se,sample_size,missing_val,missing_se,missing_sample_size\n')
-      f.write(f'wa,3,0.3,30,{Nans.OTHER},{Nans.NOT_MISSING},{Nans.NOT_MISSING}\n')
-
-    # valid wip
-    with open(source_receiving_dir + '/20200419_state_wip_prototype.csv', 'w') as f:
-      f.write('geo_id,val,se,sample_size,missing_val,missing_se,missing_sample_size\n')
-      f.write(f'me,10,0.01,100,{Nans.NOT_MISSING},{Nans.NOT_MISSING},{Nans.NOT_MISSING}\n')
-      f.write(f'nd,20,0.02,200,{Nans.NOT_MISSING},{Nans.NOT_MISSING},{Nans.NOT_MISSING}\n')
-      f.write(f'wa,30,0.03,300,{Nans.NOT_MISSING},{Nans.NOT_MISSING},{Nans.NOT_MISSING}\n')
-
-    # invalid
-    with open(source_receiving_dir + '/20200419_state_wip_really_long_name_that_will_be_accepted.csv', 'w') as f:
-      f.write('geo_id,val,se,sample_size,missing_val,missing_se,missing_sample_size\n')
-      f.write(f'pa,100,5.4,624,{Nans.NOT_MISSING},{Nans.NOT_MISSING},{Nans.NOT_MISSING}\n')
-
-    # invalid
-    with open(source_receiving_dir + '/20200419_state_wip_really_long_name_that_will_get_truncated_lorem_ipsum_dolor_sit_amet.csv', 'w') as f:
-      f.write('geo_id,val,se,sample_size,missing_val,missing_se,missing_sample_size\n')
-      f.write(f'pa,100,5.4,624,{Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}\n')
-
-    # invalid
-    with open(source_receiving_dir + '/20200420_state_test.csv', 'w') as f:
-      f.write('this,header,is,wrong\n')
-
-    # invalid
-    with open(source_receiving_dir + '/hello.csv', 'w') as f:
-      f.write('file name is wrong\n')
-
-    # upload CSVs
     # TODO: use an actual argparse object for the args instead of a MagicMock
     args = MagicMock(
-        log_file=log_file_directory +
-        "output.log",
-        data_dir=data_dir,
-        is_wip_override=False,
-        not_wip_override=False,
-        specific_issue_date=False)
-    main(args)
+        log_file=log_file_directory +
+        "output.log",
+        data_dir=data_dir,
+        is_wip_override=False,
+        not_wip_override=False,
+        specific_issue_date=False)
+    uploader_column_rename = {"geo_id": "geo_value", "val": "value", "se": "stderr", "missing_val": "missing_value", "missing_se": "missing_stderr"}
+
+    with self.subTest("Valid CSV with correct missing columns"):
+      values = pd.DataFrame({
+        "geo_id": ["ca", "fl", "tx"],
+        "val": [1.0, 2.0, 3.0],
+        "se": [0.1, 0.2, 0.3],
+        "sample_size": [10.0, 20.0, 30.0],
+        "missing_val": [Nans.NOT_MISSING] * 3,
+        "missing_se": [Nans.NOT_MISSING] * 3,
+        "missing_sample_size": [Nans.NOT_MISSING] * 3
+      })
+      signal_name = "test"
+      values.to_csv(source_receiving_dir + f'/20200419_state_{signal_name}.csv', index=False)
+
+      # upload CSVs
+      main(args)
+      response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')
+
+      expected_values = pd.concat([values, pd.DataFrame({
+        "time_value": [20200419] * 3,
+        "signal": [signal_name] * 3,
+        "direction": [None] * 3
+      })], axis=1).rename(columns=uploader_column_rename).to_dict(orient="records")
+      expected_response = {'result': 1, 'epidata': self.apply_lag(expected_values), 'message': 'success'}
+
+      self.assertEqual(response, expected_response)
+      self.verify_timestamps_and_defaults()
+
+      # Verify that files were archived
+      path = data_dir + f'/archive/successful/src-name/20200419_state_test.csv.gz'
+      self.assertIsNotNone(os.stat(path))
 
-    # request CSV data from the API
-    response = Epidata.covidcast(
-      'src-name', 'test', 'day', 'state', 20200419, '*')
+      self.tearDown()
+      self.setUp()
+
+    with self.subTest("Valid CSV with no missing columns should set intelligent defaults"):
+      values = pd.DataFrame({
+        "geo_id": ["ca", "fl", "tx"],
+        "val": [None, 2.0, 3.0],
+        "se": [0.1, None, 0.3],
+        "sample_size": [10.0, 20.0, None]
+      }, dtype=object)
+      signal_name = "test_no_missing_cols"
+      values.to_csv(source_receiving_dir + f'/20200419_state_{signal_name}.csv', index=False)
+
+      # upload CSVs
+      main(args)
+      response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')
+
+      expected_values = pd.concat([values, pd.DataFrame({
+        "time_value": [20200419] * 3,
+        "signal": [signal_name] * 3,
+        "direction": [None] * 3,
"missing_value": [Nans.OTHER] + [Nans.NOT_MISSING] * 2, + "missing_stderr": [Nans.NOT_MISSING, Nans.OTHER, Nans.NOT_MISSING], + "missing_sample_size": [Nans.NOT_MISSING] * 2 + [Nans.OTHER] + })], axis=1).rename(columns=uploader_column_rename).to_dict(orient="records") + expected_response = {'result': 1, 'epidata': self.apply_lag(expected_values), 'message': 'success'} + + self.assertEqual(response, expected_response) + self.verify_timestamps_and_defaults() + + self.tearDown() + self.setUp() + + + with self.subTest("Invalid, missing with an inf value"): + values = pd.DataFrame({ + "geo_id": ["tx"], + "val": [np.inf], + "se": [0.3], + "sample_size": [None], + "missing_value": [Nans.OTHER], + "missing_stderr": [Nans.NOT_MISSING], + "missing_sample_size": [Nans.NOT_MISSING] + }) + signal_name = "test_with_inf" + values.to_csv(source_receiving_dir + f'/20200419_state_{signal_name}.csv', index=False) + + # upload CSVs + main(args) + response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*') + + expected_response = {'result': -2, 'message': 'no results'} + + self.assertEqual(response, expected_response) + self.verify_timestamps_and_defaults() + self.tearDown() + self.setUp() + + + with self.subTest("Valid, missing with incorrect missing codes, fixed by acquisition"): + values = pd.DataFrame({ + "geo_id": ["tx"], + "val": [None], + "se": [0.3], + "sample_size": [30.0], + "missing_val": [Nans.NOT_MISSING], + "missing_se": [Nans.NOT_MISSING], + "missing_sample_size": [Nans.OTHER] + }).replace({np.nan:None}) + signal_name = "test_incorrect_missing_codes" + values.to_csv(source_receiving_dir + f'/20200419_state_{signal_name}.csv', index=False) + + # upload CSVs + main(args) + response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*') + + expected_values_df = pd.concat([values, pd.DataFrame({ + "time_value": [20200419], + "signal": [signal_name], + "direction": [None]})], axis=1).rename(columns=uploader_column_rename) + expected_values_df["missing_value"].iloc[0] = Nans.OTHER + expected_values_df["missing_sample_size"].iloc[0] = Nans.NOT_MISSING + expected_values = expected_values_df.to_dict(orient="records") + expected_response = {'result': 1, 'epidata': self.apply_lag(expected_values), 'message': 'success'} + + self.assertEqual(response, expected_response) + self.verify_timestamps_and_defaults() + + self.tearDown() + self.setUp() + + + with self.subTest("Valid wip"): + values = pd.DataFrame({ + "geo_id": ["me", "nd", "wa"], + "val": [10.0, 20.0, 30.0], + "se": [0.01, 0.02, 0.03], + "sample_size": [100.0, 200.0, 300.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3 + }) + signal_name = "wip_prototype" + values.to_csv(source_receiving_dir + f'/20200419_state_{signal_name}.csv', index=False) + + # upload CSVs + main(args) + response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*') + + expected_values = pd.concat([values, pd.DataFrame({ + "time_value": [20200419] * 3, + "signal": [signal_name] * 3, + "direction": [None] * 3 + })], axis=1).rename(columns=uploader_column_rename).to_dict(orient="records") + expected_response = {'result': 1, 'epidata': self.apply_lag(expected_values), 'message': 'success'} + + self.assertEqual(response, expected_response) + self.verify_timestamps_and_defaults() + + # Verify that files were archived + path = data_dir + f'/archive/successful/src-name/20200419_state_wip_prototype.csv.gz' + 
+      self.assertIsNotNone(os.stat(path))
+
+      self.tearDown()
+      self.setUp()
 
-    expected_issue_day=date.today()
-    expected_issue=expected_issue_day.strftime("%Y%m%d")
-    def apply_lag(expected_epidata):
-      for dct in expected_epidata:
-        dct['issue'] = int(expected_issue)
-        time_value_day = date(year=dct['time_value'] // 10000,
-                              month=dct['time_value'] % 10000 // 100,
-                              day= dct['time_value'] % 100)
-        expected_lag = (expected_issue_day - time_value_day).days
-        dct['lag'] = expected_lag
-      return expected_epidata
-
-    # verify data matches the CSV
-    # NB these are ordered by geo_value
-    self.assertEqual(response, {
-      'result': 1,
-      'epidata': apply_lag([
-        {
-          'time_value': 20200419,
-          'geo_value': 'ca',
-          'value': 1,
-          'stderr': 0.1,
-          'sample_size': 10,
-          'direction': None,
-          'signal': 'test',
-          'missing_value': Nans.NOT_MISSING,
-          'missing_stderr': Nans.NOT_MISSING,
-          'missing_sample_size': Nans.NOT_MISSING,
-        },
-        {
-          'time_value': 20200419,
-          'geo_value': 'fl',
-          'value': 3,
-          'stderr': 0.3,
-          'sample_size': 30,
-          'direction': None,
-          'signal': 'test',
-          'missing_value': Nans.NOT_MISSING,
-          'missing_stderr': Nans.NOT_MISSING,
-          'missing_sample_size': Nans.NOT_MISSING,
-        },
-        {
-          'time_value': 20200419,
-          'geo_value': 'tx',
-          'value': 2,
-          'stderr': 0.2,
-          'sample_size': 20,
-          'direction': None,
-          'signal': 'test',
-          'missing_value': Nans.NOT_MISSING,
-          'missing_stderr': Nans.NOT_MISSING,
-          'missing_sample_size': Nans.NOT_MISSING,
-        },
-      ]),
-      'message': 'success',
-    })
-
-    # request CSV data from the API on the test with missing values
-    response = Epidata.covidcast(
-      'src-name', 'test_no_missing', 'day', 'state', 20200419, '*')
-
-    # verify data matches the CSV
-    # NB these are ordered by geo_value
-    self.assertEqual(response, {
-      'result': 1,
-      'epidata': apply_lag([
-        {
-          'time_value': 20200419,
-          'geo_value': 'ca',
-          'value': 1,
-          'stderr': 0.1,
-          'sample_size': 10,
-          'direction': None,
-          'signal': 'test_no_missing',
-          'missing_value': Nans.NOT_MISSING,
-          'missing_stderr': Nans.NOT_MISSING,
-          'missing_sample_size': Nans.NOT_MISSING,
-        },
-        {
-          'time_value': 20200419,
-          'geo_value': 'tx',
-          'value': None,
-          'stderr': 0.2,
-          'sample_size': 20,
-          'direction': None,
-          'signal': 'test_no_missing',
-          'missing_value': Nans.OTHER,
-          'missing_stderr': Nans.NOT_MISSING,
-          'missing_sample_size': Nans.NOT_MISSING,
-        },
-        {
-          'time_value': 20200419,
-          'geo_value': 'wa',
-          'value': 3,
-          'stderr': 0.3,
-          'sample_size': 30,
-          'direction': None,
-          'signal': 'test_no_missing',
-          'missing_value': Nans.NOT_MISSING,
-          'missing_stderr': Nans.NOT_MISSING,
-          'missing_sample_size': Nans.NOT_MISSING,
-        },
-      ]),
-      'message': 'success',
-    })
-
-    # invalid missing files
-    response = Epidata.covidcast(
-      'src-name', 'test_missing1', 'day', 'state', 20200419, '*')
-    self.assertEqual(response, {
-      'result': -2,
-      'message': 'no results',
-    })
-    response = Epidata.covidcast(
-      'src-name', 'test_missing2', 'day', 'state', 20200419, '*')
-    self.assertEqual(response, {
-      'result': -2,
-      'message': 'no results',
-    })
-    response = Epidata.covidcast(
-      'src-name', 'test_missing3', 'day', 'state', 20200419, '*')
-    self.assertEqual(response, {
-      'result': -2,
-      'message': 'no results',
-    })
-
-    # request CSV data from the API on WIP signal
-    response = Epidata.covidcast(
-      'src-name', 'wip_prototype', 'day', 'state', 20200419, '*')
-
-    # verify data matches the CSV
-    # NB these are ordered by geo_value
-    self.assertEqual(response, {
-      'result': 1,
-      'epidata': apply_lag([
-        {
-          'time_value': 20200419,
-          'geo_value': 'me',
-          'value': 10,
-          'stderr': 0.01,
-          'sample_size': 100,
-          'direction': None,
-          'signal': 'wip_prototype',
-          'missing_value': Nans.NOT_MISSING,
-          'missing_stderr': Nans.NOT_MISSING,
-          'missing_sample_size': Nans.NOT_MISSING,
-        },
-        {
-          'time_value': 20200419,
-          'geo_value': 'nd',
-          'value': 20,
-          'stderr': 0.02,
-          'sample_size': 200,
-          'direction': None,
-          'signal': 'wip_prototype',
-          'missing_value': Nans.NOT_MISSING,
-          'missing_stderr': Nans.NOT_MISSING,
-          'missing_sample_size': Nans.NOT_MISSING,
-        },
-        {
-          'time_value': 20200419,
-          'geo_value': 'wa',
-          'value': 30,
-          'stderr': 0.03,
-          'sample_size': 300,
-          'direction': None,
-          'signal': 'wip_prototype',
-          'missing_value': Nans.NOT_MISSING,
-          'missing_stderr': Nans.NOT_MISSING,
-          'missing_sample_size': Nans.NOT_MISSING,
-        },
-      ]),
-      'message': 'success',
-    })
-
-    # request CSV data from the API on the signal with name length 320)

diff --git a/tests/acquisition/covidcast/test_csv_importer.py b/tests/acquisition/covidcast/test_csv_importer.py
--- a/tests/acquisition/covidcast/test_csv_importer.py
+++ b/tests/acquisition/covidcast/test_csv_importer.py
@@ -159,22 +159,22 @@ def test_extract_and_check_row(self):
     def make_row(
         geo_type='state',
         geo_id='vi',
-        val='1.23',
-        se='4.56',
+        value='1.23',
+        stderr='4.56',
         sample_size='100.5',
-        missing_val=Nans.NOT_MISSING,
-        missing_se=Nans.NOT_MISSING,
-        missing_sample_size=Nans.NOT_MISSING):
+        missing_value=str(float(Nans.NOT_MISSING)),
+        missing_stderr=str(float(Nans.NOT_MISSING)),
+        missing_sample_size=str(float(Nans.NOT_MISSING))):
       row = MagicMock(
           geo_id=geo_id,
-          val=val,
-          se=se,
+          value=value,
+          stderr=stderr,
           sample_size=sample_size,
-          missing_val=missing_val,
-          missing_se=missing_se,
+          missing_value=missing_value,
+          missing_stderr=missing_stderr,
           missing_sample_size=missing_sample_size,
-          spec=["geo_id", "val", "se", "sample_size",
-                "missing_val", "missing_se", "missing_sample_size"])
+          spec=["geo_id", "value", "stderr", "sample_size",
+                "missing_value", "missing_stderr", "missing_sample_size"])
       return geo_type, row
 
     # cases to test each failure mode
@@ -190,22 +190,16 @@ def make_row(
       (make_row(geo_type='nation', geo_id='0000'), 'geo_id'),
       (make_row(geo_type='hhs', geo_id='0'), 'geo_id'),
       (make_row(geo_type='province', geo_id='ab'), 'geo_type'),
-      (make_row(se='-1'), 'se'),
+      (make_row(stderr='-1'), 'stderr'),
       (make_row(geo_type=None), 'geo_type'),
       (make_row(geo_id=None), 'geo_id'),
-      (make_row(val='inf'), 'val'),
-      (make_row(se='inf'), 'se'),
+      (make_row(value='inf'), 'value'),
+      (make_row(stderr='inf'), 'stderr'),
       (make_row(sample_size='inf'), 'sample_size'),
       (make_row(geo_type='hrr', geo_id='hrr001'), 'geo_id'),
-      (make_row(val='val'), 'val'),
-      (make_row(se='se'), 'se'),
+      (make_row(value='value'), 'value'),
+      (make_row(stderr='stderr'), 'stderr'),
       (make_row(sample_size='sample_size'), 'sample_size'),
-      (make_row(missing_val='missing_val'), 'missing_val'),
-      (make_row(missing_se='missing_val'), 'missing_se'),
-      (make_row(missing_sample_size='missing_val'), 'missing_sample_size'),
-      (make_row(val='1.2', missing_val=Nans.OTHER), 'missing_val'),
-      (make_row(se='1.2', missing_se=Nans.OTHER), 'missing_se'),
-      (make_row(sample_size='1.2', missing_sample_size=Nans.OTHER), 'missing_sample_size')
     ]
 
     for ((geo_type, row), field) in failure_cases:
@@ -213,30 +207,21 @@ def make_row(
       self.assertIsNone(values)
       self.assertEqual(error, field)
 
-    # a nominal case without missing values
-    geo_type, row = make_row()
-    values, error = CsvImporter.extract_and_check_row(row, geo_type)
-
-    self.assertIsInstance(values, CsvImporter.RowValues)
-    self.assertEqual(str(values.geo_value), row.geo_id)
-    self.assertEqual(str(values.value), row.val)
-    self.assertEqual(str(values.stderr), row.se)
-    self.assertEqual(str(values.sample_size), row.sample_size)
-    self.assertIsNone(error)
-
-    # a nominal case with missing values
-    geo_type, row = make_row(
-      se='', sample_size='NA',
-      missing_se=Nans.OTHER, missing_sample_size=Nans.OTHER
-    )
-    values, error = CsvImporter.extract_and_check_row(row, geo_type)
-
-    self.assertIsInstance(values, CsvImporter.RowValues)
-    self.assertEqual(str(values.geo_value), row.geo_id)
-    self.assertEqual(str(values.value), row.val)
-    self.assertIsNone(values.stderr)
-    self.assertIsNone(values.sample_size)
-    self.assertIsNone(error)
+    success_cases = [
+      (make_row(), CsvImporter.RowValues('vi', 1.23, 4.56, 100.5, Nans.NOT_MISSING, Nans.NOT_MISSING, Nans.NOT_MISSING)),
+      (make_row(value=None, stderr=np.nan, sample_size='', missing_value=str(float(Nans.DELETED)), missing_stderr=str(float(Nans.DELETED)), missing_sample_size=str(float(Nans.DELETED))), CsvImporter.RowValues('vi', None, None, None, Nans.DELETED, Nans.DELETED, Nans.DELETED)),
+      (make_row(stderr='', sample_size='NA', missing_stderr=str(float(Nans.OTHER)), missing_sample_size=str(float(Nans.OTHER))), CsvImporter.RowValues('vi', 1.23, None, None, Nans.NOT_MISSING, Nans.OTHER, Nans.OTHER)),
+      (make_row(sample_size=None, missing_value='missing_value', missing_stderr=str(float(Nans.OTHER)), missing_sample_size=str(float(Nans.NOT_MISSING))), CsvImporter.RowValues('vi', 1.23, 4.56, None, Nans.NOT_MISSING, Nans.NOT_MISSING, Nans.OTHER)),
+    ]
+
+    for ((geo_type, row), expected_row) in success_cases:
+      values, error = CsvImporter.extract_and_check_row(row, geo_type)
+      self.assertIsNone(error)
+      self.assertIsInstance(values, CsvImporter.RowValues)
+      self.assertEqual(values.geo_value, expected_row.geo_value)
+      self.assertEqual(values.value, expected_row.value)
+      self.assertEqual(values.stderr, expected_row.stderr)
+      self.assertEqual(values.sample_size, expected_row.sample_size)
+      # the missing codes are part of the expectation too: several cases above
+      # exercise acquisition's correction of inconsistent codes
+      self.assertEqual(values.missing_value, expected_row.missing_value)
+      self.assertEqual(values.missing_stderr, expected_row.missing_stderr)
+      self.assertEqual(values.missing_sample_size, expected_row.missing_sample_size)
 
   def test_load_csv_with_invalid_header(self):
     """Bail loading a CSV when the header is invalid."""
@@ -291,16 +276,14 @@ def test_load_csv_with_valid_header(self):
 
       self.assertIsNone(rows[3])
 
-      # now with missing values! the last missing_sample_size
-      # contains an error code while data is available, which
-      # should give an error
+      # now with missing values!
       data = {
         'geo_id': ['ca', 'tx', 'fl', 'ak'],
         'val': [np.nan, '1.2', '1.3', '1.4'],
        'se': ['2.1', "na", '2.3', '2.4'],
        'sample_size': ['301', '302', None, '304'],
-        'missing_val': [Nans.NOT_APPLICABLE] + [Nans.NOT_MISSING] * 3,
-        'missing_se': [Nans.NOT_MISSING, Nans.REGION_EXCEPTION, Nans.NOT_MISSING, Nans.NOT_MISSING],
+        'missing_value': [Nans.NOT_APPLICABLE] + [Nans.NOT_MISSING] * 3,
+        'missing_stderr': [Nans.NOT_MISSING, Nans.REGION_EXCEPTION, Nans.NOT_MISSING, Nans.NOT_MISSING],
         'missing_sample_size': [Nans.NOT_MISSING] * 2 + [Nans.REGION_EXCEPTION] * 2
       }
       mock_pandas = MagicMock()
@@ -338,4 +321,10 @@ def test_load_csv_with_valid_header(self):
       self.assertEqual(rows[2].missing_stderr, Nans.NOT_MISSING)
       self.assertEqual(rows[2].missing_sample_size, Nans.REGION_EXCEPTION)
 
-      self.assertIsNone(rows[3])
+      self.assertEqual(rows[3].geo_value, 'ak')
+      self.assertEqual(rows[3].value, 1.4)
+      self.assertEqual(rows[3].stderr, 2.4)
+      self.assertEqual(rows[3].sample_size, 304)
+      self.assertEqual(rows[3].missing_value, Nans.NOT_MISSING)
+      self.assertEqual(rows[3].missing_stderr, Nans.NOT_MISSING)
+      self.assertEqual(rows[3].missing_sample_size, Nans.NOT_MISSING)
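
The lag arithmetic that the new `apply_lag` helper introduces is easy to sanity-check on its own. Below is a minimal standalone sketch of the same computation (the `lag_for` helper name is hypothetical; the test helper above applies this inline to each API response dict):

    from datetime import date

    def lag_for(time_value: int, issue_day: date) -> int:
        """Days between a YYYYMMDD-encoded time_value and its issue date."""
        time_value_day = date(year=time_value // 10000,
                              month=time_value % 10000 // 100,
                              day=time_value % 100)
        return (issue_day - time_value_day).days

    # a row for time_value 20200419 issued on 2020-04-21 carries lag 2
    assert lag_for(20200419, date(2020, 4, 21)) == 2

Because the tests derive `issue` from `date.today()`, the expected lag grows with the gap between the fixed time_value (2020-04-19) and the day the test runs, which is exactly what `apply_lag` stamps onto each expected row.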