diff --git a/.gitignore b/.gitignore
index 7daea21f0..b46e4112b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -11,3 +11,4 @@ __pycache__/
 /node_modules
 .mypy_cache
 /missing_db_signals.csv
+/dev/local/output.txt
\ No newline at end of file
diff --git a/integrations/acquisition/covidcast/test_covidcast_meta_caching.py b/integrations/acquisition/covidcast/test_covidcast_meta_caching.py
index c18363c03..7c98093e0 100644
--- a/integrations/acquisition/covidcast/test_covidcast_meta_caching.py
+++ b/integrations/acquisition/covidcast/test_covidcast_meta_caching.py
@@ -94,8 +94,10 @@ def test_caching(self):
     # make sure the live utility is serving something sensible
     cvc_database = live.Database()
     cvc_database.connect()
-    epidata1 = cvc_database.compute_covidcast_meta() 
+    epidata1 = cvc_database.compute_covidcast_meta()
     cvc_database.disconnect(False)
+
+    # check the contents of the computed metadata
     self.assertEqual(len(epidata1),1)
     self.assertEqual(epidata1, [
       {
@@ -117,9 +119,6 @@ def test_caching(self):
       }
     ])
     epidata1={'result':1, 'message':'success', 'epidata':epidata1}
-
-    # make sure the API covidcast_meta is still blank, since it only serves
-    # the cached version and we haven't cached anything yet
     epidata2 = Epidata.covidcast_meta()
     self.assertEqual(epidata2['result'], -2, json.dumps(epidata2))
 
