From c9d7045031a724150e35a916402eed5925a624dd Mon Sep 17 00:00:00 2001
From: Dmitry Shemetov
Date: Mon, 4 Oct 2021 12:47:33 -0700
Subject: [PATCH 1/4] Hotfix acquisition: fix dtypes of missing columns

* improve test coverage for this case
---
 src/acquisition/covidcast/csv_importer.py |  2 +-
 .../covidcast/test_csv_importer.py        | 52 ++++++++-----------
 2 files changed, 22 insertions(+), 32 deletions(-)

diff --git a/src/acquisition/covidcast/csv_importer.py b/src/acquisition/covidcast/csv_importer.py
index f55327d85..156474290 100644
--- a/src/acquisition/covidcast/csv_importer.py
+++ b/src/acquisition/covidcast/csv_importer.py
@@ -224,7 +224,7 @@ def validate_missing_code(row, attr_quantity, attr_name):
     if hasattr(row, "missing_" + attr_name):
       missing_entry = getattr(row, "missing_" + attr_name)
       try:
-        missing_entry = int(missing_entry)
+        missing_entry = int(float(missing_entry))  # convert from string to float to int
       except ValueError:
         return None
     # A missing code should never contradict the quantity being present,

diff --git a/tests/acquisition/covidcast/test_csv_importer.py b/tests/acquisition/covidcast/test_csv_importer.py
index b82dec1ce..6ecf292e6 100644
--- a/tests/acquisition/covidcast/test_csv_importer.py
+++ b/tests/acquisition/covidcast/test_csv_importer.py
@@ -59,7 +59,7 @@ def test_find_issue_specific_csv_files(self,os_isdir_mock):
       issuedir_match= CsvImporter.PATTERN_ISSUE_DIR.match(path_prefix.lower())
       issue_date_value = int(issuedir_match.group(2))
       self.assertTrue(CsvImporter.is_sane_day(issue_date_value))
-      
+
       found = set(CsvImporter.find_issue_specific_csv_files(path_prefix, glob=mock_glob))
       self.assertTrue(len(found)>0)
 
@@ -162,9 +162,9 @@ def make_row(
         val='1.23',
         se='4.56',
         sample_size='100.5',
-        missing_val=Nans.NOT_MISSING,
-        missing_se=Nans.NOT_MISSING,
-        missing_sample_size=Nans.NOT_MISSING):
+        missing_val=str(float(Nans.NOT_MISSING)),
+        missing_se=str(float(Nans.NOT_MISSING)),
+        missing_sample_size=str(float(Nans.NOT_MISSING))):
       row = MagicMock(
         geo_id=geo_id,
         val=val,
@@ -203,9 +203,9 @@ def make_row(
       (make_row(missing_val='missing_val'), 'missing_val'),
       (make_row(missing_se='missing_val'), 'missing_se'),
       (make_row(missing_sample_size='missing_val'), 'missing_sample_size'),
-      (make_row(val='1.2', missing_val=Nans.OTHER), 'missing_val'),
-      (make_row(se='1.2', missing_se=Nans.OTHER), 'missing_se'),
-      (make_row(sample_size='1.2', missing_sample_size=Nans.OTHER), 'missing_sample_size')
+      (make_row(val='1.2', missing_val=str(float(Nans.OTHER))), 'missing_val'),
+      (make_row(se='1.2', missing_se=str(float(Nans.OTHER))), 'missing_se'),
+      (make_row(sample_size='1.2', missing_sample_size=str(float(Nans.OTHER))), 'missing_sample_size'),
     ]
 
     for ((geo_type, row), field) in failure_cases:
@@ -213,30 +213,20 @@ def make_row(
       self.assertIsNone(values)
       self.assertEqual(error, field)
 
-    # a nominal case without missing values
-    geo_type, row = make_row()
-    values, error = CsvImporter.extract_and_check_row(row, geo_type)
-
-    self.assertIsInstance(values, CsvImporter.RowValues)
-    self.assertEqual(str(values.geo_value), row.geo_id)
-    self.assertEqual(str(values.value), row.val)
-    self.assertEqual(str(values.stderr), row.se)
-    self.assertEqual(str(values.sample_size), row.sample_size)
-    self.assertIsNone(error)
-
-    # a nominal case with missing values
-    geo_type, row = make_row(
-      se='', sample_size='NA',
-      missing_se=Nans.OTHER, missing_sample_size=Nans.OTHER
-    )
-    values, error = CsvImporter.extract_and_check_row(row, geo_type)
-
-    self.assertIsInstance(values, CsvImporter.RowValues)
-    self.assertEqual(str(values.geo_value), row.geo_id)
-    self.assertEqual(str(values.value), row.val)
-    self.assertIsNone(values.stderr)
-    self.assertIsNone(values.sample_size)
-    self.assertIsNone(error)
+    success_cases = [
+      (make_row(), CsvImporter.RowValues('vi', 1.23, 4.56, 100.5, Nans.NOT_MISSING, Nans.NOT_MISSING, Nans.NOT_MISSING)),
+      (make_row(geo_type='county', geo_id='17000', val=np.nan, se=np.nan, sample_size=np.nan, missing_val=str(float(Nans.DELETED)), missing_se=str(float(Nans.DELETED)), missing_sample_size=str(float(Nans.DELETED))), CsvImporter.RowValues('17000', None, None, None, Nans.DELETED, Nans.DELETED, Nans.DELETED)),
+      (make_row(se='', sample_size='NA', missing_se=str(float(Nans.OTHER)), missing_sample_size=str(float(Nans.OTHER))), CsvImporter.RowValues('vi', 1.23, None, None, Nans.NOT_MISSING, Nans.OTHER, Nans.OTHER))
+    ]
+
+    for ((geo_type, row), field) in success_cases:
+      values, error = CsvImporter.extract_and_check_row(row, geo_type)
+      self.assertIsNone(error)
+      self.assertIsInstance(values, CsvImporter.RowValues)
+      self.assertEqual(values.geo_value, field.geo_value)
+      self.assertEqual(values.value, field.value)
+      self.assertEqual(values.stderr, field.stderr)
+      self.assertEqual(values.sample_size, field.sample_size)
 
   def test_load_csv_with_invalid_header(self):
     """Bail loading a CSV when the header is invalid."""
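Why the one-line `int(float(...))` change matters: the missing_* columns are read as floats, so their values reach validate_missing_code rendered as strings like "4.0", and Python's int() rejects a decimal string outright. A minimal standalone REPL sketch (the value is hypothetical):

    >>> int("4.0")
    Traceback (most recent call last):
      ...
    ValueError: invalid literal for int() with base 10: '4.0'
    >>> int(float("4.0"))
    4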
From cecb6c950e2e2f189d4b901557d9c54833eb85e6 Mon Sep 17 00:00:00 2001
From: Dmitry Shemetov
Date: Mon, 4 Oct 2021 12:52:28 -0700
Subject: [PATCH 2/4] Hotfix acquisition: improve test coverage

---
 tests/acquisition/covidcast/test_csv_importer.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/acquisition/covidcast/test_csv_importer.py b/tests/acquisition/covidcast/test_csv_importer.py
index 6ecf292e6..4b87cf68b 100644
--- a/tests/acquisition/covidcast/test_csv_importer.py
+++ b/tests/acquisition/covidcast/test_csv_importer.py
@@ -215,7 +215,7 @@ def make_row(
 
     success_cases = [
       (make_row(), CsvImporter.RowValues('vi', 1.23, 4.56, 100.5, Nans.NOT_MISSING, Nans.NOT_MISSING, Nans.NOT_MISSING)),
-      (make_row(geo_type='county', geo_id='17000', val=np.nan, se=np.nan, sample_size=np.nan, missing_val=str(float(Nans.DELETED)), missing_se=str(float(Nans.DELETED)), missing_sample_size=str(float(Nans.DELETED))), CsvImporter.RowValues('17000', None, None, None, Nans.DELETED, Nans.DELETED, Nans.DELETED)),
+      (make_row(val=None, se=np.nan, sample_size='', missing_val=str(float(Nans.DELETED)), missing_se=str(float(Nans.DELETED)), missing_sample_size=str(float(Nans.DELETED))), CsvImporter.RowValues('vi', None, None, None, Nans.DELETED, Nans.DELETED, Nans.DELETED)),
       (make_row(se='', sample_size='NA', missing_se=str(float(Nans.OTHER)), missing_sample_size=str(float(Nans.OTHER))), CsvImporter.RowValues('vi', 1.23, None, None, Nans.NOT_MISSING, Nans.OTHER, Nans.OTHER))
     ]
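The case swapped in above feeds three spellings of a missing field (None, np.nan, and the empty string) through the same DELETED expectation. That works because, given the float dtypes acquisition uses when reading these columns, pandas parses all three back from CSV as NaN. A standalone sketch illustrating this, not part of the patch:

    >>> import io
    >>> import numpy as np
    >>> import pandas as pd
    >>> csv = pd.DataFrame({"val": [None], "se": [np.nan], "sample_size": [""]}).to_csv(index=False)
    >>> pd.read_csv(io.StringIO(csv), dtype=float).isna().all().all()
    True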
From 5d9aee7ae2bb387913e4ab5e5423270aa01df4fa Mon Sep 17 00:00:00 2001
From: Dmitry Shemetov
Date: Mon, 4 Oct 2021 14:08:55 -0700
Subject: [PATCH 3/4] Acquisition logging:

* repair and log instead of rejecting files with invalid missing codes
* introduce dtype inference code in acquisition to see if further refactors
  are possible
* refactor test_csv_uploading for readability
---
 .../covidcast/test_csv_uploading.py       | 551 +++++++++---------
 src/acquisition/covidcast/csv_importer.py |  83 +--
 .../covidcast/test_csv_importer.py        |  57 +-
 3 files changed, 333 insertions(+), 358 deletions(-)

diff --git a/integrations/acquisition/covidcast/test_csv_uploading.py b/integrations/acquisition/covidcast/test_csv_uploading.py
index 397dc0a73..bb65e10d8 100644
--- a/integrations/acquisition/covidcast/test_csv_uploading.py
+++ b/integrations/acquisition/covidcast/test_csv_uploading.py
@@ -8,6 +8,8 @@
 
 # third party
 import mysql.connector
+import pandas as pd
+import numpy as np
 
 # first party
 from delphi_utils import Nans
@@ -52,6 +54,26 @@ def tearDown(self):
     self.cur.close()
     self.cnx.close()
 
+  @staticmethod
+  def apply_lag(expected_epidata):
+    expected_issue_day=date.today()
+    expected_issue=expected_issue_day.strftime("%Y%m%d")
+    for dct in expected_epidata:
+      dct['issue'] = int(expected_issue)
+      time_value_day = date(year=dct['time_value'] // 10000,
+                            month=dct['time_value'] % 10000 // 100,
+                            day= dct['time_value'] % 100)
+      expected_lag = (expected_issue_day - time_value_day).days
+      dct['lag'] = expected_lag
+    return expected_epidata
+
+  def verify_timestamps_and_defaults(self):
+    self.cur.execute('select value_updated_timestamp, direction_updated_timestamp, direction from covidcast')
+    for value_updated_timestamp, direction_updated_timestamp, direction in self.cur:
+      self.assertGreater(value_updated_timestamp, 0)
+      self.assertEqual(direction_updated_timestamp, 0)
+      self.assertIsNone(direction)
+
   def test_uploading(self):
     """Scan, parse, upload, archive, serve, and fetch a covidcast signal."""
 
@@ -64,299 +86,250 @@ def test_uploading(self):
     log_file_directory = "/var/log/"
     os.makedirs(source_receiving_dir, exist_ok=True)
     os.makedirs(log_file_directory, exist_ok=True)
-
-    # valid
-    with open(source_receiving_dir + '/20200419_state_test.csv', 'w') as f:
-      f.write('geo_id,val,se,sample_size,missing_val,missing_se,missing_sample_size\n')
-      f.write(f'ca,1,0.1,10,{Nans.NOT_MISSING},{Nans.NOT_MISSING},{Nans.NOT_MISSING}\n')
-      f.write(f'tx,2,0.2,20,{Nans.NOT_MISSING},{Nans.NOT_MISSING},{Nans.NOT_MISSING}\n')
-      f.write(f'fl,3,0.3,30,{Nans.NOT_MISSING},{Nans.NOT_MISSING},{Nans.NOT_MISSING}\n')
-
-    # valid, old style no missing cols should have intelligent defaults
-    with open(source_receiving_dir + '/20200419_state_test_no_missing.csv', 'w') as f:
-      f.write('geo_id,val,se,sample_size\n')
-      f.write('ca,1,0.1,10\n')
-      f.write('tx,NA,0.2,20\n')
-      f.write('wa,3,0.3,30\n')
-
-    # invalid, missing with an inf value
-    with open(source_receiving_dir + '/20200419_state_test_missing1.csv', 'w') as f:
-      f.write('geo_id,val,se,sample_size,missing_val,missing_se,missing_sample_size\n')
-      f.write(f'fl,inf,0.3,30,{Nans.OTHER},{Nans.NOT_MISSING},{Nans.NOT_MISSING}\n')
-
-    # invalid, missing with an incorrect missing code
-    with open(source_receiving_dir + '/20200419_state_test_missing2.csv', 'w') as f:
-      f.write('geo_id,val,se,sample_size,missing_val,missing_se,missing_sample_size\n')
-      f.write(f'tx,NA,0.2,20,{Nans.NOT_MISSING},{Nans.NOT_MISSING},{Nans.NOT_MISSING}\n')
-
-    # invalid, no missing with an incorrect missing code
-    with open(source_receiving_dir + '/20200419_state_test_missing3.csv', 'w') as f:
-      f.write('geo_id,val,se,sample_size,missing_val,missing_se,missing_sample_size\n')
-      f.write(f'wa,3,0.3,30,{Nans.OTHER},{Nans.NOT_MISSING},{Nans.NOT_MISSING}\n')
-
-    # valid wip
-    with open(source_receiving_dir + '/20200419_state_wip_prototype.csv', 'w') as f:
-      f.write('geo_id,val,se,sample_size,missing_val,missing_se,missing_sample_size\n')
-      f.write(f'me,10,0.01,100,{Nans.NOT_MISSING},{Nans.NOT_MISSING},{Nans.NOT_MISSING}\n')
-      f.write(f'nd,20,0.02,200,{Nans.NOT_MISSING},{Nans.NOT_MISSING},{Nans.NOT_MISSING}\n')
-      f.write(f'wa,30,0.03,300,{Nans.NOT_MISSING},{Nans.NOT_MISSING},{Nans.NOT_MISSING}\n')
-
-    # invalid
-    with open(source_receiving_dir + '/20200419_state_wip_really_long_name_that_will_be_accepted.csv', 'w') as f:
-      f.write('geo_id,val,se,sample_size,missing_val,missing_se,missing_sample_size\n')
-      f.write(f'pa,100,5.4,624,{Nans.NOT_MISSING},{Nans.NOT_MISSING},{Nans.NOT_MISSING}\n')
-
-    # invalid
-    with open(source_receiving_dir + '/20200419_state_wip_really_long_name_that_will_get_truncated_lorem_ipsum_dolor_sit_amet.csv', 'w') as f:
-      f.write('geo_id,val,se,sample_size,missing_val,missing_se,missing_sample_size\n')
-      f.write(f'pa,100,5.4,624,{Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}\n')
-
-    # invalid
-    with open(source_receiving_dir + '/20200420_state_test.csv', 'w') as f:
-      f.write('this,header,is,wrong\n')
-
-    # invalid
-    with open(source_receiving_dir + '/hello.csv', 'w') as f:
-      f.write('file name is wrong\n')
-
-    # upload CSVs
     # TODO: use an actual argparse object for the args instead of a MagicMock
     args = MagicMock(
-        log_file=log_file_directory +
-        "output.log",
-        data_dir=data_dir,
-        is_wip_override=False,
-        not_wip_override=False,
-        specific_issue_date=False)
-    main(args)
+      log_file=log_file_directory +
+      "output.log",
+      data_dir=data_dir,
+      is_wip_override=False,
+      not_wip_override=False,
+      specific_issue_date=False)
+    uploader_column_rename = {"geo_id": "geo_value", "val": "value", "se": "stderr", "missing_val": "missing_value", "missing_se": "missing_stderr"}
+
+
+    with self.subTest("Valid CSV with correct missing columns"):
+      values = pd.DataFrame({
+        "geo_id": ["ca", "fl", "tx"],
+        "val": [1.0, 2.0, 3.0],
+        "se": [0.1, 0.2, 0.3],
+        "sample_size": [10.0, 20.0, 30.0],
+        "missing_val": [Nans.NOT_MISSING] * 3,
+        "missing_se": [Nans.NOT_MISSING] * 3,
+        "missing_sample_size": [Nans.NOT_MISSING] * 3
+      })
+      signal_name = "test"
+      values.to_csv(source_receiving_dir + f'/20200419_state_{signal_name}.csv', index=False)
+
+      # upload CSVs
+      main(args)
+      response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')
+
+      expected_values = pd.concat([values, pd.DataFrame({ "time_value": [20200419] * 3, "signal": [signal_name] * 3, "direction": [None] * 3})], axis=1).rename(columns=uploader_column_rename).to_dict(orient="records")
+      expected_response = {'result': 1, 'epidata': self.apply_lag(expected_values), 'message': 'success'}
+
+      self.assertEqual(response, expected_response)
+      self.verify_timestamps_and_defaults()
+
+      # Verify that files were archived
+      path = data_dir + f'/archive/successful/src-name/20200419_state_test.csv.gz'
+      self.assertIsNotNone(os.stat(path))
 
-    # request CSV data from the API
-    response = Epidata.covidcast(
-      'src-name', 'test', 'day', 'state', 20200419, '*')
+      self.tearDown()
+      self.setUp()
+
+
+    with self.subTest("Valid CSV with no missing columns should set intelligent defaults"):
+      values = pd.DataFrame({
+        "geo_id": ["ca", "fl", "tx"],
+        "val": [None, 2.0, 3.0],
+        "se": [0.1, None, 0.3],
+        "sample_size": [10.0, 20.0, None]
+      }, dtype=object)
+      signal_name = "test_no_missing_cols"
+      values.to_csv(source_receiving_dir + f'/20200419_state_{signal_name}.csv', index=False)
+
+      # upload CSVs
+      main(args)
+      response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')
+
+      expected_values = pd.concat([values, pd.DataFrame({
+        "time_value": [20200419] * 3,
+        "signal": [signal_name] * 3,
+        "direction": [None] * 3,
+        "missing_value": [Nans.OTHER] + [Nans.NOT_MISSING] * 2,
+        "missing_stderr": [Nans.NOT_MISSING, Nans.OTHER, Nans.NOT_MISSING],
+        "missing_sample_size": [Nans.NOT_MISSING] * 2 + [Nans.OTHER]
+      })], axis=1).rename(columns=uploader_column_rename).to_dict(orient="records")
+      expected_response = {'result': 1, 'epidata': self.apply_lag(expected_values), 'message': 'success'}
+
+      self.assertEqual(response, expected_response)
+      self.verify_timestamps_and_defaults()
+
+      self.tearDown()
+      self.setUp()
+
+
+    with self.subTest("Invalid, missing with an inf value"):
+      values = pd.DataFrame({
+        "geo_id": ["tx"],
+        "val": [np.inf],
+        "se": [0.3],
+        "sample_size": [None],
+        "missing_value": [Nans.OTHER],
+        "missing_stderr": [Nans.NOT_MISSING],
+        "missing_sample_size": [Nans.NOT_MISSING]
+      })
+      signal_name = "test_with_inf"
+      values.to_csv(source_receiving_dir + f'/20200419_state_{signal_name}.csv', index=False)
+
+      # upload CSVs
+      main(args)
+      response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')
+
+      expected_response = {'result': -2, 'message': 'no results'}
+
+      self.assertEqual(response, expected_response)
+      self.verify_timestamps_and_defaults()
+
+      self.tearDown()
+      self.setUp()
+
+
+    with self.subTest("Valid, missing with incorrect missing codes, fixed by acquisition"):
+      values = pd.DataFrame({
+        "geo_id": ["tx"],
+        "val": [None],
+        "se": [0.3],
+        "sample_size": [30.0],
+        "missing_val": [Nans.NOT_MISSING],
+        "missing_se": [Nans.NOT_MISSING],
+        "missing_sample_size": [Nans.OTHER]
+      }).replace({np.nan:None})
+      signal_name = "test_incorrect_missing_codes"
+      values.to_csv(source_receiving_dir + f'/20200419_state_{signal_name}.csv', index=False)
+
+      # upload CSVs
+      main(args)
+      response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')
+
+      expected_values_df = pd.concat([values, pd.DataFrame({
+        "time_value": [20200419],
+        "signal": [signal_name],
+        "direction": [None]})], axis=1).rename(columns=uploader_column_rename)
+      expected_values_df["missing_value"].iloc[0] = Nans.OTHER
+      expected_values_df["missing_sample_size"].iloc[0] = Nans.NOT_MISSING
+      expected_values = expected_values_df.to_dict(orient="records")
+      expected_response = {'result': 1, 'epidata': self.apply_lag(expected_values), 'message': 'success'}
+
+      self.assertEqual(response, expected_response)
+      self.verify_timestamps_and_defaults()
+
+      self.tearDown()
+      self.setUp()
+
+
+    with self.subTest("Valid wip"):
+      values = pd.DataFrame({
+        "geo_id": ["me", "nd", "wa"],
+        "val": [10.0, 20.0, 30.0],
+        "se": [0.01, 0.02, 0.03],
+        "sample_size": [100.0, 200.0, 300.0],
+        "missing_val": [Nans.NOT_MISSING] * 3,
+        "missing_se": [Nans.NOT_MISSING] * 3,
+        "missing_sample_size": [Nans.NOT_MISSING] * 3
+      })
+      signal_name = "wip_prototype"
+      values.to_csv(source_receiving_dir + f'/20200419_state_{signal_name}.csv', index=False)
+
+      # upload CSVs
+      main(args)
+      response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')
+
+      expected_values = pd.concat([values, pd.DataFrame({
+        "time_value": [20200419] * 3,
+        "signal": [signal_name] * 3,
+        "direction": [None] * 3
+      })], axis=1).rename(columns=uploader_column_rename).to_dict(orient="records")
+      expected_response = {'result': 1, 'epidata': self.apply_lag(expected_values), 'message': 'success'}
+
+      self.assertEqual(response, expected_response)
+      self.verify_timestamps_and_defaults()
+
+      # Verify that files were archived
+      path = data_dir + f'/archive/successful/src-name/20200419_state_wip_prototype.csv.gz'
+      self.assertIsNotNone(os.stat(path))
+
+      self.tearDown()
+      self.setUp()
 
-    expected_issue_day=date.today()
-    expected_issue=expected_issue_day.strftime("%Y%m%d")
-    def apply_lag(expected_epidata):
-      for dct in expected_epidata:
-        dct['issue'] = int(expected_issue)
-        time_value_day = date(year=dct['time_value'] // 10000,
-                              month=dct['time_value'] % 10000 // 100,
-                              day= dct['time_value'] % 100)
-        expected_lag = (expected_issue_day - time_value_day).days
-        dct['lag'] = expected_lag
-      return expected_epidata
-
-    # verify data matches the CSV
-    # NB these are ordered by geo_value
-    self.assertEqual(response, {
-      'result': 1,
-      'epidata': apply_lag([
-        {
-          'time_value': 20200419,
-          'geo_value': 'ca',
-          'value': 1,
-          'stderr': 0.1,
-          'sample_size': 10,
-          'direction': None,
-          'signal': 'test',
-          'missing_value': Nans.NOT_MISSING,
-          'missing_stderr': Nans.NOT_MISSING,
-          'missing_sample_size': Nans.NOT_MISSING,
-        },
-        {
-          'time_value': 20200419,
-          'geo_value': 'fl',
-          'value': 3,
-          'stderr': 0.3,
-          'sample_size': 30,
-          'direction': None,
-          'signal': 'test',
-          'missing_value': Nans.NOT_MISSING,
-          'missing_stderr': Nans.NOT_MISSING,
-          'missing_sample_size': Nans.NOT_MISSING,
-        },
-        {
-          'time_value': 20200419,
-          'geo_value': 'tx',
-          'value': 2,
-          'stderr': 0.2,
-          'sample_size': 20,
-          'direction': None,
-          'signal': 'test',
-          'missing_value': Nans.NOT_MISSING,
-          'missing_stderr': Nans.NOT_MISSING,
-          'missing_sample_size': Nans.NOT_MISSING,
-        },
-      ]),
-      'message': 'success',
-    })
-
-    # request CSV data from the API on the test with missing values
-    response = Epidata.covidcast(
-      'src-name', 'test_no_missing', 'day', 'state', 20200419, '*')
-
-    # verify data matches the CSV
-    # NB these are ordered by geo_value
-    self.assertEqual(response, {
-      'result': 1,
-      'epidata': apply_lag([
-        {
-          'time_value': 20200419,
-          'geo_value': 'ca',
-          'value': 1,
-          'stderr': 0.1,
-          'sample_size': 10,
-          'direction': None,
-          'signal': 'test_no_missing',
-          'missing_value': Nans.NOT_MISSING,
-          'missing_stderr': Nans.NOT_MISSING,
-          'missing_sample_size': Nans.NOT_MISSING,
-        },
-        {
-          'time_value': 20200419,
-          'geo_value': 'tx',
-          'value': None,
-          'stderr': 0.2,
-          'sample_size': 20,
-          'direction': None,
-          'signal': 'test_no_missing',
-          'missing_value': Nans.OTHER,
-          'missing_stderr': Nans.NOT_MISSING,
-          'missing_sample_size': Nans.NOT_MISSING,
-        },
-        {
-          'time_value': 20200419,
-          'geo_value': 'wa',
-          'value': 3,
-          'stderr': 0.3,
-          'sample_size': 30,
-          'direction': None,
-          'signal': 'test_no_missing',
-          'missing_value': Nans.NOT_MISSING,
-          'missing_stderr': Nans.NOT_MISSING,
-          'missing_sample_size': Nans.NOT_MISSING,
-        },
-      ]),
-      'message': 'success',
-    })
-
-    # invalid missing files
-    response = Epidata.covidcast(
-      'src-name', 'test_missing1', 'day', 'state', 20200419, '*')
-    self.assertEqual(response, {
-      'result': -2,
-      'message': 'no results',
-    })
-    response = Epidata.covidcast(
-      'src-name', 'test_missing2', 'day', 'state', 20200419, '*')
-    self.assertEqual(response, {
-      'result': -2,
-      'message': 'no results',
-    })
-    response = Epidata.covidcast(
-      'src-name', 'test_missing3', 'day', 'state', 20200419, '*')
-    self.assertEqual(response, {
-      'result': -2,
-      'message': 'no results',
-    })
-
-    # request CSV data from the API on WIP signal
-    response = Epidata.covidcast(
-      'src-name', 'wip_prototype', 'day', 'state', 20200419, '*')
-
-    # verify data matches the CSV
-    # NB these are ordered by geo_value
-    self.assertEqual(response, {
-      'result': 1,
-      'epidata': apply_lag([
-        {
-          'time_value': 20200419,
-          'geo_value': 'me',
-          'value': 10,
-          'stderr': 0.01,
-          'sample_size': 100,
-          'direction': None,
-          'signal': 'wip_prototype',
-          'missing_value': Nans.NOT_MISSING,
-          'missing_stderr': Nans.NOT_MISSING,
-          'missing_sample_size': Nans.NOT_MISSING,
-        },
-        {
-          'time_value': 20200419,
-          'geo_value': 'nd',
-          'value': 20,
-          'stderr': 0.02,
-          'sample_size': 200,
-          'direction': None,
-          'signal': 'wip_prototype',
-          'missing_value': Nans.NOT_MISSING,
-          'missing_stderr': Nans.NOT_MISSING,
-          'missing_sample_size': Nans.NOT_MISSING,
-        },
-        {
-          'time_value': 20200419,
-          'geo_value': 'wa',
-          'value': 30,
-          'stderr': 0.03,
-          'sample_size': 300,
-          'direction': None,
-          'signal': 'wip_prototype',
-          'missing_value': Nans.NOT_MISSING,
-          'missing_stderr': Nans.NOT_MISSING,
-          'missing_sample_size': Nans.NOT_MISSING,
-        },
-      ]),
-      'message': 'success',
-    })
-
-    # request CSV data from the API on the signal with name length 32

Date: Tue, 5 Oct 2021 13:28:39 -0400
Subject: [PATCH 4/4] Pull dtype into a constant, read ints if you can, and
 lean on floaty_int if you can't.
---
 src/acquisition/covidcast/csv_importer.py | 21 ++++++++++++++-------
 1 file changed, 14 insertions(+), 7 deletions(-)

diff --git a/src/acquisition/covidcast/csv_importer.py b/src/acquisition/covidcast/csv_importer.py
index db06c71b6..05669cb82 100644
--- a/src/acquisition/covidcast/csv_importer.py
+++ b/src/acquisition/covidcast/csv_importer.py
@@ -38,6 +38,16 @@ class CsvImporter:
   MIN_YEAR = 2019
   MAX_YEAR = 2030
 
+  DTYPES = {
+    "geo_id": str,
+    "val": float,
+    "se": float,
+    "sample_size": float,
+    "missing_val": int,
+    "missing_se": int,
+    "missing_sample_size": int
+  }
+
   # NOTE: this should be a Python 3.7+ `dataclass`, but the server is on 3.4
   # See https://docs.python.org/3/library/dataclasses.html
   class RowValues:
@@ -183,10 +193,9 @@ def floaty_int(value):
     """
 
     float_value = float(value)
-    int_value = round(float_value)
-    if float_value != int_value:
+    if not float_value.is_integer():
       raise ValueError('not an int: "%s"' % str(value))
-    return int_value
+    return int(float_value)
 
   @staticmethod
   def maybe_apply(func, quantity):
@@ -341,12 +350,10 @@ def load_csv(filepath, geo_type, pandas=pandas):
     logger = get_structured_logger('load_csv')
 
     try:
-      dtypes = {"geo_id": str, "val": float, "se": float, "sample_size": float, "missing_val": float, "missing_se": float, "missing_sample_size": float}
-      table = pandas.read_csv(filepath, dtype=dtypes)
+      table = pandas.read_csv(filepath, dtype=CsvImporter.DTYPES)
     except ValueError as e:
       logger.warning(event='Failed to open CSV with specified dtypes, switching to str', detail=str(e), file=filepath)
-      dtypes = {"geo_id": str, "val": str, "se": str, "sample_size": str, "missing_val": float, "missing_se": float, "missing_sample_size": float}
-      table = pandas.read_csv(filepath, dtype=dtypes)
+      table = pandas.read_csv(filepath, dtype='str')
 
     if not CsvImporter.is_header_valid(table.columns):
       logger.warning(event='invalid header', detail=table.columns, file=filepath)
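For reference, the revised floaty_int above accepts integer-valued floats in either spelling while still rejecting true fractions. A REPL sketch of the helper as written in the final hunk (the example inputs are hypothetical):

    >>> def floaty_int(value):
    ...   float_value = float(value)
    ...   if not float_value.is_integer():
    ...     raise ValueError('not an int: "%s"' % str(value))
    ...   return int(float_value)
    ...
    >>> floaty_int('4'), floaty_int('4.0')
    (4, 4)
    >>> floaty_int('4.5')
    Traceback (most recent call last):
      ...
    ValueError: not an int: "4.5"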