From bd7f285168739b6e550b392e378565e41a09a0c6 Mon Sep 17 00:00:00 2001 From: Xavier Xia Date: Sun, 24 Apr 2022 21:52:32 -0400 Subject: [PATCH 01/30] added integrations test for database for is_latest_issue added edge case added prettytable for better vizuals all tests passed --- dev/local/output.txt | 0 .../covidcast/test_covidcast_meta_caching.py | 7 +- .../covidcast/test_is_latest_issue.py | 346 ++++++++++++++++++ requirements.txt | 2 + 4 files changed, 354 insertions(+), 1 deletion(-) create mode 100644 dev/local/output.txt create mode 100644 integrations/acquisition/covidcast/test_is_latest_issue.py diff --git a/dev/local/output.txt b/dev/local/output.txt new file mode 100644 index 000000000..e69de29bb diff --git a/integrations/acquisition/covidcast/test_covidcast_meta_caching.py b/integrations/acquisition/covidcast/test_covidcast_meta_caching.py index c18363c03..fe7007118 100644 --- a/integrations/acquisition/covidcast/test_covidcast_meta_caching.py +++ b/integrations/acquisition/covidcast/test_covidcast_meta_caching.py @@ -26,7 +26,7 @@ class CovidcastMetaCacheTests(unittest.TestCase): - """Tests covidcast metadata caching.""" + """Tests covidcast metadata caching. """ def setUp(self): """Perform per-test setup.""" @@ -94,7 +94,12 @@ def test_caching(self): # make sure the live utility is serving something sensible cvc_database = live.Database() cvc_database.connect() +<<<<<<< HEAD epidata1 = cvc_database.compute_covidcast_meta() +======= + epidata1 = cvc_database.compute_covidcast_meta() + #tests the regular interval (runs daily, computes for each source and signal and stores into epiddata1 +>>>>>>> 4ec6431a (added integrations test for database for is_latest_issue) cvc_database.disconnect(False) self.assertEqual(len(epidata1),1) self.assertEqual(epidata1, [ diff --git a/integrations/acquisition/covidcast/test_is_latest_issue.py b/integrations/acquisition/covidcast/test_is_latest_issue.py new file mode 100644 index 000000000..757c451d1 --- /dev/null +++ b/integrations/acquisition/covidcast/test_is_latest_issue.py @@ -0,0 +1,346 @@ +"""Integration tests for covidcast's is_latest_issue boolean.""" +# standard library +import unittest +import time +from unittest.mock import patch, MagicMock +from json import JSONDecodeError +import numpy as np +from math import ceil + +from queue import Queue, Empty +import threading +from multiprocessing import cpu_count + + +# third party +from aiohttp.client_exceptions import ClientResponseError +import mysql.connector +import pytest +from prettytable import PrettyTable, from_db_cursor + +# first party +from delphi.epidata.acquisition.covidcast.logger import get_structured_logger +from delphi_utils import Nans +from delphi.epidata.client.delphi_epidata import Epidata +from delphi.epidata.acquisition.covidcast.database import Database, CovidcastRow +from delphi.epidata.acquisition.covidcast.covidcast_meta_cache_updater import main as update_covidcast_meta_cache +import delphi.operations.secrets as secrets + +# py3tester coverage target (equivalent to `import *`) +# +__test_target__ = 'delphi.epidata.acquisition.covidcast.database' + + +# use the local instance of the Epidata API +BASE_URL = 'http://delphi_web_epidata/epidata/api.php' + +nmv = Nans.NOT_MISSING.value +class CovidcastLatestIssueTests(unittest.TestCase): + """Tests covidcast is_latest_issue caching.""" + + maxDiff = None #access full output of differences for debugging + + def setUp(self): + """Perform per-test setup.""" + + # connect to the `epidata` database + cnx = mysql.connector.connect( + user='user', + password='pass', + host='delphi_database_epidata', + database='covid') + cur = cnx.cursor() + + # clear all tables + cur.execute("truncate table signal_load") + cur.execute("truncate table signal_history") + cur.execute("truncate table signal_latest") + cur.execute("truncate table geo_dim") + cur.execute("truncate table signal_dim") + # reset the `covidcast_meta_cache` table (it should always have one row) + cur.execute('update covidcast_meta_cache set timestamp = 0, epidata = "[]"') + + cnx.commit() + cur.close() + + # make connection and cursor available to the Database object + self._db = Database() + self._db._connection = cnx + self._db._cursor = cnx.cursor() + + # use the local instance of the Epidata API + Epidata.BASE_URL = 'http://delphi_web_epidata/epidata/api.php' + + # use the local instance of the epidata database + secrets.db.host = 'delphi_database_epidata' + secrets.db.epi = ('user', 'pass') + + def tearDown(self): + """Perform per-test teardown.""" + self._db._cursor.close() + self._db._connection.close() + + + def test_signal_latest(self): + # def __init__(source, signal, time_type, geo_type, time_value, geo_value, + # value, stderr, sample_size, missing_value, missing_stderr, missing_sample_size, + # issue, lag, is_wip): + rows = [ + CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa', + 1.5, 2.5, 3.5, nmv, nmv, nmv, 20200414, 0), + CovidcastRow('src', 'sig', 'day', 'state', 20200415, 'wa', + 3, 2, 1, nmv, nmv, nmv, 20200415, 0) + ] + self._db.insert_or_update_bulk(rows) + self._db.run_dbjobs() + #preview + self._db._cursor.execute('''SELECT * FROM `signal_history`''') + self.totalRows = len(list(self._db._cursor.fetchall())) + self._db._cursor.execute('''SELECT * FROM `signal_latest`''') + record = self._db._cursor.fetchall() + x = PrettyTable() + x.field_names = ['signal_data_id', + 'signal_key_id', + 'geo_key_id', + 'demog_key_id' , + 'issue' , + 'data_as_of_dt', + 'time_type', + 'time_value' , + 'reference_dt' , + 'value' , + 'stderr' , + 'sample_size', + 'lag' , + 'value_updated_timestamp' , + 'computation_as_of_dt'] + print("SIGNAL_LATEST TABLE") + for row in record: + x.add_row(list(row)[:len(x.field_names)]) + print(x) + print("Finish with 1st Set of Data") + + #sanity check for adding dummy data + sql = '''SELECT `issue` FROM `signal_latest` where `time_value` = 20200414''' + self._db._cursor.execute(sql) + record = self._db._cursor.fetchall() + self.assertEqual(record[0][0], 20200414) + + #when uploading data patches (data in signal load has < issue than data in signal_latest) + #INSERT OLDER issue (does not end up in latest table) + #INSERT NEWER issue (ends up in latest table) + #when signal_load is older than signal_latest, we patch old data (i.e changed some old entries) + newRow = [ + CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa', + 4.4, 4.4, 4.4, nmv, nmv, nmv, 20200416, 2)] #should show up + self._db.insert_or_update_bulk(newRow) + self._db.run_dbjobs() + + #check newer issue in signal_latest + sql = '''SELECT `issue` FROM `signal_latest` where `time_value` = 20200414 ''' + self._db._cursor.execute(sql) + record = self._db._cursor.fetchall() + self.assertEqual(record[0][0], 20200416) #new data added + + updateRow = [ + CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa', + 6.5, 2.2, 11.5, nmv, nmv, nmv, 20200414, 2)] #should not showup + self._db.insert_or_update_bulk(updateRow) + self._db.run_dbjobs() + + #check newer issue in signal_latest + sql = '''SELECT `issue` FROM `signal_latest` where `time_value` = 20200414 ''' + self._db._cursor.execute(sql) + record2 = self._db._cursor.fetchall() + self.assertEqual(record, record2) #same as previous as is_latest did not change + + #dynamic check + sql2 = '''SELECT `issue` FROM `signal_history` where `time_value` ''' + self._db._cursor.execute(sql2) + record3 = self._db._cursor.fetchall() + self.assertEqual(3,self.totalRows + 1) #ensure 3 added (1 of which refreshed) + self.assertEqual(20200416,max(list(record3))[0]) #max of the outputs is 20200416 , extracting from tuple + + #check older issue not inside latest, empty field + sql = '''SELECT * FROM `signal_latest` where `time_value` = 20200414 and `issue` = 20200415 ''' + self._db._cursor.execute(sql) + emptyRecord = list(self._db._cursor.fetchall()) + empty = [] + self.assertEqual(empty, emptyRecord) + + # make sure rows are added to *_dim only when needed + #new src, sig (ensure that it is added into *_dim) + #old src, sig (ensure that it is NOT added into *_dim) + #new geo (added) + #old geo (not added) + def test_src_sig(self): + #BASE CASES + rows = [ + # CovidcastRow('src', 'sig', 'day', 'county', 20200416, 'wa', #not seen + CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa', # + 2, 2, 2, nmv, nmv, nmv, 20200414, 0), + CovidcastRow('src', 'sig', 'day', 'county', 20200414, 'ca', # updating previous entry + 3, 3, 3, nmv, nmv, nmv, 20200414, 0) + ] + self._db.insert_or_update_bulk(rows) + self._db.run_dbjobs() + #setting baseline variables + self._db._cursor.execute('''SELECT * FROM `geo_dim`''') + record = self._db._cursor.fetchall() + self.geoDimRows = len(list(record)) + + self._db._cursor.execute('''SELECT * FROM `signal_dim`''') + record = self._db._cursor.fetchall() + self.sigDimRows = len(list(record)) + + self._db._cursor.execute('''SELECT * FROM `signal_latest`''') + record = self._db._cursor.fetchall() + self.sigLatestRows = len(list(record)) + + #test not added first + with self.subTest(name='older src and sig not added into sig_dim'): + oldSrcSig = [ + CovidcastRow('src', 'sig', 'day', 'state', 20211111, 'pa', #new src, new sig + 99, 99, 99, nmv, nmv, nmv, 20211111, 1), + CovidcastRow('src', 'sig', 'day', 'county', 20211111, 'ca', #new src, new sig + 99, 99, 99, nmv, nmv, nmv, 20211111, 1) + ] + self._db.insert_or_update_bulk(oldSrcSig) + self._db.run_dbjobs() + + #testing src, sig + self._db._cursor.execute('''SELECT `source`, `signal` FROM `signal_dim`''') + record = self._db._cursor.fetchall() + res = [('src','sig')] #output + self.assertEqual(res , list(record)) + + #ensure new entries are added + sql = '''SELECT * FROM `signal_latest`''' + self._db._cursor.execute(sql) + record = self._db._cursor.fetchall() + self.assertEqual(len(list(record)), self.sigLatestRows + 2) #2 original, 2 added + + #ensure nothing in geo + self._db._cursor.execute('''SELECT * FROM `geo_dim`''') + record = self._db._cursor.fetchall() + self.assertEqual(len(list(record)),self.geoDimRows) + + with self.subTest(name='newer src and sig added in sig_dim'): + newSrcSig = [ + CovidcastRow('new_src', 'sig', 'day', 'state', 20200414, 'pa', # new_src + 2, 2, 2, nmv, nmv, nmv, 20200414, 0), + CovidcastRow('src', 'new_sig', 'day', 'state', 20200414, 'pa', # new_sig + 2, 2, 2, nmv, nmv, nmv, 20200414, 0) + ] + self._db.insert_or_update_bulk(newSrcSig) + self._db.run_dbjobs() + + #testing src, sig + self._db._cursor.execute('''SELECT `source`, `signal` FROM `signal_dim`''') + record = self._db._cursor.fetchall() + res = [('src', 'sig'), ('new_src', 'sig'), ('src', 'new_sig')] + self.assertEqual(res , (record)) + #ensure nothing in geo + self._db._cursor.execute('''SELECT * FROM `geo_dim`''') + record = self._db._cursor.fetchall() + self.assertEqual(len(list(record)),self.geoDimRows) + + with self.subTest(name='old geo not added in geo_dim'): + repeatedGeoValues = [ #geo_type #geo_value + CovidcastRow('src', 'sig', 'day', 'state', 20200415, 'pa', # same geo_type, geo_value + 2, 2, 2, nmv, nmv, nmv, 20200415, 0), + CovidcastRow('src', 'sig', 'day', 'county', 20200415, 'ca', # same geo_type, geo_value + 3, 3, 3, nmv, nmv, nmv, 20200415, 0), + ] + self._db.insert_or_update_bulk(repeatedGeoValues) + self._db.run_dbjobs() + + self._db._cursor.execute('''SELECT `geo_type`,`geo_value` FROM `geo_dim`''') + record = self._db._cursor.fetchall() + self.assertEqual(len(list(record)),self.geoDimRows) #geoDimRows unchanged + + + self._db._cursor.execute('''SELECT * FROM `signal_latest`''') + record = self._db._cursor.fetchall() + self.assertEqual(len(list(record)),self.sigLatestRows + 6) #total entries = 2(initial) + 6(test) + + with self.subTest(name='newer geo added in geo_dim'): + newGeoValues = [ #geo_type #geo_value + CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'nj', # everything same except, state = nj + 2, 2, 2, nmv, nmv, nmv, 20200414, 0), + CovidcastRow('src', 'sig', 'day', 'county', 20200414, 'al', # everything same except, county = al + 3, 3, 3, nmv, nmv, nmv, 20200414, 0), + CovidcastRow('src', 'sig', 'day', 'county', 20200414, 'nj', # everything same except, county = nj + 3, 3, 3, nmv, nmv, nmv, 20200414, 0) + ] + self._db.insert_or_update_bulk(newGeoValues) + self._db.run_dbjobs() + + self._db._cursor.execute('''SELECT `geo_type`,`geo_value` FROM `geo_dim`''') + record = self._db._cursor.fetchall() + self.assertEqual(len(list(record)),self.geoDimRows + 3) #2 + 3 new + + self._db._cursor.execute('''SELECT * FROM `signal_latest`''') + record = self._db._cursor.fetchall() + self.assertEqual(len(list(record)),self.sigLatestRows + 6 + 3) #total entries = 2(initial) + 6(test) + + @unittest.skip("helper function") + def view_table(table_name): + #TODO: abstract into function + y = PrettyTable() + y.field_names = ['signal_data_id', + 'signal_key_id', + 'geo_key_id', + 'demog_key_id' , + 'issue' , + 'data_as_of_dt', + 'time_type', + 'time_value' , + 'reference_dt' , + 'value' , + 'stderr' , + 'sample_size', + 'lag' , + 'value_updated_timestamp' , + 'computation_as_of_dt'] + + print('\n') + self._db._cursor.execute('''SELECT * FROM `signal_latest`''') + record = self._db._cursor.fetchall() + print(record) + # assert(len(record) == ) + for row in record: + y.add_row(list(row)[:len(y.field_names)]) + print(y) + print("signal_latest updated") + # self._db._cursor.execute('''SELECT * FROM `signal_dim`''') + # record = self._db._cursor.fetchall() + # print(record) + # self._db._cursor.execute('''SELECT * FROM `geo_dim`''') + # record = self._db._cursor.fetchall() + # print(record) + + # sig_hist = PrettyTable() + # sig_hist.field_names = [('signal_data_id',), ('signal_key_id',), ('geo_key_id',), ('demog_key_id',), ('issue',), ('data_as_of_dt',), ('time_type',), ('time_value',), ('reference_dt',), ('value',), ('stderr',), ('sample_size',), ('lag',), ('value_updated_timestamp',), ('computation_as_of_dt',), ('is_latest_issue',), ('missing_value',), ('missing_stderr',), ('missing_sample_size',), ('legacy_id',)] + + # sig_load = PrettyTable() + # sig_load.field_names = [('signal_data_id',), ('signal_key_id',), ('geo_key_id',), ('demog_key_id',), ('issue',), ('data_as_of_dt',), ('source',), ('signal',), ('geo_type',), ('geo_value',), ('time_type',), ('time_value',), ('reference_dt',), ('value',), ('stderr',), ('sample_size',), ('lag',), ('value_updated_timestamp',), ('computation_as_of_dt',), ('is_latest_issue',), ('missing_value',), ('missing_stderr',), ('missing_sample_size',), ('legacy_id',), ('compressed_signal_key',), ('compressed_geo_key',), ('compressed_demog_key',), ('process_status',)] + + #when uploading data patches (data in signal load has < issue than data in signal_latest) + #when signal_load is older than signal_latest, we patch old data (i.e changed some old entries) + pass + + @unittest.skip("Having different (time_value,issue) pairs in one call to db pipeline does not happen in practice") + def test_diff_timevalue_issue(self): + rows = [ + CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa', #updating old issue, should be seen in latest + 17, 17, 17, nmv, nmv, nmv, 20200417, 3), + CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa', # updating previous entry + 2, 2, 3, nmv, nmv, nmv, 20200416, 2) + ] + self._db.insert_or_update_bulk(rows) + self._db.run_dbjobs() + self._db._cursor.execute('''SELECT `issue` FROM `signal_latest` ''') + record = self._db._cursor.fetchall() + print(record[0][0]) + self.assertEqual(record[0][0], 20200417) #20200416 != 20200417 \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index f94c98dc3..932206678 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,3 +11,5 @@ scipy==1.6.2 tenacity==7.0.0 newrelic epiweeks==2.1.2 +prettytable +# mysql-connector-python==8.0.17 From d789236d27f1ea23fc79f6cd9e761aa48b9cd0e6 Mon Sep 17 00:00:00 2001 From: Xavier Xia Date: Mon, 6 Jun 2022 17:12:59 -0500 Subject: [PATCH 02/30] syntax error --- .../acquisition/covidcast/test_covidcast_meta_caching.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/integrations/acquisition/covidcast/test_covidcast_meta_caching.py b/integrations/acquisition/covidcast/test_covidcast_meta_caching.py index fe7007118..b1be8bcec 100644 --- a/integrations/acquisition/covidcast/test_covidcast_meta_caching.py +++ b/integrations/acquisition/covidcast/test_covidcast_meta_caching.py @@ -94,12 +94,8 @@ def test_caching(self): # make sure the live utility is serving something sensible cvc_database = live.Database() cvc_database.connect() -<<<<<<< HEAD - epidata1 = cvc_database.compute_covidcast_meta() -======= epidata1 = cvc_database.compute_covidcast_meta() #tests the regular interval (runs daily, computes for each source and signal and stores into epiddata1 ->>>>>>> 4ec6431a (added integrations test for database for is_latest_issue) cvc_database.disconnect(False) self.assertEqual(len(epidata1),1) self.assertEqual(epidata1, [ @@ -122,7 +118,7 @@ def test_caching(self): } ]) epidata1={'result':1, 'message':'success', 'epidata':epidata1} - + # make sure the API covidcast_meta is still blank, since it only serves # the cached version and we haven't cached anything yet epidata2 = Epidata.covidcast_meta() From c3f020578a15d8a9b2f6abdcd24140402f587884 Mon Sep 17 00:00:00 2001 From: Xavier Xia Date: Wed, 8 Jun 2022 21:34:55 +0800 Subject: [PATCH 03/30] Update integrations/acquisition/covidcast/test_is_latest_issue.py Co-authored-by: melange396 --- integrations/acquisition/covidcast/test_is_latest_issue.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/integrations/acquisition/covidcast/test_is_latest_issue.py b/integrations/acquisition/covidcast/test_is_latest_issue.py index 757c451d1..5285010f4 100644 --- a/integrations/acquisition/covidcast/test_is_latest_issue.py +++ b/integrations/acquisition/covidcast/test_is_latest_issue.py @@ -82,9 +82,6 @@ def tearDown(self): def test_signal_latest(self): - # def __init__(source, signal, time_type, geo_type, time_value, geo_value, - # value, stderr, sample_size, missing_value, missing_stderr, missing_sample_size, - # issue, lag, is_wip): rows = [ CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa', 1.5, 2.5, 3.5, nmv, nmv, nmv, 20200414, 0), From 1f10a04ab33a9b6fd9d7c63f3b97773162e3d559 Mon Sep 17 00:00:00 2001 From: Xavier Xia Date: Wed, 8 Jun 2022 21:35:16 +0800 Subject: [PATCH 04/30] Update integrations/acquisition/covidcast/test_is_latest_issue.py removed additional BASE_URL Co-authored-by: melange396 --- integrations/acquisition/covidcast/test_is_latest_issue.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/integrations/acquisition/covidcast/test_is_latest_issue.py b/integrations/acquisition/covidcast/test_is_latest_issue.py index 5285010f4..444b31a0a 100644 --- a/integrations/acquisition/covidcast/test_is_latest_issue.py +++ b/integrations/acquisition/covidcast/test_is_latest_issue.py @@ -31,9 +31,6 @@ __test_target__ = 'delphi.epidata.acquisition.covidcast.database' -# use the local instance of the Epidata API -BASE_URL = 'http://delphi_web_epidata/epidata/api.php' - nmv = Nans.NOT_MISSING.value class CovidcastLatestIssueTests(unittest.TestCase): """Tests covidcast is_latest_issue caching.""" From b241980575e0be884971e8b92adec0dce288c0fa Mon Sep 17 00:00:00 2001 From: Xavier Xia Date: Wed, 8 Jun 2022 21:35:31 +0800 Subject: [PATCH 05/30] Update requirements.txt remove unnecessary package Co-authored-by: melange396 --- requirements.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 932206678..b406b1c58 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,4 +12,3 @@ tenacity==7.0.0 newrelic epiweeks==2.1.2 prettytable -# mysql-connector-python==8.0.17 From 0a4e09e72f61042f56e2b57203999a5edeb42e0f Mon Sep 17 00:00:00 2001 From: Xavier Xia Date: Wed, 8 Jun 2022 08:50:57 -0500 Subject: [PATCH 06/30] added line to ignore output.txt --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 7daea21f0..9ddd8ff59 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,4 @@ __pycache__/ /node_modules .mypy_cache /missing_db_signals.csv +/output.txt From f0f3abefd767556abb7bfcbc845903dbaa6bbe5e Mon Sep 17 00:00:00 2001 From: Xavier Xia Date: Wed, 8 Jun 2022 14:16:01 -0500 Subject: [PATCH 07/30] tidied up code and provided common SQL queries as variables --- .../covidcast/test_is_latest_issue.py | 124 ++++-------------- 1 file changed, 28 insertions(+), 96 deletions(-) diff --git a/integrations/acquisition/covidcast/test_is_latest_issue.py b/integrations/acquisition/covidcast/test_is_latest_issue.py index 444b31a0a..903262d52 100644 --- a/integrations/acquisition/covidcast/test_is_latest_issue.py +++ b/integrations/acquisition/covidcast/test_is_latest_issue.py @@ -27,16 +27,14 @@ import delphi.operations.secrets as secrets # py3tester coverage target (equivalent to `import *`) -# +# __test_target__ = 'delphi.epidata.acquisition.covidcast.database' - nmv = Nans.NOT_MISSING.value class CovidcastLatestIssueTests(unittest.TestCase): """Tests covidcast is_latest_issue caching.""" - maxDiff = None #access full output of differences for debugging - + def setUp(self): """Perform per-test setup.""" @@ -66,19 +64,24 @@ def setUp(self): self._db._cursor = cnx.cursor() # use the local instance of the Epidata API - Epidata.BASE_URL = 'http://delphi_web_epidata/epidata/api.php' + BASE_URL = 'http://delphi_web_epidata/epidata/api.php' # use the local instance of the epidata database secrets.db.host = 'delphi_database_epidata' secrets.db.epi = ('user', 'pass') + #Commonly used SQL commands: + self.viewSignalLatest = '''SELECT * FROM `signal_latest`''' + self.viewSignalHistory = '''SELECT * FROM `signal_history`''' + self.viewSignalDim = '''SELECT `source`, `signal` FROM `signal_dim`''' + self.viewGeoDim = '''SELECT `geo_type`,`geo_value` FROM `geo_dim`''' def tearDown(self): """Perform per-test teardown.""" self._db._cursor.close() self._db._connection.close() - def test_signal_latest(self): + rows = [ CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa', 1.5, 2.5, 3.5, nmv, nmv, nmv, 20200414, 0), @@ -90,29 +93,6 @@ def test_signal_latest(self): #preview self._db._cursor.execute('''SELECT * FROM `signal_history`''') self.totalRows = len(list(self._db._cursor.fetchall())) - self._db._cursor.execute('''SELECT * FROM `signal_latest`''') - record = self._db._cursor.fetchall() - x = PrettyTable() - x.field_names = ['signal_data_id', - 'signal_key_id', - 'geo_key_id', - 'demog_key_id' , - 'issue' , - 'data_as_of_dt', - 'time_type', - 'time_value' , - 'reference_dt' , - 'value' , - 'stderr' , - 'sample_size', - 'lag' , - 'value_updated_timestamp' , - 'computation_as_of_dt'] - print("SIGNAL_LATEST TABLE") - for row in record: - x.add_row(list(row)[:len(x.field_names)]) - print(x) - print("Finish with 1st Set of Data") #sanity check for adding dummy data sql = '''SELECT `issue` FROM `signal_latest` where `time_value` = 20200414''' @@ -148,9 +128,8 @@ def test_signal_latest(self): record2 = self._db._cursor.fetchall() self.assertEqual(record, record2) #same as previous as is_latest did not change - #dynamic check - sql2 = '''SELECT `issue` FROM `signal_history` where `time_value` ''' - self._db._cursor.execute(sql2) + #dynamic check for signal_history + self._db._cursor.execute('''SELECT `issue` FROM `signal_history`''') record3 = self._db._cursor.fetchall() self.assertEqual(3,self.totalRows + 1) #ensure 3 added (1 of which refreshed) self.assertEqual(20200416,max(list(record3))[0]) #max of the outputs is 20200416 , extracting from tuple @@ -162,7 +141,7 @@ def test_signal_latest(self): empty = [] self.assertEqual(empty, emptyRecord) - # make sure rows are added to *_dim only when needed + # We want to test src_sig to make sure rows are added to *_dim only when needed #new src, sig (ensure that it is added into *_dim) #old src, sig (ensure that it is NOT added into *_dim) #new geo (added) @@ -170,7 +149,6 @@ def test_signal_latest(self): def test_src_sig(self): #BASE CASES rows = [ - # CovidcastRow('src', 'sig', 'day', 'county', 20200416, 'wa', #not seen CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa', # 2, 2, 2, nmv, nmv, nmv, 20200414, 0), CovidcastRow('src', 'sig', 'day', 'county', 20200414, 'ca', # updating previous entry @@ -178,16 +156,15 @@ def test_src_sig(self): ] self._db.insert_or_update_bulk(rows) self._db.run_dbjobs() - #setting baseline variables - self._db._cursor.execute('''SELECT * FROM `geo_dim`''') + self._db._cursor.execute(self.viewGeoDim) record = self._db._cursor.fetchall() self.geoDimRows = len(list(record)) - self._db._cursor.execute('''SELECT * FROM `signal_dim`''') + self._db._cursor.execute(self.viewSignalDim) record = self._db._cursor.fetchall() self.sigDimRows = len(list(record)) - self._db._cursor.execute('''SELECT * FROM `signal_latest`''') + self._db._cursor.execute(self.viewSignalLatest) record = self._db._cursor.fetchall() self.sigLatestRows = len(list(record)) @@ -202,20 +179,19 @@ def test_src_sig(self): self._db.insert_or_update_bulk(oldSrcSig) self._db.run_dbjobs() - #testing src, sig - self._db._cursor.execute('''SELECT `source`, `signal` FROM `signal_dim`''') + #testing src, sig + self._db._cursor.execute(self.viewSignalDim) record = self._db._cursor.fetchall() res = [('src','sig')] #output self.assertEqual(res , list(record)) - #ensure new entries are added - sql = '''SELECT * FROM `signal_latest`''' - self._db._cursor.execute(sql) + #ensure new entries are added in latest + self._db._cursor.execute(self.viewSignalLatest) record = self._db._cursor.fetchall() self.assertEqual(len(list(record)), self.sigLatestRows + 2) #2 original, 2 added #ensure nothing in geo - self._db._cursor.execute('''SELECT * FROM `geo_dim`''') + self._db._cursor.execute(self.viewGeoDim) record = self._db._cursor.fetchall() self.assertEqual(len(list(record)),self.geoDimRows) @@ -230,12 +206,12 @@ def test_src_sig(self): self._db.run_dbjobs() #testing src, sig - self._db._cursor.execute('''SELECT `source`, `signal` FROM `signal_dim`''') + self._db._cursor.execute(self.viewSignalDim) record = self._db._cursor.fetchall() res = [('src', 'sig'), ('new_src', 'sig'), ('src', 'new_sig')] self.assertEqual(res , (record)) #ensure nothing in geo - self._db._cursor.execute('''SELECT * FROM `geo_dim`''') + self._db._cursor.execute(self.viewGeoDim) record = self._db._cursor.fetchall() self.assertEqual(len(list(record)),self.geoDimRows) @@ -249,12 +225,11 @@ def test_src_sig(self): self._db.insert_or_update_bulk(repeatedGeoValues) self._db.run_dbjobs() - self._db._cursor.execute('''SELECT `geo_type`,`geo_value` FROM `geo_dim`''') + self._db._cursor.execute(self.viewGeoDim) record = self._db._cursor.fetchall() self.assertEqual(len(list(record)),self.geoDimRows) #geoDimRows unchanged - - self._db._cursor.execute('''SELECT * FROM `signal_latest`''') + self._db._cursor.execute(self.viewSignalLatest) record = self._db._cursor.fetchall() self.assertEqual(len(list(record)),self.sigLatestRows + 6) #total entries = 2(initial) + 6(test) @@ -274,55 +249,13 @@ def test_src_sig(self): record = self._db._cursor.fetchall() self.assertEqual(len(list(record)),self.geoDimRows + 3) #2 + 3 new - self._db._cursor.execute('''SELECT * FROM `signal_latest`''') + self._db._cursor.execute(self.viewSignalLatest) record = self._db._cursor.fetchall() - self.assertEqual(len(list(record)),self.sigLatestRows + 6 + 3) #total entries = 2(initial) + 6(test) - - @unittest.skip("helper function") - def view_table(table_name): - #TODO: abstract into function - y = PrettyTable() - y.field_names = ['signal_data_id', - 'signal_key_id', - 'geo_key_id', - 'demog_key_id' , - 'issue' , - 'data_as_of_dt', - 'time_type', - 'time_value' , - 'reference_dt' , - 'value' , - 'stderr' , - 'sample_size', - 'lag' , - 'value_updated_timestamp' , - 'computation_as_of_dt'] - - print('\n') - self._db._cursor.execute('''SELECT * FROM `signal_latest`''') - record = self._db._cursor.fetchall() - print(record) - # assert(len(record) == ) - for row in record: - y.add_row(list(row)[:len(y.field_names)]) - print(y) - print("signal_latest updated") - # self._db._cursor.execute('''SELECT * FROM `signal_dim`''') - # record = self._db._cursor.fetchall() - # print(record) - # self._db._cursor.execute('''SELECT * FROM `geo_dim`''') - # record = self._db._cursor.fetchall() - # print(record) - - # sig_hist = PrettyTable() - # sig_hist.field_names = [('signal_data_id',), ('signal_key_id',), ('geo_key_id',), ('demog_key_id',), ('issue',), ('data_as_of_dt',), ('time_type',), ('time_value',), ('reference_dt',), ('value',), ('stderr',), ('sample_size',), ('lag',), ('value_updated_timestamp',), ('computation_as_of_dt',), ('is_latest_issue',), ('missing_value',), ('missing_stderr',), ('missing_sample_size',), ('legacy_id',)] - - # sig_load = PrettyTable() - # sig_load.field_names = [('signal_data_id',), ('signal_key_id',), ('geo_key_id',), ('demog_key_id',), ('issue',), ('data_as_of_dt',), ('source',), ('signal',), ('geo_type',), ('geo_value',), ('time_type',), ('time_value',), ('reference_dt',), ('value',), ('stderr',), ('sample_size',), ('lag',), ('value_updated_timestamp',), ('computation_as_of_dt',), ('is_latest_issue',), ('missing_value',), ('missing_stderr',), ('missing_sample_size',), ('legacy_id',), ('compressed_signal_key',), ('compressed_geo_key',), ('compressed_demog_key',), ('process_status',)] + self.assertEqual(len(list(record)),self.sigLatestRows + 6 + 3) #total entries = 2(initial) + 6(test) #when uploading data patches (data in signal load has < issue than data in signal_latest) #when signal_load is older than signal_latest, we patch old data (i.e changed some old entries) - pass + @unittest.skip("Having different (time_value,issue) pairs in one call to db pipeline does not happen in practice") def test_diff_timevalue_issue(self): @@ -335,6 +268,5 @@ def test_diff_timevalue_issue(self): self._db.insert_or_update_bulk(rows) self._db.run_dbjobs() self._db._cursor.execute('''SELECT `issue` FROM `signal_latest` ''') - record = self._db._cursor.fetchall() - print(record[0][0]) + record = self._db._cursor.fetchall() self.assertEqual(record[0][0], 20200417) #20200416 != 20200417 \ No newline at end of file From a998d38fc4fa35868bf9d2eb127d80d1dcc23834 Mon Sep 17 00:00:00 2001 From: Xavier Xia Date: Wed, 8 Jun 2022 14:20:05 -0500 Subject: [PATCH 08/30] removed unnecessary imports --- .../acquisition/covidcast/test_is_latest_issue.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/integrations/acquisition/covidcast/test_is_latest_issue.py b/integrations/acquisition/covidcast/test_is_latest_issue.py index 903262d52..6c3ca27a3 100644 --- a/integrations/acquisition/covidcast/test_is_latest_issue.py +++ b/integrations/acquisition/covidcast/test_is_latest_issue.py @@ -2,14 +2,8 @@ # standard library import unittest import time -from unittest.mock import patch, MagicMock -from json import JSONDecodeError -import numpy as np -from math import ceil -from queue import Queue, Empty import threading -from multiprocessing import cpu_count # third party @@ -252,9 +246,6 @@ def test_src_sig(self): self._db._cursor.execute(self.viewSignalLatest) record = self._db._cursor.fetchall() self.assertEqual(len(list(record)),self.sigLatestRows + 6 + 3) #total entries = 2(initial) + 6(test) - - #when uploading data patches (data in signal load has < issue than data in signal_latest) - #when signal_load is older than signal_latest, we patch old data (i.e changed some old entries) @unittest.skip("Having different (time_value,issue) pairs in one call to db pipeline does not happen in practice") From 035baf72d8a132a71e436d3d46481eba91533163 Mon Sep 17 00:00:00 2001 From: Xavier Xia Date: Wed, 8 Jun 2022 14:21:17 -0500 Subject: [PATCH 09/30] added line to ignore output.txt --- .gitignore | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 9ddd8ff59..b46e4112b 100644 --- a/.gitignore +++ b/.gitignore @@ -11,4 +11,4 @@ __pycache__/ /node_modules .mypy_cache /missing_db_signals.csv -/output.txt +/dev/local/output.txt \ No newline at end of file From 6d8d25ef828c92cb4a170162da0b1e0b6dd3b2f2 Mon Sep 17 00:00:00 2001 From: Xavier Xia Date: Sat, 11 Jun 2022 14:13:48 -0500 Subject: [PATCH 10/30] removed unnecessary row for signal_latest, checked there is only one entry" --- .../acquisition/covidcast/test_is_latest_issue.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/integrations/acquisition/covidcast/test_is_latest_issue.py b/integrations/acquisition/covidcast/test_is_latest_issue.py index 6c3ca27a3..94b73aa18 100644 --- a/integrations/acquisition/covidcast/test_is_latest_issue.py +++ b/integrations/acquisition/covidcast/test_is_latest_issue.py @@ -2,7 +2,6 @@ # standard library import unittest import time - import threading @@ -78,9 +77,7 @@ def test_signal_latest(self): rows = [ CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa', - 1.5, 2.5, 3.5, nmv, nmv, nmv, 20200414, 0), - CovidcastRow('src', 'sig', 'day', 'state', 20200415, 'wa', - 3, 2, 1, nmv, nmv, nmv, 20200415, 0) + 1.5, 2.5, 3.5, nmv, nmv, nmv, 20200414, 0) ] self._db.insert_or_update_bulk(rows) self._db.run_dbjobs() @@ -93,6 +90,7 @@ def test_signal_latest(self): self._db._cursor.execute(sql) record = self._db._cursor.fetchall() self.assertEqual(record[0][0], 20200414) + self.assertEqual(len(record), 1) #check 1 entry #when uploading data patches (data in signal load has < issue than data in signal_latest) #INSERT OLDER issue (does not end up in latest table) @@ -109,6 +107,7 @@ def test_signal_latest(self): self._db._cursor.execute(sql) record = self._db._cursor.fetchall() self.assertEqual(record[0][0], 20200416) #new data added + self.assertEqual(len(record), 1) #check 1 entry updateRow = [ CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa', @@ -125,11 +124,11 @@ def test_signal_latest(self): #dynamic check for signal_history self._db._cursor.execute('''SELECT `issue` FROM `signal_history`''') record3 = self._db._cursor.fetchall() - self.assertEqual(3,self.totalRows + 1) #ensure 3 added (1 of which refreshed) + self.assertEqual(2,self.totalRows + 1) #ensure 3 added (1 of which refreshed) self.assertEqual(20200416,max(list(record3))[0]) #max of the outputs is 20200416 , extracting from tuple #check older issue not inside latest, empty field - sql = '''SELECT * FROM `signal_latest` where `time_value` = 20200414 and `issue` = 20200415 ''' + sql = '''SELECT * FROM `signal_latest` where `time_value` = 20200414 and `issue` = 20200414 ''' self._db._cursor.execute(sql) emptyRecord = list(self._db._cursor.fetchall()) empty = [] From 4090d4d78b43efc102b30faa41c84d9a4c307ada Mon Sep 17 00:00:00 2001 From: Xavier Xia Date: Sat, 11 Jun 2022 14:21:13 -0500 Subject: [PATCH 11/30] removed unnecessary self. --- .../covidcast/test_is_latest_issue.py | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/integrations/acquisition/covidcast/test_is_latest_issue.py b/integrations/acquisition/covidcast/test_is_latest_issue.py index 94b73aa18..76a04d22c 100644 --- a/integrations/acquisition/covidcast/test_is_latest_issue.py +++ b/integrations/acquisition/covidcast/test_is_latest_issue.py @@ -83,7 +83,7 @@ def test_signal_latest(self): self._db.run_dbjobs() #preview self._db._cursor.execute('''SELECT * FROM `signal_history`''') - self.totalRows = len(list(self._db._cursor.fetchall())) + totalRows = len(list(self._db._cursor.fetchall())) #sanity check for adding dummy data sql = '''SELECT `issue` FROM `signal_latest` where `time_value` = 20200414''' @@ -124,7 +124,7 @@ def test_signal_latest(self): #dynamic check for signal_history self._db._cursor.execute('''SELECT `issue` FROM `signal_history`''') record3 = self._db._cursor.fetchall() - self.assertEqual(2,self.totalRows + 1) #ensure 3 added (1 of which refreshed) + self.assertEqual(2,totalRows + 1) #ensure 3 added (1 of which refreshed) self.assertEqual(20200416,max(list(record3))[0]) #max of the outputs is 20200416 , extracting from tuple #check older issue not inside latest, empty field @@ -151,15 +151,15 @@ def test_src_sig(self): self._db.run_dbjobs() self._db._cursor.execute(self.viewGeoDim) record = self._db._cursor.fetchall() - self.geoDimRows = len(list(record)) + geoDimRows = len(list(record)) self._db._cursor.execute(self.viewSignalDim) record = self._db._cursor.fetchall() - self.sigDimRows = len(list(record)) + sigDimRows = len(list(record)) self._db._cursor.execute(self.viewSignalLatest) record = self._db._cursor.fetchall() - self.sigLatestRows = len(list(record)) + sigLatestRows = len(list(record)) #test not added first with self.subTest(name='older src and sig not added into sig_dim'): @@ -181,12 +181,12 @@ def test_src_sig(self): #ensure new entries are added in latest self._db._cursor.execute(self.viewSignalLatest) record = self._db._cursor.fetchall() - self.assertEqual(len(list(record)), self.sigLatestRows + 2) #2 original, 2 added + self.assertEqual(len(list(record)), sigLatestRows + 2) #2 original, 2 added #ensure nothing in geo self._db._cursor.execute(self.viewGeoDim) record = self._db._cursor.fetchall() - self.assertEqual(len(list(record)),self.geoDimRows) + self.assertEqual(len(list(record)),geoDimRows) with self.subTest(name='newer src and sig added in sig_dim'): newSrcSig = [ @@ -206,7 +206,7 @@ def test_src_sig(self): #ensure nothing in geo self._db._cursor.execute(self.viewGeoDim) record = self._db._cursor.fetchall() - self.assertEqual(len(list(record)),self.geoDimRows) + self.assertEqual(len(list(record)),geoDimRows) with self.subTest(name='old geo not added in geo_dim'): repeatedGeoValues = [ #geo_type #geo_value @@ -220,11 +220,11 @@ def test_src_sig(self): self._db._cursor.execute(self.viewGeoDim) record = self._db._cursor.fetchall() - self.assertEqual(len(list(record)),self.geoDimRows) #geoDimRows unchanged + self.assertEqual(len(list(record)),geoDimRows) #geoDimRows unchanged self._db._cursor.execute(self.viewSignalLatest) record = self._db._cursor.fetchall() - self.assertEqual(len(list(record)),self.sigLatestRows + 6) #total entries = 2(initial) + 6(test) + self.assertEqual(len(list(record)),sigLatestRows + 6) #total entries = 2(initial) + 6(test) with self.subTest(name='newer geo added in geo_dim'): newGeoValues = [ #geo_type #geo_value @@ -240,11 +240,11 @@ def test_src_sig(self): self._db._cursor.execute('''SELECT `geo_type`,`geo_value` FROM `geo_dim`''') record = self._db._cursor.fetchall() - self.assertEqual(len(list(record)),self.geoDimRows + 3) #2 + 3 new + self.assertEqual(len(list(record)),geoDimRows + 3) #2 + 3 new self._db._cursor.execute(self.viewSignalLatest) record = self._db._cursor.fetchall() - self.assertEqual(len(list(record)),self.sigLatestRows + 6 + 3) #total entries = 2(initial) + 6(test) + self.assertEqual(len(list(record)),sigLatestRows + 6 + 3) #total entries = 2(initial) + 6(test) @unittest.skip("Having different (time_value,issue) pairs in one call to db pipeline does not happen in practice") From 2e0546fa80c988f17d69c6ddaebaeb792ab383a5 Mon Sep 17 00:00:00 2001 From: Xavier Xia Date: Sat, 11 Jun 2022 14:25:40 -0500 Subject: [PATCH 12/30] changed (county,ca) to (county,numeric) pair --- .../acquisition/covidcast/test_is_latest_issue.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/integrations/acquisition/covidcast/test_is_latest_issue.py b/integrations/acquisition/covidcast/test_is_latest_issue.py index 76a04d22c..49ec37f71 100644 --- a/integrations/acquisition/covidcast/test_is_latest_issue.py +++ b/integrations/acquisition/covidcast/test_is_latest_issue.py @@ -144,7 +144,7 @@ def test_src_sig(self): rows = [ CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa', # 2, 2, 2, nmv, nmv, nmv, 20200414, 0), - CovidcastRow('src', 'sig', 'day', 'county', 20200414, 'ca', # updating previous entry + CovidcastRow('src', 'sig', 'day', 'county', 20200414, '11111', # updating previous entry 3, 3, 3, nmv, nmv, nmv, 20200414, 0) ] self._db.insert_or_update_bulk(rows) @@ -166,7 +166,7 @@ def test_src_sig(self): oldSrcSig = [ CovidcastRow('src', 'sig', 'day', 'state', 20211111, 'pa', #new src, new sig 99, 99, 99, nmv, nmv, nmv, 20211111, 1), - CovidcastRow('src', 'sig', 'day', 'county', 20211111, 'ca', #new src, new sig + CovidcastRow('src', 'sig', 'day', 'county', 20211111, '11111', #new src, new sig, same geo 99, 99, 99, nmv, nmv, nmv, 20211111, 1) ] self._db.insert_or_update_bulk(oldSrcSig) @@ -212,7 +212,7 @@ def test_src_sig(self): repeatedGeoValues = [ #geo_type #geo_value CovidcastRow('src', 'sig', 'day', 'state', 20200415, 'pa', # same geo_type, geo_value 2, 2, 2, nmv, nmv, nmv, 20200415, 0), - CovidcastRow('src', 'sig', 'day', 'county', 20200415, 'ca', # same geo_type, geo_value + CovidcastRow('src', 'sig', 'day', 'county', 20200415, '11111', # same geo_type, geo_value 3, 3, 3, nmv, nmv, nmv, 20200415, 0), ] self._db.insert_or_update_bulk(repeatedGeoValues) @@ -230,9 +230,9 @@ def test_src_sig(self): newGeoValues = [ #geo_type #geo_value CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'nj', # everything same except, state = nj 2, 2, 2, nmv, nmv, nmv, 20200414, 0), - CovidcastRow('src', 'sig', 'day', 'county', 20200414, 'al', # everything same except, county = al + CovidcastRow('src', 'sig', 'day', 'county', 20200414, '15217', # everything same except, county = al 3, 3, 3, nmv, nmv, nmv, 20200414, 0), - CovidcastRow('src', 'sig', 'day', 'county', 20200414, 'nj', # everything same except, county = nj + CovidcastRow('src', 'sig', 'day', 'county', 20200414, '15451', # everything same except, county = nj 3, 3, 3, nmv, nmv, nmv, 20200414, 0) ] self._db.insert_or_update_bulk(newGeoValues) From 39ab3460e0548d9a5812e717ea924e01da1ef482 Mon Sep 17 00:00:00 2001 From: Xavier Xia Date: Sun, 12 Jun 2022 09:43:10 +0800 Subject: [PATCH 13/30] Delete output.txt --- dev/local/output.txt | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 dev/local/output.txt diff --git a/dev/local/output.txt b/dev/local/output.txt deleted file mode 100644 index e69de29bb..000000000 From db706ac8f1037a6ab2067644aaa399e910c46547 Mon Sep 17 00:00:00 2001 From: Xavier Xia Date: Sat, 11 Jun 2022 20:51:41 -0500 Subject: [PATCH 14/30] showTable function WIP --- .../covidcast/test_is_latest_issue.py | 26 ++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/integrations/acquisition/covidcast/test_is_latest_issue.py b/integrations/acquisition/covidcast/test_is_latest_issue.py index 49ec37f71..aaf3652f0 100644 --- a/integrations/acquisition/covidcast/test_is_latest_issue.py +++ b/integrations/acquisition/covidcast/test_is_latest_issue.py @@ -47,7 +47,6 @@ def setUp(self): cur.execute("truncate table signal_dim") # reset the `covidcast_meta_cache` table (it should always have one row) cur.execute('update covidcast_meta_cache set timestamp = 0, epidata = "[]"') - cnx.commit() cur.close() @@ -81,7 +80,6 @@ def test_signal_latest(self): ] self._db.insert_or_update_bulk(rows) self._db.run_dbjobs() - #preview self._db._cursor.execute('''SELECT * FROM `signal_history`''') totalRows = len(list(self._db._cursor.fetchall())) @@ -246,7 +244,29 @@ def test_src_sig(self): record = self._db._cursor.fetchall() self.assertEqual(len(list(record)),sigLatestRows + 6 + 3) #total entries = 2(initial) + 6(test) - + def showTable(self, table_name): + x = PrettyTable() + x.field_names = ['signal_data_id', + 'signal_key_id', + 'geo_key_id', + 'demog_key_id' , + 'issue' , + 'data_as_of_dt', + 'time_type', + 'time_value' , + 'reference_dt' , + 'value' , + 'stderr' , + 'sample_size', + 'lag' , + 'value_updated_timestamp' , + 'computation_as_of_dt'] + print("SIGNAL_LATEST TABLE") + for row in record: + x.add_row(list(row)[:len(x.field_names)]) + print(x) + print("Finish with 1st Set of Data") + @unittest.skip("Having different (time_value,issue) pairs in one call to db pipeline does not happen in practice") def test_diff_timevalue_issue(self): rows = [ From b9d712f37915e9c39f100c4e9acb08b1933996b7 Mon Sep 17 00:00:00 2001 From: Xavier Xia Date: Thu, 16 Jun 2022 04:24:10 +0800 Subject: [PATCH 15/30] added Epidata for BASE_URL Co-authored-by: melange396 --- integrations/acquisition/covidcast/test_is_latest_issue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/acquisition/covidcast/test_is_latest_issue.py b/integrations/acquisition/covidcast/test_is_latest_issue.py index 49ec37f71..3b4aa812b 100644 --- a/integrations/acquisition/covidcast/test_is_latest_issue.py +++ b/integrations/acquisition/covidcast/test_is_latest_issue.py @@ -57,7 +57,7 @@ def setUp(self): self._db._cursor = cnx.cursor() # use the local instance of the Epidata API - BASE_URL = 'http://delphi_web_epidata/epidata/api.php' + Epidata.BASE_URL = 'http://delphi_web_epidata/epidata/api.php' # use the local instance of the epidata database secrets.db.host = 'delphi_database_epidata' From d0b8d6972d5101dc3ef128ee4397b2c97b14442d Mon Sep 17 00:00:00 2001 From: Xavier Xia Date: Wed, 15 Jun 2022 15:26:30 -0500 Subject: [PATCH 16/30] tidied up comments and test cases --- integrations/acquisition/covidcast/test_is_latest_issue.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/integrations/acquisition/covidcast/test_is_latest_issue.py b/integrations/acquisition/covidcast/test_is_latest_issue.py index aaf3652f0..c8d000c90 100644 --- a/integrations/acquisition/covidcast/test_is_latest_issue.py +++ b/integrations/acquisition/covidcast/test_is_latest_issue.py @@ -9,8 +9,7 @@ from aiohttp.client_exceptions import ClientResponseError import mysql.connector import pytest -from prettytable import PrettyTable, from_db_cursor - + # first party from delphi.epidata.acquisition.covidcast.logger import get_structured_logger from delphi_utils import Nans From 11a53162d7b125fd40f4cb5057e46006be388904 Mon Sep 17 00:00:00 2001 From: Xavier Xia Date: Wed, 15 Jun 2022 15:48:40 -0500 Subject: [PATCH 17/30] changed all SQL queries to f-strings as better practice --- .../covidcast/test_is_latest_issue.py | 58 ++++++------------- 1 file changed, 17 insertions(+), 41 deletions(-) diff --git a/integrations/acquisition/covidcast/test_is_latest_issue.py b/integrations/acquisition/covidcast/test_is_latest_issue.py index c8d000c90..dd9b62e7e 100644 --- a/integrations/acquisition/covidcast/test_is_latest_issue.py +++ b/integrations/acquisition/covidcast/test_is_latest_issue.py @@ -26,7 +26,6 @@ class CovidcastLatestIssueTests(unittest.TestCase): """Tests covidcast is_latest_issue caching.""" - def setUp(self): """Perform per-test setup.""" @@ -62,10 +61,10 @@ def setUp(self): secrets.db.epi = ('user', 'pass') #Commonly used SQL commands: - self.viewSignalLatest = '''SELECT * FROM `signal_latest`''' - self.viewSignalHistory = '''SELECT * FROM `signal_history`''' - self.viewSignalDim = '''SELECT `source`, `signal` FROM `signal_dim`''' - self.viewGeoDim = '''SELECT `geo_type`,`geo_value` FROM `geo_dim`''' + self.viewSignalLatest = f'SELECT * FROM {Database.latest_table}' + self.viewSignalHistory = f'SELECT * FROM {Database.history_table}' + self.viewSignalDim = f'SELECT `source`, `signal` FROM `signal_dim`' + self.viewGeoDim = f'SELECT `geo_type`,`geo_value` FROM `geo_dim`' def tearDown(self): """Perform per-test teardown.""" self._db._cursor.close() @@ -79,11 +78,11 @@ def test_signal_latest(self): ] self._db.insert_or_update_bulk(rows) self._db.run_dbjobs() - self._db._cursor.execute('''SELECT * FROM `signal_history`''') + self._db._cursor.execute(self.viewSignalHistory) totalRows = len(list(self._db._cursor.fetchall())) #sanity check for adding dummy data - sql = '''SELECT `issue` FROM `signal_latest` where `time_value` = 20200414''' + sql = f'SELECT `issue` FROM {Database.latest_table} where `time_value` = 20200414' self._db._cursor.execute(sql) record = self._db._cursor.fetchall() self.assertEqual(record[0][0], 20200414) @@ -100,7 +99,7 @@ def test_signal_latest(self): self._db.run_dbjobs() #check newer issue in signal_latest - sql = '''SELECT `issue` FROM `signal_latest` where `time_value` = 20200414 ''' + sql = f'SELECT `issue` FROM {Database.latest_table} where `time_value` = 20200414 ' self._db._cursor.execute(sql) record = self._db._cursor.fetchall() self.assertEqual(record[0][0], 20200416) #new data added @@ -113,19 +112,19 @@ def test_signal_latest(self): self._db.run_dbjobs() #check newer issue in signal_latest - sql = '''SELECT `issue` FROM `signal_latest` where `time_value` = 20200414 ''' + sql = f'SELECT `issue` FROM {Database.latest_table} where `time_value` = 20200414 ' self._db._cursor.execute(sql) record2 = self._db._cursor.fetchall() self.assertEqual(record, record2) #same as previous as is_latest did not change - #dynamic check for signal_history - self._db._cursor.execute('''SELECT `issue` FROM `signal_history`''') + #dynamic check for signal_history's list of issue + self._db._cursor.execute(f'SELECT `issue` FROM {Database.history_table}') record3 = self._db._cursor.fetchall() - self.assertEqual(2,totalRows + 1) #ensure 3 added (1 of which refreshed) - self.assertEqual(20200416,max(list(record3))[0]) #max of the outputs is 20200416 , extracting from tuple + self.assertEqual(len(record3),totalRows + 1) #ensure 3 added (1 of which refreshed) + self.assertEqual(20200416,max(record3)[0]) #max of the outputs is 20200416 , extracting from tuple #check older issue not inside latest, empty field - sql = '''SELECT * FROM `signal_latest` where `time_value` = 20200414 and `issue` = 20200414 ''' + sql = f'SELECT * FROM {Database.latest_table} where `time_value` = 20200414 and `issue` = 20200414 ' self._db._cursor.execute(sql) emptyRecord = list(self._db._cursor.fetchall()) empty = [] @@ -139,9 +138,9 @@ def test_signal_latest(self): def test_src_sig(self): #BASE CASES rows = [ - CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa', # + CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa', 2, 2, 2, nmv, nmv, nmv, 20200414, 0), - CovidcastRow('src', 'sig', 'day', 'county', 20200414, '11111', # updating previous entry + CovidcastRow('src', 'sig', 'day', 'county', 20200414, '11111', 3, 3, 3, nmv, nmv, nmv, 20200414, 0) ] self._db.insert_or_update_bulk(rows) @@ -235,36 +234,13 @@ def test_src_sig(self): self._db.insert_or_update_bulk(newGeoValues) self._db.run_dbjobs() - self._db._cursor.execute('''SELECT `geo_type`,`geo_value` FROM `geo_dim`''') + self._db._cursor.execute(f'SELECT `geo_type`,`geo_value` FROM `geo_dim`') record = self._db._cursor.fetchall() self.assertEqual(len(list(record)),geoDimRows + 3) #2 + 3 new self._db._cursor.execute(self.viewSignalLatest) record = self._db._cursor.fetchall() self.assertEqual(len(list(record)),sigLatestRows + 6 + 3) #total entries = 2(initial) + 6(test) - - def showTable(self, table_name): - x = PrettyTable() - x.field_names = ['signal_data_id', - 'signal_key_id', - 'geo_key_id', - 'demog_key_id' , - 'issue' , - 'data_as_of_dt', - 'time_type', - 'time_value' , - 'reference_dt' , - 'value' , - 'stderr' , - 'sample_size', - 'lag' , - 'value_updated_timestamp' , - 'computation_as_of_dt'] - print("SIGNAL_LATEST TABLE") - for row in record: - x.add_row(list(row)[:len(x.field_names)]) - print(x) - print("Finish with 1st Set of Data") @unittest.skip("Having different (time_value,issue) pairs in one call to db pipeline does not happen in practice") def test_diff_timevalue_issue(self): @@ -276,6 +252,6 @@ def test_diff_timevalue_issue(self): ] self._db.insert_or_update_bulk(rows) self._db.run_dbjobs() - self._db._cursor.execute('''SELECT `issue` FROM `signal_latest` ''') + self._db._cursor.execute(f'SELECT `issue` FROM {Database.latest_table} ') record = self._db._cursor.fetchall() self.assertEqual(record[0][0], 20200417) #20200416 != 20200417 \ No newline at end of file From 64ea75d7eaf48d1cce6522e6c34d5cc509be0dda Mon Sep 17 00:00:00 2001 From: Xavier Xia Date: Wed, 15 Jun 2022 16:08:00 -0500 Subject: [PATCH 18/30] reverted test_covidcast_meta_caching --- .../acquisition/covidcast/test_covidcast_meta_caching.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/integrations/acquisition/covidcast/test_covidcast_meta_caching.py b/integrations/acquisition/covidcast/test_covidcast_meta_caching.py index b1be8bcec..7c98093e0 100644 --- a/integrations/acquisition/covidcast/test_covidcast_meta_caching.py +++ b/integrations/acquisition/covidcast/test_covidcast_meta_caching.py @@ -26,7 +26,7 @@ class CovidcastMetaCacheTests(unittest.TestCase): - """Tests covidcast metadata caching. """ + """Tests covidcast metadata caching.""" def setUp(self): """Perform per-test setup.""" @@ -95,8 +95,9 @@ def test_caching(self): cvc_database = live.Database() cvc_database.connect() epidata1 = cvc_database.compute_covidcast_meta() - #tests the regular interval (runs daily, computes for each source and signal and stores into epiddata1 cvc_database.disconnect(False) + + # Testing Set self.assertEqual(len(epidata1),1) self.assertEqual(epidata1, [ { @@ -118,9 +119,6 @@ def test_caching(self): } ]) epidata1={'result':1, 'message':'success', 'epidata':epidata1} - - # make sure the API covidcast_meta is still blank, since it only serves - # the cached version and we haven't cached anything yet epidata2 = Epidata.covidcast_meta() self.assertEqual(epidata2['result'], -2, json.dumps(epidata2)) From 00e396b12fb71dcd349cc1a8bcf41951715cd896 Mon Sep 17 00:00:00 2001 From: Xavier Xia Date: Fri, 17 Jun 2022 12:02:43 -0500 Subject: [PATCH 19/30] tidied up imports --- integrations/acquisition/covidcast/test_is_latest_issue.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/integrations/acquisition/covidcast/test_is_latest_issue.py b/integrations/acquisition/covidcast/test_is_latest_issue.py index 5eb6e0169..116814447 100644 --- a/integrations/acquisition/covidcast/test_is_latest_issue.py +++ b/integrations/acquisition/covidcast/test_is_latest_issue.py @@ -1,20 +1,14 @@ """Integration tests for covidcast's is_latest_issue boolean.""" # standard library import unittest -import time -import threading # third party -from aiohttp.client_exceptions import ClientResponseError import mysql.connector -import pytest # first party -from delphi.epidata.acquisition.covidcast.logger import get_structured_logger from delphi_utils import Nans from delphi.epidata.client.delphi_epidata import Epidata from delphi.epidata.acquisition.covidcast.database import Database, CovidcastRow -from delphi.epidata.acquisition.covidcast.covidcast_meta_cache_updater import main as update_covidcast_meta_cache import delphi.operations.secrets as secrets # py3tester coverage target (equivalent to `import *`) From dee268d5f6be27648d36807a06142df3962be46c Mon Sep 17 00:00:00 2001 From: Xavier Xia Date: Tue, 12 Jul 2022 12:22:35 -0500 Subject: [PATCH 20/30] removed dim tests --- integrations/acquisition/covidcast/test_is_latest_issue.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/integrations/acquisition/covidcast/test_is_latest_issue.py b/integrations/acquisition/covidcast/test_is_latest_issue.py index 116814447..bc9937e33 100644 --- a/integrations/acquisition/covidcast/test_is_latest_issue.py +++ b/integrations/acquisition/covidcast/test_is_latest_issue.py @@ -113,7 +113,8 @@ def test_signal_latest(self): #dynamic check for signal_history's list of issue self._db._cursor.execute(f'SELECT `issue` FROM {Database.history_table}') record3 = self._db._cursor.fetchall() - self.assertEqual(len(record3),totalRows + 1) #ensure 3 added (1 of which refreshed) + self.assertEqual(len(record3),totalRows + 1) + self.assertEqual(3,totalRows + 1) #ensure len(record3) = totalRows + 1 = 3 self.assertEqual(20200416,max(record3)[0]) #max of the outputs is 20200416 , extracting from tuple #check older issue not inside latest, empty field From 157530ffb10505ac7715703d81faf9e6ca6bf89a Mon Sep 17 00:00:00 2001 From: Xavier Xia Date: Tue, 12 Jul 2022 12:30:41 -0500 Subject: [PATCH 21/30] added dim tests in new file --- .../acquisition/covidcast/test_dim_tables.py | 172 ++++++++++++++++++ .../covidcast/test_is_latest_issue.py | 117 +----------- 2 files changed, 174 insertions(+), 115 deletions(-) create mode 100644 integrations/acquisition/covidcast/test_dim_tables.py diff --git a/integrations/acquisition/covidcast/test_dim_tables.py b/integrations/acquisition/covidcast/test_dim_tables.py new file mode 100644 index 000000000..11d60fd95 --- /dev/null +++ b/integrations/acquisition/covidcast/test_dim_tables.py @@ -0,0 +1,172 @@ +"""Integration tests for covidcast's dimension tables.""" +# standard library +import unittest + + +# third party +import mysql.connector +# first party +from delphi_utils import Nans +from delphi.epidata.client.delphi_epidata import Epidata +from delphi.epidata.acquisition.covidcast.database import Database, CovidcastRow +import delphi.operations.secrets as secrets + +__test_target__ = 'delphi.epidata.acquisition.covidcast.database' + +nmv = Nans.NOT_MISSING.value +class CovidcastDimensionTablesTests(unittest.TestCase): + """Tests covidcast's dimension tables.""" + + def setUp(self): + """Perform per-test setup.""" + # connect to the `epidata` database + cnx = mysql.connector.connect( + user='user', + password='pass', + host='delphi_database_epidata', + database='covid') + cur = cnx.cursor() + + # clear all tables + cur.execute("truncate table signal_load") + cur.execute("truncate table signal_history") + cur.execute("truncate table signal_latest") + cur.execute("truncate table geo_dim") + cur.execute("truncate table signal_dim") + # reset the `covidcast_meta_cache` table (it should always have one row) + cur.execute('update covidcast_meta_cache set timestamp = 0, epidata = "[]"') + cnx.commit() + cur.close() + + # make connection and cursor available to the Database object + self._db = Database() + self._db._connection = cnx + self._db._cursor = cnx.cursor() + + # use the local instance of the Epidata API + Epidata.BASE_URL = 'http://delphi_web_epidata/epidata/api.php' + + # use the local instance of the epidata database + secrets.db.host = 'delphi_database_epidata' + secrets.db.epi = ('user', 'pass') + + #Commonly used SQL commands: + self.viewSignalDim = f'SELECT `source`, `signal` FROM `signal_dim`' + self.viewGeoDim = f'SELECT `geo_type`,`geo_value` FROM `geo_dim`' + def tearDown(self): + """Perform per-test teardown.""" + self._db._cursor.close() + self._db._connection.close() + + # We want to test src_sig to make sure rows are added to *_dim only when needed + #new src, sig (ensure that it is added into *_dim) + #old src, sig (ensure that it is NOT added into *_dim) + #new geo (added) + #old geo (not added) + def test_src_sig(self): + #BASE CASES + rows = [ + CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa', + 2, 2, 2, nmv, nmv, nmv, 20200414, 0), + CovidcastRow('src', 'sig', 'day', 'county', 20200414, '11111', + 3, 3, 3, nmv, nmv, nmv, 20200414, 0) + ] + self._db.insert_or_update_bulk(rows) + self._db.run_dbjobs() + self._db._cursor.execute(self.viewGeoDim) + record = self._db._cursor.fetchall() + geoDimRows = len(list(record)) + + self._db._cursor.execute(self.viewSignalDim) + record = self._db._cursor.fetchall() + sigDimRows = len(list(record)) + + self._db._cursor.execute(self.viewSignalLatest) + record = self._db._cursor.fetchall() + sigLatestRows = len(list(record)) + + #test not added first + with self.subTest(name='older src and sig not added into sig_dim'): + oldSrcSig = [ + CovidcastRow('src', 'sig', 'day', 'state', 20211111, 'pa', #new src, new sig + 99, 99, 99, nmv, nmv, nmv, 20211111, 1), + CovidcastRow('src', 'sig', 'day', 'county', 20211111, '11111', #new src, new sig, same geo + 99, 99, 99, nmv, nmv, nmv, 20211111, 1) + ] + self._db.insert_or_update_bulk(oldSrcSig) + self._db.run_dbjobs() + + #testing src, sig + self._db._cursor.execute(self.viewSignalDim) + record = self._db._cursor.fetchall() + res = [('src','sig')] #output + self.assertEqual(res , list(record)) + + #ensure new entries are added in latest + self._db._cursor.execute(self.viewSignalLatest) + record = self._db._cursor.fetchall() + self.assertEqual(len(list(record)), sigLatestRows + 2) #2 original, 2 added + + #ensure nothing in geo + self._db._cursor.execute(self.viewGeoDim) + record = self._db._cursor.fetchall() + self.assertEqual(len(list(record)),geoDimRows) + + with self.subTest(name='newer src and sig added in sig_dim'): + newSrcSig = [ + CovidcastRow('new_src', 'sig', 'day', 'state', 20200414, 'pa', # new_src + 2, 2, 2, nmv, nmv, nmv, 20200414, 0), + CovidcastRow('src', 'new_sig', 'day', 'state', 20200414, 'pa', # new_sig + 2, 2, 2, nmv, nmv, nmv, 20200414, 0) + ] + self._db.insert_or_update_bulk(newSrcSig) + self._db.run_dbjobs() + + #testing src, sig + self._db._cursor.execute(self.viewSignalDim) + record = self._db._cursor.fetchall() + res = [('src', 'sig'), ('new_src', 'sig'), ('src', 'new_sig')] + self.assertEqual(res , (record)) + #ensure nothing in geo + self._db._cursor.execute(self.viewGeoDim) + record = self._db._cursor.fetchall() + self.assertEqual(len(list(record)),geoDimRows) + + with self.subTest(name='old geo not added in geo_dim'): + repeatedGeoValues = [ #geo_type #geo_value + CovidcastRow('src', 'sig', 'day', 'state', 20200415, 'pa', # same geo_type, geo_value + 2, 2, 2, nmv, nmv, nmv, 20200415, 0), + CovidcastRow('src', 'sig', 'day', 'county', 20200415, '11111', # same geo_type, geo_value + 3, 3, 3, nmv, nmv, nmv, 20200415, 0), + ] + self._db.insert_or_update_bulk(repeatedGeoValues) + self._db.run_dbjobs() + + self._db._cursor.execute(self.viewGeoDim) + record = self._db._cursor.fetchall() + self.assertEqual(len(list(record)),geoDimRows) #geoDimRows unchanged + + self._db._cursor.execute(self.viewSignalLatest) + record = self._db._cursor.fetchall() + self.assertEqual(len(list(record)),sigLatestRows + 6) #total entries = 2(initial) + 6(test) + + with self.subTest(name='newer geo added in geo_dim'): + newGeoValues = [ #geo_type #geo_value + CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'nj', # everything same except, state = nj + 2, 2, 2, nmv, nmv, nmv, 20200414, 0), + CovidcastRow('src', 'sig', 'day', 'county', 20200414, '15217', # everything same except, county = al + 3, 3, 3, nmv, nmv, nmv, 20200414, 0), + CovidcastRow('src', 'sig', 'day', 'county', 20200414, '15451', # everything same except, county = nj + 3, 3, 3, nmv, nmv, nmv, 20200414, 0) + ] + self._db.insert_or_update_bulk(newGeoValues) + self._db.run_dbjobs() + + self._db._cursor.execute(f'SELECT `geo_type`,`geo_value` FROM `geo_dim`') + record = self._db._cursor.fetchall() + self.assertEqual(len(list(record)),geoDimRows + 3) #2 + 3 new + + self._db._cursor.execute(self.viewSignalLatest) + record = self._db._cursor.fetchall() + self.assertEqual(len(list(record)),sigLatestRows + 6 + 3) #total entries = 2(initial) + 6(test) + \ No newline at end of file diff --git a/integrations/acquisition/covidcast/test_is_latest_issue.py b/integrations/acquisition/covidcast/test_is_latest_issue.py index bc9937e33..c62dbe846 100644 --- a/integrations/acquisition/covidcast/test_is_latest_issue.py +++ b/integrations/acquisition/covidcast/test_is_latest_issue.py @@ -2,7 +2,6 @@ # standard library import unittest - # third party import mysql.connector # first party @@ -11,12 +10,12 @@ from delphi.epidata.acquisition.covidcast.database import Database, CovidcastRow import delphi.operations.secrets as secrets -# py3tester coverage target (equivalent to `import *`) # __test_target__ = 'delphi.epidata.acquisition.covidcast.database' nmv = Nans.NOT_MISSING.value class CovidcastLatestIssueTests(unittest.TestCase): + """Tests covidcast is_latest_issue caching.""" def setUp(self): @@ -123,119 +122,7 @@ def test_signal_latest(self): emptyRecord = list(self._db._cursor.fetchall()) empty = [] self.assertEqual(empty, emptyRecord) - - # We want to test src_sig to make sure rows are added to *_dim only when needed - #new src, sig (ensure that it is added into *_dim) - #old src, sig (ensure that it is NOT added into *_dim) - #new geo (added) - #old geo (not added) - def test_src_sig(self): - #BASE CASES - rows = [ - CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa', - 2, 2, 2, nmv, nmv, nmv, 20200414, 0), - CovidcastRow('src', 'sig', 'day', 'county', 20200414, '11111', - 3, 3, 3, nmv, nmv, nmv, 20200414, 0) - ] - self._db.insert_or_update_bulk(rows) - self._db.run_dbjobs() - self._db._cursor.execute(self.viewGeoDim) - record = self._db._cursor.fetchall() - geoDimRows = len(list(record)) - - self._db._cursor.execute(self.viewSignalDim) - record = self._db._cursor.fetchall() - sigDimRows = len(list(record)) - - self._db._cursor.execute(self.viewSignalLatest) - record = self._db._cursor.fetchall() - sigLatestRows = len(list(record)) - - #test not added first - with self.subTest(name='older src and sig not added into sig_dim'): - oldSrcSig = [ - CovidcastRow('src', 'sig', 'day', 'state', 20211111, 'pa', #new src, new sig - 99, 99, 99, nmv, nmv, nmv, 20211111, 1), - CovidcastRow('src', 'sig', 'day', 'county', 20211111, '11111', #new src, new sig, same geo - 99, 99, 99, nmv, nmv, nmv, 20211111, 1) - ] - self._db.insert_or_update_bulk(oldSrcSig) - self._db.run_dbjobs() - - #testing src, sig - self._db._cursor.execute(self.viewSignalDim) - record = self._db._cursor.fetchall() - res = [('src','sig')] #output - self.assertEqual(res , list(record)) - - #ensure new entries are added in latest - self._db._cursor.execute(self.viewSignalLatest) - record = self._db._cursor.fetchall() - self.assertEqual(len(list(record)), sigLatestRows + 2) #2 original, 2 added - - #ensure nothing in geo - self._db._cursor.execute(self.viewGeoDim) - record = self._db._cursor.fetchall() - self.assertEqual(len(list(record)),geoDimRows) - - with self.subTest(name='newer src and sig added in sig_dim'): - newSrcSig = [ - CovidcastRow('new_src', 'sig', 'day', 'state', 20200414, 'pa', # new_src - 2, 2, 2, nmv, nmv, nmv, 20200414, 0), - CovidcastRow('src', 'new_sig', 'day', 'state', 20200414, 'pa', # new_sig - 2, 2, 2, nmv, nmv, nmv, 20200414, 0) - ] - self._db.insert_or_update_bulk(newSrcSig) - self._db.run_dbjobs() - - #testing src, sig - self._db._cursor.execute(self.viewSignalDim) - record = self._db._cursor.fetchall() - res = [('src', 'sig'), ('new_src', 'sig'), ('src', 'new_sig')] - self.assertEqual(res , (record)) - #ensure nothing in geo - self._db._cursor.execute(self.viewGeoDim) - record = self._db._cursor.fetchall() - self.assertEqual(len(list(record)),geoDimRows) - - with self.subTest(name='old geo not added in geo_dim'): - repeatedGeoValues = [ #geo_type #geo_value - CovidcastRow('src', 'sig', 'day', 'state', 20200415, 'pa', # same geo_type, geo_value - 2, 2, 2, nmv, nmv, nmv, 20200415, 0), - CovidcastRow('src', 'sig', 'day', 'county', 20200415, '11111', # same geo_type, geo_value - 3, 3, 3, nmv, nmv, nmv, 20200415, 0), - ] - self._db.insert_or_update_bulk(repeatedGeoValues) - self._db.run_dbjobs() - - self._db._cursor.execute(self.viewGeoDim) - record = self._db._cursor.fetchall() - self.assertEqual(len(list(record)),geoDimRows) #geoDimRows unchanged - - self._db._cursor.execute(self.viewSignalLatest) - record = self._db._cursor.fetchall() - self.assertEqual(len(list(record)),sigLatestRows + 6) #total entries = 2(initial) + 6(test) - - with self.subTest(name='newer geo added in geo_dim'): - newGeoValues = [ #geo_type #geo_value - CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'nj', # everything same except, state = nj - 2, 2, 2, nmv, nmv, nmv, 20200414, 0), - CovidcastRow('src', 'sig', 'day', 'county', 20200414, '15217', # everything same except, county = al - 3, 3, 3, nmv, nmv, nmv, 20200414, 0), - CovidcastRow('src', 'sig', 'day', 'county', 20200414, '15451', # everything same except, county = nj - 3, 3, 3, nmv, nmv, nmv, 20200414, 0) - ] - self._db.insert_or_update_bulk(newGeoValues) - self._db.run_dbjobs() - - self._db._cursor.execute(f'SELECT `geo_type`,`geo_value` FROM `geo_dim`') - record = self._db._cursor.fetchall() - self.assertEqual(len(list(record)),geoDimRows + 3) #2 + 3 new - - self._db._cursor.execute(self.viewSignalLatest) - record = self._db._cursor.fetchall() - self.assertEqual(len(list(record)),sigLatestRows + 6 + 3) #total entries = 2(initial) + 6(test) - + @unittest.skip("Having different (time_value,issue) pairs in one call to db pipeline does not happen in practice") def test_diff_timevalue_issue(self): rows = [ From 18a27582d8056659a7afa251c78e9e5fef6c7d35 Mon Sep 17 00:00:00 2001 From: Xavier Xia Date: Mon, 25 Jul 2022 14:36:30 -0500 Subject: [PATCH 22/30] used set to remove ordering of elements in return list --- integrations/acquisition/covidcast/test_dim_tables.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/integrations/acquisition/covidcast/test_dim_tables.py b/integrations/acquisition/covidcast/test_dim_tables.py index 11d60fd95..072c0307b 100644 --- a/integrations/acquisition/covidcast/test_dim_tables.py +++ b/integrations/acquisition/covidcast/test_dim_tables.py @@ -125,8 +125,16 @@ def test_src_sig(self): #testing src, sig self._db._cursor.execute(self.viewSignalDim) record = self._db._cursor.fetchall() +<<<<<<< HEAD res = [('src', 'sig'), ('new_src', 'sig'), ('src', 'new_sig')] self.assertEqual(res , (record)) +======= + self.sigDimRows = len(list(record)) + + res = set([('new_src', 'sig'), ('src', 'new_sig'), ('src', 'sig')]) + self.assertEqual(res , set(record)) + self.assertEqual(3, self.sigDimRows) +>>>>>>> 8a39f36b (used set to remove ordering of elements in return list) #ensure nothing in geo self._db._cursor.execute(self.viewGeoDim) record = self._db._cursor.fetchall() From cb78c73bdf9561b7e7a322da8c774ba0e64f02a6 Mon Sep 17 00:00:00 2001 From: Xavier Xia Date: Tue, 26 Jul 2022 10:37:49 -0500 Subject: [PATCH 23/30] used set to remove ordering of elements in return list --- integrations/acquisition/covidcast/test_dim_tables.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/integrations/acquisition/covidcast/test_dim_tables.py b/integrations/acquisition/covidcast/test_dim_tables.py index 072c0307b..e24974d0f 100644 --- a/integrations/acquisition/covidcast/test_dim_tables.py +++ b/integrations/acquisition/covidcast/test_dim_tables.py @@ -125,16 +125,11 @@ def test_src_sig(self): #testing src, sig self._db._cursor.execute(self.viewSignalDim) record = self._db._cursor.fetchall() -<<<<<<< HEAD - res = [('src', 'sig'), ('new_src', 'sig'), ('src', 'new_sig')] - self.assertEqual(res , (record)) -======= self.sigDimRows = len(list(record)) res = set([('new_src', 'sig'), ('src', 'new_sig'), ('src', 'sig')]) self.assertEqual(res , set(record)) self.assertEqual(3, self.sigDimRows) ->>>>>>> 8a39f36b (used set to remove ordering of elements in return list) #ensure nothing in geo self._db._cursor.execute(self.viewGeoDim) record = self._db._cursor.fetchall() From 6682b580ba035a32a683d79aadaa7e58ce3f8e19 Mon Sep 17 00:00:00 2001 From: Xavier Xia Date: Tue, 26 Jul 2022 10:57:30 -0500 Subject: [PATCH 24/30] edited dim_tables for better readability --- .../acquisition/covidcast/test_dim_tables.py | 41 +++++++++++-------- 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/integrations/acquisition/covidcast/test_dim_tables.py b/integrations/acquisition/covidcast/test_dim_tables.py index e24974d0f..e4e114782 100644 --- a/integrations/acquisition/covidcast/test_dim_tables.py +++ b/integrations/acquisition/covidcast/test_dim_tables.py @@ -2,7 +2,6 @@ # standard library import unittest - # third party import mysql.connector # first party @@ -51,8 +50,11 @@ def setUp(self): secrets.db.epi = ('user', 'pass') #Commonly used SQL commands: + self.viewSignalLatest = f'SELECT * FROM {Database.latest_table}' + self.viewSignalHistory = f'SELECT * FROM {Database.history_table}' self.viewSignalDim = f'SELECT `source`, `signal` FROM `signal_dim`' self.viewGeoDim = f'SELECT `geo_type`,`geo_value` FROM `geo_dim`' + def tearDown(self): """Perform per-test teardown.""" self._db._cursor.close() @@ -71,19 +73,19 @@ def test_src_sig(self): CovidcastRow('src', 'sig', 'day', 'county', 20200414, '11111', 3, 3, 3, nmv, nmv, nmv, 20200414, 0) ] - self._db.insert_or_update_bulk(rows) + self._db.insert_or_update_batch(rows) self._db.run_dbjobs() self._db._cursor.execute(self.viewGeoDim) record = self._db._cursor.fetchall() - geoDimRows = len(list(record)) + self.geoDimRows = len(list(record)) self._db._cursor.execute(self.viewSignalDim) record = self._db._cursor.fetchall() - sigDimRows = len(list(record)) + self.sigDimRows = len(list(record)) self._db._cursor.execute(self.viewSignalLatest) record = self._db._cursor.fetchall() - sigLatestRows = len(list(record)) + self.sigLatestRows = len(list(record)) #test not added first with self.subTest(name='older src and sig not added into sig_dim'): @@ -93,7 +95,7 @@ def test_src_sig(self): CovidcastRow('src', 'sig', 'day', 'county', 20211111, '11111', #new src, new sig, same geo 99, 99, 99, nmv, nmv, nmv, 20211111, 1) ] - self._db.insert_or_update_bulk(oldSrcSig) + self._db.insert_or_update_batch(oldSrcSig) self._db.run_dbjobs() #testing src, sig @@ -105,12 +107,13 @@ def test_src_sig(self): #ensure new entries are added in latest self._db._cursor.execute(self.viewSignalLatest) record = self._db._cursor.fetchall() - self.assertEqual(len(list(record)), sigLatestRows + 2) #2 original, 2 added + self.sigLatestRows = len(list(record)) + self.assertEqual(len(list(record)), self.sigLatestRows) #2 original, 2 added (updated) #ensure nothing in geo self._db._cursor.execute(self.viewGeoDim) record = self._db._cursor.fetchall() - self.assertEqual(len(list(record)),geoDimRows) + self.assertEqual(len(list(record)),self.geoDimRows) with self.subTest(name='newer src and sig added in sig_dim'): newSrcSig = [ @@ -119,21 +122,20 @@ def test_src_sig(self): CovidcastRow('src', 'new_sig', 'day', 'state', 20200414, 'pa', # new_sig 2, 2, 2, nmv, nmv, nmv, 20200414, 0) ] - self._db.insert_or_update_bulk(newSrcSig) + self._db.insert_or_update_batch(newSrcSig) self._db.run_dbjobs() #testing src, sig self._db._cursor.execute(self.viewSignalDim) record = self._db._cursor.fetchall() self.sigDimRows = len(list(record)) - - res = set([('new_src', 'sig'), ('src', 'new_sig'), ('src', 'sig')]) + res = set([('new_src', 'sig'), ('src', 'new_sig'), ('src', 'sig')]) # the sequence of adding changed self.assertEqual(res , set(record)) self.assertEqual(3, self.sigDimRows) #ensure nothing in geo self._db._cursor.execute(self.viewGeoDim) record = self._db._cursor.fetchall() - self.assertEqual(len(list(record)),geoDimRows) + self.assertEqual(len(list(record)),self.geoDimRows) with self.subTest(name='old geo not added in geo_dim'): repeatedGeoValues = [ #geo_type #geo_value @@ -142,16 +144,17 @@ def test_src_sig(self): CovidcastRow('src', 'sig', 'day', 'county', 20200415, '11111', # same geo_type, geo_value 3, 3, 3, nmv, nmv, nmv, 20200415, 0), ] - self._db.insert_or_update_bulk(repeatedGeoValues) + self._db.insert_or_update_batch(repeatedGeoValues) self._db.run_dbjobs() self._db._cursor.execute(self.viewGeoDim) record = self._db._cursor.fetchall() - self.assertEqual(len(list(record)),geoDimRows) #geoDimRows unchanged + self.assertEqual(len(list(record)),self.geoDimRows) #self.geoDimRows unchanged self._db._cursor.execute(self.viewSignalLatest) record = self._db._cursor.fetchall() - self.assertEqual(len(list(record)),sigLatestRows + 6) #total entries = 2(initial) + 6(test) + self.sigLatestRows = len(list(record)) #update + self.assertEqual(8,self.sigLatestRows) #total entries = 2(initial) + 6(test) with self.subTest(name='newer geo added in geo_dim'): newGeoValues = [ #geo_type #geo_value @@ -162,14 +165,16 @@ def test_src_sig(self): CovidcastRow('src', 'sig', 'day', 'county', 20200414, '15451', # everything same except, county = nj 3, 3, 3, nmv, nmv, nmv, 20200414, 0) ] - self._db.insert_or_update_bulk(newGeoValues) + self._db.insert_or_update_batch(newGeoValues) self._db.run_dbjobs() self._db._cursor.execute(f'SELECT `geo_type`,`geo_value` FROM `geo_dim`') record = self._db._cursor.fetchall() - self.assertEqual(len(list(record)),geoDimRows + 3) #2 + 3 new + self.geoDimRows = len(list(record)) + self.assertEqual(5,self.geoDimRows) #2 + 3 new self._db._cursor.execute(self.viewSignalLatest) record = self._db._cursor.fetchall() - self.assertEqual(len(list(record)),sigLatestRows + 6 + 3) #total entries = 2(initial) + 6(test) + self.sigLatestRows = len(list(record)) #update + self.assertEqual(11,self.sigLatestRows) #total entries = 2(initial) + 6(test) + 3 \ No newline at end of file From f06c327a05a47810e8e0a49e5c16ca3406fa0cce Mon Sep 17 00:00:00 2001 From: Xavier Xia Date: Tue, 26 Jul 2022 11:07:09 -0500 Subject: [PATCH 25/30] syntax and functionality checked --- .../covidcast/test_is_latest_issue.py | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/integrations/acquisition/covidcast/test_is_latest_issue.py b/integrations/acquisition/covidcast/test_is_latest_issue.py index c62dbe846..b2831c851 100644 --- a/integrations/acquisition/covidcast/test_is_latest_issue.py +++ b/integrations/acquisition/covidcast/test_is_latest_issue.py @@ -10,7 +10,7 @@ from delphi.epidata.acquisition.covidcast.database import Database, CovidcastRow import delphi.operations.secrets as secrets -# + __test_target__ = 'delphi.epidata.acquisition.covidcast.database' nmv = Nans.NOT_MISSING.value @@ -55,8 +55,7 @@ def setUp(self): #Commonly used SQL commands: self.viewSignalLatest = f'SELECT * FROM {Database.latest_table}' self.viewSignalHistory = f'SELECT * FROM {Database.history_table}' - self.viewSignalDim = f'SELECT `source`, `signal` FROM `signal_dim`' - self.viewGeoDim = f'SELECT `geo_type`,`geo_value` FROM `geo_dim`' + def tearDown(self): """Perform per-test teardown.""" self._db._cursor.close() @@ -68,7 +67,7 @@ def test_signal_latest(self): CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa', 1.5, 2.5, 3.5, nmv, nmv, nmv, 20200414, 0) ] - self._db.insert_or_update_bulk(rows) + self._db.insert_or_update_batch(rows) self._db.run_dbjobs() self._db._cursor.execute(self.viewSignalHistory) totalRows = len(list(self._db._cursor.fetchall())) @@ -78,7 +77,7 @@ def test_signal_latest(self): self._db._cursor.execute(sql) record = self._db._cursor.fetchall() self.assertEqual(record[0][0], 20200414) - self.assertEqual(len(record), 1) #check 1 entry + self.assertEqual(len(record), 1) #check 1 entry present #when uploading data patches (data in signal load has < issue than data in signal_latest) #INSERT OLDER issue (does not end up in latest table) @@ -87,7 +86,7 @@ def test_signal_latest(self): newRow = [ CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa', 4.4, 4.4, 4.4, nmv, nmv, nmv, 20200416, 2)] #should show up - self._db.insert_or_update_bulk(newRow) + self._db.insert_or_update_batch(newRow) self._db.run_dbjobs() #check newer issue in signal_latest @@ -95,12 +94,12 @@ def test_signal_latest(self): self._db._cursor.execute(sql) record = self._db._cursor.fetchall() self.assertEqual(record[0][0], 20200416) #new data added - self.assertEqual(len(record), 1) #check 1 entry + self.assertEqual(len(record), 1) #check 1 entry present updateRow = [ CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa', 6.5, 2.2, 11.5, nmv, nmv, nmv, 20200414, 2)] #should not showup - self._db.insert_or_update_bulk(updateRow) + self._db.insert_or_update_batch(updateRow) self._db.run_dbjobs() #check newer issue in signal_latest @@ -112,7 +111,8 @@ def test_signal_latest(self): #dynamic check for signal_history's list of issue self._db._cursor.execute(f'SELECT `issue` FROM {Database.history_table}') record3 = self._db._cursor.fetchall() - self.assertEqual(len(record3),totalRows + 1) + totalRows = len(list(record3)) #updating totalRows + self.assertEqual(len(record3),totalRows) self.assertEqual(3,totalRows + 1) #ensure len(record3) = totalRows + 1 = 3 self.assertEqual(20200416,max(record3)[0]) #max of the outputs is 20200416 , extracting from tuple @@ -131,7 +131,7 @@ def test_diff_timevalue_issue(self): CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa', # updating previous entry 2, 2, 3, nmv, nmv, nmv, 20200416, 2) ] - self._db.insert_or_update_bulk(rows) + self._db.insert_or_update_batch(rows) self._db.run_dbjobs() self._db._cursor.execute(f'SELECT `issue` FROM {Database.latest_table} ') record = self._db._cursor.fetchall() From d250f980e070679bcd5f2dfaff70fd1336475ad2 Mon Sep 17 00:00:00 2001 From: Xavier Xia Date: Mon, 8 Aug 2022 01:24:06 -0400 Subject: [PATCH 26/30] file edits --- integrations/acquisition/covidcast/test_is_latest_issue.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/integrations/acquisition/covidcast/test_is_latest_issue.py b/integrations/acquisition/covidcast/test_is_latest_issue.py index b2831c851..ec2939266 100644 --- a/integrations/acquisition/covidcast/test_is_latest_issue.py +++ b/integrations/acquisition/covidcast/test_is_latest_issue.py @@ -70,7 +70,7 @@ def test_signal_latest(self): self._db.insert_or_update_batch(rows) self._db.run_dbjobs() self._db._cursor.execute(self.viewSignalHistory) - totalRows = len(list(self._db._cursor.fetchall())) + self.totalRows = len(list(self._db._cursor.fetchall())) #sanity check for adding dummy data sql = f'SELECT `issue` FROM {Database.latest_table} where `time_value` = 20200414' @@ -111,9 +111,8 @@ def test_signal_latest(self): #dynamic check for signal_history's list of issue self._db._cursor.execute(f'SELECT `issue` FROM {Database.history_table}') record3 = self._db._cursor.fetchall() - totalRows = len(list(record3)) #updating totalRows - self.assertEqual(len(record3),totalRows) - self.assertEqual(3,totalRows + 1) #ensure len(record3) = totalRows + 1 = 3 + self.totalRows = len(list(record3)) #updating totalRows + self.assertEqual(2,self.totalRows) #ensure len(record3) = 2 self.assertEqual(20200416,max(record3)[0]) #max of the outputs is 20200416 , extracting from tuple #check older issue not inside latest, empty field From 56852f4bd5ea4436f4ce7746f69c29975cdafeef Mon Sep 17 00:00:00 2001 From: Xavier Xia Date: Tue, 9 Aug 2022 09:55:55 -0400 Subject: [PATCH 27/30] removed unnecessary test objects --- .../acquisition/covidcast/test_dim_tables.py | 32 +++++++++---------- .../covidcast/test_is_latest_issue.py | 6 ++-- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/integrations/acquisition/covidcast/test_dim_tables.py b/integrations/acquisition/covidcast/test_dim_tables.py index e4e114782..375a06890 100644 --- a/integrations/acquisition/covidcast/test_dim_tables.py +++ b/integrations/acquisition/covidcast/test_dim_tables.py @@ -77,15 +77,15 @@ def test_src_sig(self): self._db.run_dbjobs() self._db._cursor.execute(self.viewGeoDim) record = self._db._cursor.fetchall() - self.geoDimRows = len(list(record)) + geoDimRowCount = len(list(record)) self._db._cursor.execute(self.viewSignalDim) record = self._db._cursor.fetchall() - self.sigDimRows = len(list(record)) + sigDimRowCount = len(list(record)) self._db._cursor.execute(self.viewSignalLatest) record = self._db._cursor.fetchall() - self.sigLatestRows = len(list(record)) + sigLatestRowCount = len(list(record)) #test not added first with self.subTest(name='older src and sig not added into sig_dim'): @@ -107,13 +107,13 @@ def test_src_sig(self): #ensure new entries are added in latest self._db._cursor.execute(self.viewSignalLatest) record = self._db._cursor.fetchall() - self.sigLatestRows = len(list(record)) - self.assertEqual(len(list(record)), self.sigLatestRows) #2 original, 2 added (updated) + sigLatestRowCount = len(list(record)) + self.assertEqual(len(list(record)), sigLatestRowCount) #2 original, 2 added (updated) #ensure nothing in geo self._db._cursor.execute(self.viewGeoDim) record = self._db._cursor.fetchall() - self.assertEqual(len(list(record)),self.geoDimRows) + self.assertEqual(len(list(record)),geoDimRowCount) with self.subTest(name='newer src and sig added in sig_dim'): newSrcSig = [ @@ -128,14 +128,14 @@ def test_src_sig(self): #testing src, sig self._db._cursor.execute(self.viewSignalDim) record = self._db._cursor.fetchall() - self.sigDimRows = len(list(record)) + sigDimRowCount = len(list(record)) res = set([('new_src', 'sig'), ('src', 'new_sig'), ('src', 'sig')]) # the sequence of adding changed self.assertEqual(res , set(record)) - self.assertEqual(3, self.sigDimRows) + self.assertEqual(3, sigDimRowCount) #ensure nothing in geo self._db._cursor.execute(self.viewGeoDim) record = self._db._cursor.fetchall() - self.assertEqual(len(list(record)),self.geoDimRows) + self.assertEqual(len(list(record)),geoDimRowCount) with self.subTest(name='old geo not added in geo_dim'): repeatedGeoValues = [ #geo_type #geo_value @@ -149,12 +149,12 @@ def test_src_sig(self): self._db._cursor.execute(self.viewGeoDim) record = self._db._cursor.fetchall() - self.assertEqual(len(list(record)),self.geoDimRows) #self.geoDimRows unchanged + self.assertEqual(len(list(record)),geoDimRowCount) #geoDimRowCount unchanged self._db._cursor.execute(self.viewSignalLatest) record = self._db._cursor.fetchall() - self.sigLatestRows = len(list(record)) #update - self.assertEqual(8,self.sigLatestRows) #total entries = 2(initial) + 6(test) + sigLatestRowCount = len(list(record)) #update + self.assertEqual(8,sigLatestRowCount) #total entries = 2(initial) + 6(test) with self.subTest(name='newer geo added in geo_dim'): newGeoValues = [ #geo_type #geo_value @@ -170,11 +170,11 @@ def test_src_sig(self): self._db._cursor.execute(f'SELECT `geo_type`,`geo_value` FROM `geo_dim`') record = self._db._cursor.fetchall() - self.geoDimRows = len(list(record)) - self.assertEqual(5,self.geoDimRows) #2 + 3 new + geoDimRowCount = len(list(record)) + self.assertEqual(5,geoDimRowCount) #2 + 3 new self._db._cursor.execute(self.viewSignalLatest) record = self._db._cursor.fetchall() - self.sigLatestRows = len(list(record)) #update - self.assertEqual(11,self.sigLatestRows) #total entries = 2(initial) + 6(test) + 3 + sigLatestRowCount = len(list(record)) #update + self.assertEqual(11,sigLatestRowCount) #total entries = 2(initial) + 6(test) + 3 \ No newline at end of file diff --git a/integrations/acquisition/covidcast/test_is_latest_issue.py b/integrations/acquisition/covidcast/test_is_latest_issue.py index ec2939266..256d6826f 100644 --- a/integrations/acquisition/covidcast/test_is_latest_issue.py +++ b/integrations/acquisition/covidcast/test_is_latest_issue.py @@ -70,7 +70,7 @@ def test_signal_latest(self): self._db.insert_or_update_batch(rows) self._db.run_dbjobs() self._db._cursor.execute(self.viewSignalHistory) - self.totalRows = len(list(self._db._cursor.fetchall())) + totalRows = len(list(self._db._cursor.fetchall())) #sanity check for adding dummy data sql = f'SELECT `issue` FROM {Database.latest_table} where `time_value` = 20200414' @@ -111,8 +111,8 @@ def test_signal_latest(self): #dynamic check for signal_history's list of issue self._db._cursor.execute(f'SELECT `issue` FROM {Database.history_table}') record3 = self._db._cursor.fetchall() - self.totalRows = len(list(record3)) #updating totalRows - self.assertEqual(2,self.totalRows) #ensure len(record3) = 2 + totalRows = len(list(record3)) #updating totalRows + self.assertEqual(2, totalRows) #ensure len(record3) = 2 self.assertEqual(20200416,max(record3)[0]) #max of the outputs is 20200416 , extracting from tuple #check older issue not inside latest, empty field From 7427906b6a0aa7ecb0599721349572b4d6c239ce Mon Sep 17 00:00:00 2001 From: Xavier Xia Date: Tue, 9 Aug 2022 10:00:59 -0400 Subject: [PATCH 28/30] refined line comments --- .../acquisition/covidcast/test_is_latest_issue.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/integrations/acquisition/covidcast/test_is_latest_issue.py b/integrations/acquisition/covidcast/test_is_latest_issue.py index 256d6826f..1de9a5614 100644 --- a/integrations/acquisition/covidcast/test_is_latest_issue.py +++ b/integrations/acquisition/covidcast/test_is_latest_issue.py @@ -77,7 +77,7 @@ def test_signal_latest(self): self._db._cursor.execute(sql) record = self._db._cursor.fetchall() self.assertEqual(record[0][0], 20200414) - self.assertEqual(len(record), 1) #check 1 entry present + self.assertEqual(len(record), 1) #placeholder data only has one issue for 20200414 #when uploading data patches (data in signal load has < issue than data in signal_latest) #INSERT OLDER issue (does not end up in latest table) @@ -85,7 +85,7 @@ def test_signal_latest(self): #when signal_load is older than signal_latest, we patch old data (i.e changed some old entries) newRow = [ CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa', - 4.4, 4.4, 4.4, nmv, nmv, nmv, 20200416, 2)] #should show up + 4.4, 4.4, 4.4, nmv, nmv, nmv, 20200416, 2)] #new row to be added self._db.insert_or_update_batch(newRow) self._db.run_dbjobs() @@ -93,8 +93,8 @@ def test_signal_latest(self): sql = f'SELECT `issue` FROM {Database.latest_table} where `time_value` = 20200414 ' self._db._cursor.execute(sql) record = self._db._cursor.fetchall() - self.assertEqual(record[0][0], 20200416) #new data added - self.assertEqual(len(record), 1) #check 1 entry present + self.assertEqual(record[0][0], 20200416) #new data added, reflected in latest table + self.assertEqual(len(record), 1) # no. of record is still one, since we have latest issue with 20200416 updateRow = [ CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa', @@ -112,7 +112,7 @@ def test_signal_latest(self): self._db._cursor.execute(f'SELECT `issue` FROM {Database.history_table}') record3 = self._db._cursor.fetchall() totalRows = len(list(record3)) #updating totalRows - self.assertEqual(2, totalRows) #ensure len(record3) = 2 + self.assertEqual(2, totalRows) #added 1 new row, updated old row. Total = 2 self.assertEqual(20200416,max(record3)[0]) #max of the outputs is 20200416 , extracting from tuple #check older issue not inside latest, empty field From c12031623b269ed129d19050153deaf52af0380c Mon Sep 17 00:00:00 2001 From: Xavier Xia Date: Tue, 9 Aug 2022 10:14:55 -0400 Subject: [PATCH 29/30] refined comments and added tests for geo_type, geo_value pairs --- .../acquisition/covidcast/test_dim_tables.py | 70 +++++++++++-------- 1 file changed, 41 insertions(+), 29 deletions(-) diff --git a/integrations/acquisition/covidcast/test_dim_tables.py b/integrations/acquisition/covidcast/test_dim_tables.py index 375a06890..8db17e969 100644 --- a/integrations/acquisition/covidcast/test_dim_tables.py +++ b/integrations/acquisition/covidcast/test_dim_tables.py @@ -61,12 +61,12 @@ def tearDown(self): self._db._connection.close() # We want to test src_sig to make sure rows are added to *_dim only when needed - #new src, sig (ensure that it is added into *_dim) - #old src, sig (ensure that it is NOT added into *_dim) - #new geo (added) - #old geo (not added) + #new src, sig (ensure that it is added into *_dim) + #old src, sig (ensure that it is NOT added into *_dim) + #new geo (added) + #old geo (not added) def test_src_sig(self): - #BASE CASES + #BASE CASES rows = [ CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa', 2, 2, 2, nmv, nmv, nmv, 20200414, 0), @@ -87,12 +87,12 @@ def test_src_sig(self): record = self._db._cursor.fetchall() sigLatestRowCount = len(list(record)) - #test not added first + #test same src, sig not added with self.subTest(name='older src and sig not added into sig_dim'): oldSrcSig = [ - CovidcastRow('src', 'sig', 'day', 'state', 20211111, 'pa', #new src, new sig + CovidcastRow('src', 'sig', 'day', 'state', 20211111, 'pa', #same src, sig but diff timevalue and issue 99, 99, 99, nmv, nmv, nmv, 20211111, 1), - CovidcastRow('src', 'sig', 'day', 'county', 20211111, '11111', #new src, new sig, same geo + CovidcastRow('src', 'sig', 'day', 'county', 20211111, '11111', #same src, sig but diff timevalue and issue 99, 99, 99, nmv, nmv, nmv, 20211111, 1) ] self._db.insert_or_update_batch(oldSrcSig) @@ -107,14 +107,15 @@ def test_src_sig(self): #ensure new entries are added in latest self._db._cursor.execute(self.viewSignalLatest) record = self._db._cursor.fetchall() - sigLatestRowCount = len(list(record)) - self.assertEqual(len(list(record)), sigLatestRowCount) #2 original, 2 added (updated) + sigLatestRowCount = len(list(record)) #updating sigLatestRowCount + self.assertEqual(4 , sigLatestRowCount) #diff timevalue and issue, 2 older + 2 newer = 4 - #ensure nothing in geo + #ensure nothing changed in geoDim self._db._cursor.execute(self.viewGeoDim) record = self._db._cursor.fetchall() - self.assertEqual(len(list(record)),geoDimRowCount) + self.assertEqual(len(list(record)),geoDimRowCount) #geoDimRowCount remains unchanged + #testing new src, sig added with self.subTest(name='newer src and sig added in sig_dim'): newSrcSig = [ CovidcastRow('new_src', 'sig', 'day', 'state', 20200414, 'pa', # new_src @@ -128,33 +129,42 @@ def test_src_sig(self): #testing src, sig self._db._cursor.execute(self.viewSignalDim) record = self._db._cursor.fetchall() - sigDimRowCount = len(list(record)) - res = set([('new_src', 'sig'), ('src', 'new_sig'), ('src', 'sig')]) # the sequence of adding changed - self.assertEqual(res , set(record)) - self.assertEqual(3, sigDimRowCount) - #ensure nothing in geo + sigDimRowCount = len(list(record)) #update sigDimRowCount + res = set([('new_src', 'sig'), ('src', 'new_sig'), ('src', 'sig')]) # turn into set to ignore ordering + self.assertEqual(res , set(record)) + self.assertEqual(3, sigDimRowCount) #originally had (src , sig) added 2 new pairs + + #ensure new entries are added in latest + self._db._cursor.execute(self.viewSignalLatest) + record = self._db._cursor.fetchall() + sigLatestRowCount = len(list(record)) #updating sigLatestRowCount + self.assertEqual(6 , sigLatestRowCount) #diff timevalue and issue, 2 more added ontop of 4 previously + + #ensure nothing changed in geoDim self._db._cursor.execute(self.viewGeoDim) record = self._db._cursor.fetchall() self.assertEqual(len(list(record)),geoDimRowCount) + #testing repeated geo not added with self.subTest(name='old geo not added in geo_dim'): - repeatedGeoValues = [ #geo_type #geo_value - CovidcastRow('src', 'sig', 'day', 'state', 20200415, 'pa', # same geo_type, geo_value + repeatedGeoValues = [ + CovidcastRow('src', 'sig', 'day', 'state', 20200415, 'pa', # same geo_type, geo_value ('pa') 2, 2, 2, nmv, nmv, nmv, 20200415, 0), - CovidcastRow('src', 'sig', 'day', 'county', 20200415, '11111', # same geo_type, geo_value + CovidcastRow('src', 'sig', 'day', 'county', 20200415, '11111', # same geo_type, geo_value ('11111') 3, 3, 3, nmv, nmv, nmv, 20200415, 0), ] self._db.insert_or_update_batch(repeatedGeoValues) self._db.run_dbjobs() - self._db._cursor.execute(self.viewGeoDim) - record = self._db._cursor.fetchall() - self.assertEqual(len(list(record)),geoDimRowCount) #geoDimRowCount unchanged - self._db._cursor.execute(self.viewSignalLatest) record = self._db._cursor.fetchall() sigLatestRowCount = len(list(record)) #update - self.assertEqual(8,sigLatestRowCount) #total entries = 2(initial) + 6(test) + self.assertEqual(8, sigLatestRowCount) #total entries = 2 + 6 in previous subtest + + #ensure nothing changed in geoDim with repeated geo_type, geo_value pairs + self._db._cursor.execute(self.viewGeoDim) + record = self._db._cursor.fetchall() + self.assertEqual(len(list(record)),geoDimRowCount) #geoDimRowCount unchanged with self.subTest(name='newer geo added in geo_dim'): newGeoValues = [ #geo_type #geo_value @@ -170,11 +180,13 @@ def test_src_sig(self): self._db._cursor.execute(f'SELECT `geo_type`,`geo_value` FROM `geo_dim`') record = self._db._cursor.fetchall() - geoDimRowCount = len(list(record)) - self.assertEqual(5,geoDimRowCount) #2 + 3 new + res = set([('state', 'nj'), ('county', '15217'), ('county', '15451'), ('state', 'pa'), ('county', '11111')]) # turn into set to ignore ordering + self.assertEqual(res , set(record)) #ensure the values are the same, 3 new ones and 2 older ones + geoDimRowCount = len(list(record)) + self.assertEqual(5,geoDimRowCount) #2 + 3 new pairs self._db._cursor.execute(self.viewSignalLatest) record = self._db._cursor.fetchall() - sigLatestRowCount = len(list(record)) #update - self.assertEqual(11,sigLatestRowCount) #total entries = 2(initial) + 6(test) + 3 + sigLatestRowCount = len(list(record)) #update sigLatestRowCount + self.assertEqual(11,sigLatestRowCount) #total entries = 8 previously + 3 new ones \ No newline at end of file From 1f6322c9135b5cf01da614cf2fba42e1331faef5 Mon Sep 17 00:00:00 2001 From: Xavier Xia Date: Tue, 9 Aug 2022 10:25:49 -0400 Subject: [PATCH 30/30] clean up redundant comments --- .../acquisition/covidcast/test_dim_tables.py | 14 ++++++++------ .../acquisition/covidcast/test_is_latest_issue.py | 4 ++-- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/integrations/acquisition/covidcast/test_dim_tables.py b/integrations/acquisition/covidcast/test_dim_tables.py index 8db17e969..b17fc73b2 100644 --- a/integrations/acquisition/covidcast/test_dim_tables.py +++ b/integrations/acquisition/covidcast/test_dim_tables.py @@ -75,9 +75,11 @@ def test_src_sig(self): ] self._db.insert_or_update_batch(rows) self._db.run_dbjobs() + + #initializing local variables to be used throughout later self._db._cursor.execute(self.viewGeoDim) record = self._db._cursor.fetchall() - geoDimRowCount = len(list(record)) + geoDimRowCount = len(list(record)) self._db._cursor.execute(self.viewSignalDim) record = self._db._cursor.fetchall() @@ -107,8 +109,8 @@ def test_src_sig(self): #ensure new entries are added in latest self._db._cursor.execute(self.viewSignalLatest) record = self._db._cursor.fetchall() - sigLatestRowCount = len(list(record)) #updating sigLatestRowCount - self.assertEqual(4 , sigLatestRowCount) #diff timevalue and issue, 2 older + 2 newer = 4 + sigLatestRowCount = len(list(record)) + self.assertEqual(4 , sigLatestRowCount) #added diff timevalue and issue, 2 older + 2 newer = 4 #ensure nothing changed in geoDim self._db._cursor.execute(self.viewGeoDim) @@ -137,8 +139,8 @@ def test_src_sig(self): #ensure new entries are added in latest self._db._cursor.execute(self.viewSignalLatest) record = self._db._cursor.fetchall() - sigLatestRowCount = len(list(record)) #updating sigLatestRowCount - self.assertEqual(6 , sigLatestRowCount) #diff timevalue and issue, 2 more added ontop of 4 previously + sigLatestRowCount = len(list(record)) + self.assertEqual(6 , sigLatestRowCount) #added diff timevalue and issue, 2 more added ontop of 4 previously #ensure nothing changed in geoDim self._db._cursor.execute(self.viewGeoDim) @@ -158,7 +160,7 @@ def test_src_sig(self): self._db._cursor.execute(self.viewSignalLatest) record = self._db._cursor.fetchall() - sigLatestRowCount = len(list(record)) #update + sigLatestRowCount = len(list(record)) self.assertEqual(8, sigLatestRowCount) #total entries = 2 + 6 in previous subtest #ensure nothing changed in geoDim with repeated geo_type, geo_value pairs diff --git a/integrations/acquisition/covidcast/test_is_latest_issue.py b/integrations/acquisition/covidcast/test_is_latest_issue.py index 1de9a5614..fa27e3a33 100644 --- a/integrations/acquisition/covidcast/test_is_latest_issue.py +++ b/integrations/acquisition/covidcast/test_is_latest_issue.py @@ -98,7 +98,7 @@ def test_signal_latest(self): updateRow = [ CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa', - 6.5, 2.2, 11.5, nmv, nmv, nmv, 20200414, 2)] #should not showup + 6.5, 2.2, 11.5, nmv, nmv, nmv, 20200414, 2)] #update previous entry self._db.insert_or_update_batch(updateRow) self._db.run_dbjobs() @@ -111,7 +111,7 @@ def test_signal_latest(self): #dynamic check for signal_history's list of issue self._db._cursor.execute(f'SELECT `issue` FROM {Database.history_table}') record3 = self._db._cursor.fetchall() - totalRows = len(list(record3)) #updating totalRows + totalRows = len(list(record3)) self.assertEqual(2, totalRows) #added 1 new row, updated old row. Total = 2 self.assertEqual(20200416,max(record3)[0]) #max of the outputs is 20200416 , extracting from tuple