diff --git a/integrations/acquisition/covidcast/test_dim_tables.py b/integrations/acquisition/covidcast/test_dim_tables.py
new file mode 100644
index 000000000..b17fc73b2
--- /dev/null
+++ b/integrations/acquisition/covidcast/test_dim_tables.py
@@ -0,0 +1,194 @@
+"""Integration tests for covidcast's dimension tables."""
+# standard library
+import unittest
+
+# third party
+import mysql.connector
+# first party
+from delphi_utils import Nans
+from delphi.epidata.client.delphi_epidata import Epidata
+from delphi.epidata.acquisition.covidcast.database import Database, CovidcastRow
+import delphi.operations.secrets as secrets
+
+__test_target__ = 'delphi.epidata.acquisition.covidcast.database'
+
+nmv = Nans.NOT_MISSING.value
+class CovidcastDimensionTablesTests(unittest.TestCase):
+  """Tests covidcast's dimension tables."""
+
+  def setUp(self):
+    """Perform per-test setup."""
+    # connect to the `epidata` database
+    cnx = mysql.connector.connect(
+      user='user',
+      password='pass',
+      host='delphi_database_epidata',
+      database='covid')
+    cur = cnx.cursor()
+
+    # clear all tables
+    cur.execute("truncate table signal_load")
+    cur.execute("truncate table signal_history")
+    cur.execute("truncate table signal_latest")
+    cur.execute("truncate table geo_dim")
+    cur.execute("truncate table signal_dim")
+    # reset the `covidcast_meta_cache` table (it should always have one row)
+    cur.execute('update covidcast_meta_cache set timestamp = 0, epidata = "[]"')
+    cnx.commit()
+    cur.close()
+
+    # make connection and cursor available to the Database object
+    self._db = Database()
+    self._db._connection = cnx
+    self._db._cursor = cnx.cursor()
+
+    # use the local instance of the Epidata API
+    Epidata.BASE_URL = 'http://delphi_web_epidata/epidata/api.php'
+
+    # use the local instance of the epidata database
+    secrets.db.host = 'delphi_database_epidata'
+    secrets.db.epi = ('user', 'pass')
+
+    # commonly used SQL commands:
+    self.viewSignalLatest = f'SELECT * FROM {Database.latest_table}'
+    self.viewSignalHistory = f'SELECT * FROM {Database.history_table}'
+    self.viewSignalDim = f'SELECT `source`, `signal` FROM `signal_dim`'
+    self.viewGeoDim = f'SELECT `geo_type`,`geo_value` FROM `geo_dim`'
+
+  def tearDown(self):
+    """Perform per-test teardown."""
+    self._db._cursor.close()
+    self._db._connection.close()
+
+  # We want to test src_sig to make sure rows are added to *_dim only when needed:
+  #   new src, sig (ensure that it is added into *_dim)
+  #   old src, sig (ensure that it is NOT added into *_dim)
+  #   new geo (added)
+  #   old geo (not added)
+  def test_src_sig(self):
+    # BASE CASES
+    rows = [
+      CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa',
+        2, 2, 2, nmv, nmv, nmv, 20200414, 0),
+      CovidcastRow('src', 'sig', 'day', 'county', 20200414, '11111',
+        3, 3, 3, nmv, nmv, nmv, 20200414, 0)
+    ]
+    self._db.insert_or_update_batch(rows)
+    self._db.run_dbjobs()
+
+    # initialize local variables to be used throughout later
+    self._db._cursor.execute(self.viewGeoDim)
+    record = self._db._cursor.fetchall()
+    geoDimRowCount = len(list(record))
+
+    self._db._cursor.execute(self.viewSignalDim)
+    record = self._db._cursor.fetchall()
+    sigDimRowCount = len(list(record))
+
+    self._db._cursor.execute(self.viewSignalLatest)
+    record = self._db._cursor.fetchall()
+    sigLatestRowCount = len(list(record))
+
+    # test that an existing (src, sig) pair is not added again
+    with self.subTest(name='older src and sig not added into sig_dim'):
+      oldSrcSig = [
+        CovidcastRow('src', 'sig', 'day', 'state', 20211111, 'pa', # same src, sig but different time_value and issue
+          99, 99, 99, nmv, nmv, nmv, 20211111, 1),
+        CovidcastRow('src', 'sig', 'day', 'county', 20211111, '11111', # same src, sig but different time_value and issue
+          99, 99, 99, nmv, nmv, nmv, 20211111, 1)
+      ]
+      self._db.insert_or_update_batch(oldSrcSig)
+      self._db.run_dbjobs()
+
+      # check src, sig
+      self._db._cursor.execute(self.viewSignalDim)
+      record = self._db._cursor.fetchall()
+      res = [('src', 'sig')] # expected output
+      self.assertEqual(res, list(record))
+
+      # ensure new entries are added in latest
+      self._db._cursor.execute(self.viewSignalLatest)
+      record = self._db._cursor.fetchall()
+      sigLatestRowCount = len(list(record))
+      self.assertEqual(4, sigLatestRowCount) # added different time_value and issue: 2 older + 2 newer = 4
+
+      # ensure nothing changed in geo_dim
+      self._db._cursor.execute(self.viewGeoDim)
+      record = self._db._cursor.fetchall()
+      self.assertEqual(len(list(record)), geoDimRowCount) # geoDimRowCount remains unchanged
+
+    # test that a new (src, sig) pair is added
+    with self.subTest(name='newer src and sig added in sig_dim'):
+      newSrcSig = [
+        CovidcastRow('new_src', 'sig', 'day', 'state', 20200414, 'pa', # new_src
+          2, 2, 2, nmv, nmv, nmv, 20200414, 0),
+        CovidcastRow('src', 'new_sig', 'day', 'state', 20200414, 'pa', # new_sig
+          2, 2, 2, nmv, nmv, nmv, 20200414, 0)
+      ]
+      self._db.insert_or_update_batch(newSrcSig)
+      self._db.run_dbjobs()
+
+      # check src, sig
+      self._db._cursor.execute(self.viewSignalDim)
+      record = self._db._cursor.fetchall()
+      sigDimRowCount = len(list(record)) # update sigDimRowCount
+      res = set([('new_src', 'sig'), ('src', 'new_sig'), ('src', 'sig')]) # turn into a set to ignore ordering
+      self.assertEqual(res, set(record))
+      self.assertEqual(3, sigDimRowCount) # originally had (src, sig); added 2 new pairs
+
+      # ensure new entries are added in latest
+      self._db._cursor.execute(self.viewSignalLatest)
+      record = self._db._cursor.fetchall()
+      sigLatestRowCount = len(list(record))
+      self.assertEqual(6, sigLatestRowCount) # 2 more added on top of the 4 from the previous subtest
+
+      # ensure nothing changed in geo_dim
+      self._db._cursor.execute(self.viewGeoDim)
+      record = self._db._cursor.fetchall()
+      self.assertEqual(len(list(record)), geoDimRowCount)
+
+    # test that repeated geo values are not added again
+    with self.subTest(name='old geo not added in geo_dim'):
+      repeatedGeoValues = [
+        CovidcastRow('src', 'sig', 'day', 'state', 20200415, 'pa', # same geo_type, geo_value ('pa')
+          2, 2, 2, nmv, nmv, nmv, 20200415, 0),
+        CovidcastRow('src', 'sig', 'day', 'county', 20200415, '11111', # same geo_type, geo_value ('11111')
+          3, 3, 3, nmv, nmv, nmv, 20200415, 0),
+      ]
+      self._db.insert_or_update_batch(repeatedGeoValues)
+      self._db.run_dbjobs()
+
+      self._db._cursor.execute(self.viewSignalLatest)
+      record = self._db._cursor.fetchall()
+      sigLatestRowCount = len(list(record))
+      self.assertEqual(8, sigLatestRowCount) # total entries = 2 new + 6 from the previous subtests
+
+      # ensure nothing changed in geo_dim with repeated geo_type, geo_value pairs
+      self._db._cursor.execute(self.viewGeoDim)
+      record = self._db._cursor.fetchall()
+      self.assertEqual(len(list(record)), geoDimRowCount) # geoDimRowCount unchanged
+
+    with self.subTest(name='newer geo added in geo_dim'):
+      newGeoValues = [
+        CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'nj', # everything the same except the new state geo_value 'nj'
+          2, 2, 2, nmv, nmv, nmv, 20200414, 0),
+        CovidcastRow('src', 'sig', 'day', 'county', 20200414, '15217', # everything the same except the new county geo_value '15217'
+          3, 3, 3, nmv, nmv, nmv, 20200414, 0),
+        CovidcastRow('src', 'sig', 'day', 'county', 20200414, '15451', # everything the same except the new county geo_value '15451'
+          3, 3, 3, nmv, nmv, nmv, 20200414, 0)
+      ]
+      self._db.insert_or_update_batch(newGeoValues)
+      self._db.run_dbjobs()
+
+      self._db._cursor.execute(f'SELECT `geo_type`,`geo_value` FROM `geo_dim`')
+      record = self._db._cursor.fetchall()
+      res = set([('state', 'nj'), ('county', '15217'), ('county', '15451'), ('state', 'pa'), ('county', '11111')]) # turn into a set to ignore ordering
+      self.assertEqual(res, set(record)) # ensure the values are the same: 3 new pairs and 2 older ones
+      geoDimRowCount = len(list(record))
+      self.assertEqual(5, geoDimRowCount) # 2 + 3 new pairs
+
+      self._db._cursor.execute(self.viewSignalLatest)
+      record = self._db._cursor.fetchall()
+      sigLatestRowCount = len(list(record)) # update sigLatestRowCount
+      self.assertEqual(11, sigLatestRowCount) # total entries = 8 previously + 3 new ones
+
\ No newline at end of file
diff --git a/integrations/acquisition/covidcast/test_is_latest_issue.py b/integrations/acquisition/covidcast/test_is_latest_issue.py
new file mode 100644
index 000000000..fa27e3a33
--- /dev/null
+++ b/integrations/acquisition/covidcast/test_is_latest_issue.py
@@ -0,0 +1,137 @@
+"""Integration tests for covidcast's is_latest_issue boolean."""
+# standard library
+import unittest
+
+# third party
+import mysql.connector
+# first party
+from delphi_utils import Nans
+from delphi.epidata.client.delphi_epidata import Epidata
+from delphi.epidata.acquisition.covidcast.database import Database, CovidcastRow
+import delphi.operations.secrets as secrets
+
+__test_target__ = 'delphi.epidata.acquisition.covidcast.database'
+
+nmv = Nans.NOT_MISSING.value
+class CovidcastLatestIssueTests(unittest.TestCase):
+
+  """Tests covidcast is_latest_issue caching."""
+
+  def setUp(self):
+    """Perform per-test setup."""
+
+    # connect to the `epidata` database
+    cnx = mysql.connector.connect(
+      user='user',
+      password='pass',
+      host='delphi_database_epidata',
+      database='covid')
+    cur = cnx.cursor()
+
+    # clear all tables
+    cur.execute("truncate table signal_load")
+    cur.execute("truncate table signal_history")
+    cur.execute("truncate table signal_latest")
+    cur.execute("truncate table geo_dim")
+    cur.execute("truncate table signal_dim")
+    # reset the `covidcast_meta_cache` table (it should always have one row)
+    cur.execute('update covidcast_meta_cache set timestamp = 0, epidata = "[]"')
+    cnx.commit()
+    cur.close()
+
+    # make connection and cursor available to the Database object
+    self._db = Database()
+    self._db._connection = cnx
+    self._db._cursor = cnx.cursor()
+
+    # use the local instance of the Epidata API
+    Epidata.BASE_URL = 'http://delphi_web_epidata/epidata/api.php'
+
+    # use the local instance of the epidata database
+    secrets.db.host = 'delphi_database_epidata'
+    secrets.db.epi = ('user', 'pass')
+
+    # commonly used SQL commands:
+    self.viewSignalLatest = f'SELECT * FROM {Database.latest_table}'
+    self.viewSignalHistory = f'SELECT * FROM {Database.history_table}'
+
+  def tearDown(self):
+    """Perform per-test teardown."""
+    self._db._cursor.close()
+    self._db._connection.close()
+
+  def test_signal_latest(self):
+
+    rows = [
+      CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa',
+        1.5, 2.5, 3.5, nmv, nmv, nmv, 20200414, 0)
+    ]
+    self._db.insert_or_update_batch(rows)
+    self._db.run_dbjobs()
+    self._db._cursor.execute(self.viewSignalHistory)
+    totalRows = len(list(self._db._cursor.fetchall()))
+
+    # sanity check for adding the dummy data
+    sql = f'SELECT `issue` FROM {Database.latest_table} where `time_value` = 20200414'
+    self._db._cursor.execute(sql)
+    record = self._db._cursor.fetchall()
+    self.assertEqual(record[0][0], 20200414)
+    self.assertEqual(len(record), 1) # the placeholder data only has one issue for 20200414
+
+    # when uploading data patches (data in signal_load has an older issue than data in signal_latest):
+    #   INSERT OLDER issue (does not end up in the latest table)
+    #   INSERT NEWER issue (ends up in the latest table)
+    # when signal_load is older than signal_latest, we patch old data (i.e. change some old entries)
+    newRow = [
+      CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa',
+        4.4, 4.4, 4.4, nmv, nmv, nmv, 20200416, 2)] # new row to be added
+    self._db.insert_or_update_batch(newRow)
+    self._db.run_dbjobs()
+
+    # check that the newer issue is in signal_latest
+    sql = f'SELECT `issue` FROM {Database.latest_table} where `time_value` = 20200414 '
+    self._db._cursor.execute(sql)
+    record = self._db._cursor.fetchall()
+    self.assertEqual(record[0][0], 20200416) # new data added, reflected in the latest table
+    self.assertEqual(len(record), 1) # number of records is still one, since the latest issue is now 20200416
+
+    updateRow = [
+      CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa',
+        6.5, 2.2, 11.5, nmv, nmv, nmv, 20200414, 2)] # update the previous entry
+    self._db.insert_or_update_batch(updateRow)
+    self._db.run_dbjobs()
+
+    # check that the newer issue is still in signal_latest
+    sql = f'SELECT `issue` FROM {Database.latest_table} where `time_value` = 20200414 '
+    self._db._cursor.execute(sql)
+    record2 = self._db._cursor.fetchall()
+    self.assertEqual(record, record2) # same as before, since the latest issue did not change
+
+    # dynamic check for signal_history's list of issues
+    self._db._cursor.execute(f'SELECT `issue` FROM {Database.history_table}')
+    record3 = self._db._cursor.fetchall()
+    totalRows = len(list(record3))
+    self.assertEqual(2, totalRows) # added 1 new row and updated an old row, total = 2
+    self.assertEqual(20200416, max(record3)[0]) # the max of the issues is 20200416, extracted from the tuple
+
+    # check that the older issue is not in latest; the result should be empty
+    sql = f'SELECT * FROM {Database.latest_table} where `time_value` = 20200414 and `issue` = 20200414 '
+    self._db._cursor.execute(sql)
+    emptyRecord = list(self._db._cursor.fetchall())
+    empty = []
+    self.assertEqual(empty, emptyRecord)
+
+  @unittest.skip("Having different (time_value, issue) pairs in one call to the db pipeline does not happen in practice")
+  def test_diff_timevalue_issue(self):
+    rows = [
+      CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa', # updating an old issue; should be seen in latest
+        17, 17, 17, nmv, nmv, nmv, 20200417, 3),
+      CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa', # updating the previous entry
+        2, 2, 3, nmv, nmv, nmv, 20200416, 2)
+    ]
+    self._db.insert_or_update_batch(rows)
+    self._db.run_dbjobs()
+    self._db._cursor.execute(f'SELECT `issue` FROM {Database.latest_table} ')
+    record = self._db._cursor.fetchall()
+    self.assertEqual(record[0][0], 20200417) # the latest issue is 20200417, not 20200416
\ No newline at end of file