Changes from 13 commits (31 commits total)
bd7f285
added integrations test for database for is_latest_issue
xavier-xia-99 Apr 25, 2022
d789236
syntax error
xavier-xia-99 Jun 6, 2022
c3f0205
Update integrations/acquisition/covidcast/test_is_latest_issue.py
xavier-xia-99 Jun 8, 2022
1f10a04
Update integrations/acquisition/covidcast/test_is_latest_issue.py
xavier-xia-99 Jun 8, 2022
b241980
Update requirements.txt
xavier-xia-99 Jun 8, 2022
0a4e09e
added line to ignore output.txt
xavier-xia-99 Jun 8, 2022
f0f3abe
tidied up code and provided common SQL queries as variables
xavier-xia-99 Jun 8, 2022
a998d38
removed unnecessary imports
xavier-xia-99 Jun 8, 2022
035baf7
added line to ignore output.txt
xavier-xia-99 Jun 8, 2022
6d8d25e
removed unnecessary row for signal_latest, checked there is only one …
xavier-xia-99 Jun 11, 2022
4090d4d
removed unnecessary self.
xavier-xia-99 Jun 11, 2022
2e0546f
changed (county,ca) to (county,numeric) pair
xavier-xia-99 Jun 11, 2022
39ab346
Delete output.txt
xavier-xia-99 Jun 12, 2022
db706ac
showTable function WIP
xavier-xia-99 Jun 12, 2022
b9d712f
added Epidata for BASE_URL
xavier-xia-99 Jun 15, 2022
d0b8d69
tidied up comments and test cases
xavier-xia-99 Jun 15, 2022
11a5316
changed all SQL queries to f-strings as better practice
xavier-xia-99 Jun 15, 2022
64ea75d
reverted test_covidcast_meta_caching
xavier-xia-99 Jun 15, 2022
9cd42c3
all done;
xavier-xia-99 Jun 15, 2022
00e396b
tidied up imports
xavier-xia-99 Jun 17, 2022
dee268d
removed dim tests
xavier-xia-99 Jul 12, 2022
157530f
added dim tests in new file
xavier-xia-99 Jul 12, 2022
18a2758
used set to remove ordering of elements in return list
xavier-xia-99 Jul 25, 2022
cb78c73
used set to remove ordering of elements in return list
xavier-xia-99 Jul 26, 2022
6682b58
edited dim_tables for better readability
xavier-xia-99 Jul 26, 2022
f06c327
syntax and functionality checked
xavier-xia-99 Jul 26, 2022
d250f98
file edits
xavier-xia-99 Aug 8, 2022
56852f4
removed unnecessary test objects
xavier-xia-99 Aug 9, 2022
7427906
refined line comments
xavier-xia-99 Aug 9, 2022
c120316
refined comments and added tests for geo_type, geo_value pairs
xavier-xia-99 Aug 9, 2022
1f6322c
clean up redundant comments
xavier-xia-99 Aug 9, 2022
1 change: 1 addition & 0 deletions .gitignore
@@ -11,3 +11,4 @@ __pycache__/
/node_modules
.mypy_cache
/missing_db_signals.csv
/dev/local/output.txt
@@ -26,7 +26,7 @@


class CovidcastMetaCacheTests(unittest.TestCase):
"""Tests covidcast metadata caching."""
"""Tests covidcast metadata caching. """

def setUp(self):
"""Perform per-test setup."""
@@ -94,7 +94,8 @@ def test_caching(self):
# make sure the live utility is serving something sensible
cvc_database = live.Database()
cvc_database.connect()
epidata1 = cvc_database.compute_covidcast_meta()
# tests the regular interval (runs daily; computes meta for each source and signal and stores the result in epidata1)
cvc_database.disconnect(False)
self.assertEqual(len(epidata1),1)
self.assertEqual(epidata1, [
@@ -117,7 +118,7 @@ def test_caching(self):
}
])
epidata1={'result':1, 'message':'success', 'epidata':epidata1}

# make sure the API covidcast_meta is still blank, since it only serves
# the cached version and we haven't cached anything yet
epidata2 = Epidata.covidcast_meta()
262 changes: 262 additions & 0 deletions integrations/acquisition/covidcast/test_is_latest_issue.py
@@ -0,0 +1,262 @@
"""Integration tests for covidcast's is_latest_issue boolean."""
# standard library
import unittest
import time
Collaborator suggested change:
- import time

import threading
Collaborator suggested change:
- import threading



# third party
from aiohttp.client_exceptions import ClientResponseError
Collaborator suggested change:
- from aiohttp.client_exceptions import ClientResponseError

import mysql.connector
import pytest
Collaborator suggested change:
- import pytest

from prettytable import PrettyTable, from_db_cursor

# first party
from delphi.epidata.acquisition.covidcast.logger import get_structured_logger
Collaborator suggested change:
- from delphi.epidata.acquisition.covidcast.logger import get_structured_logger

from delphi_utils import Nans
from delphi.epidata.client.delphi_epidata import Epidata
from delphi.epidata.acquisition.covidcast.database import Database, CovidcastRow
from delphi.epidata.acquisition.covidcast.covidcast_meta_cache_updater import main as update_covidcast_meta_cache
Collaborator suggested change:
- from delphi.epidata.acquisition.covidcast.covidcast_meta_cache_updater import main as update_covidcast_meta_cache

import delphi.operations.secrets as secrets

# py3tester coverage target (equivalent to `import *`)
#
__test_target__ = 'delphi.epidata.acquisition.covidcast.database'

nmv = Nans.NOT_MISSING.value
class CovidcastLatestIssueTests(unittest.TestCase):
"""Tests covidcast is_latest_issue caching."""


def setUp(self):
"""Perform per-test setup."""

# connect to the `epidata` database
cnx = mysql.connector.connect(
user='user',
password='pass',
host='delphi_database_epidata',
database='covid')
cur = cnx.cursor()

# clear all tables
cur.execute("truncate table signal_load")
cur.execute("truncate table signal_history")
cur.execute("truncate table signal_latest")
cur.execute("truncate table geo_dim")
cur.execute("truncate table signal_dim")
# reset the `covidcast_meta_cache` table (it should always have one row)
cur.execute('update covidcast_meta_cache set timestamp = 0, epidata = "[]"')

cnx.commit()
cur.close()

# make connection and cursor available to the Database object
self._db = Database()
self._db._connection = cnx
self._db._cursor = cnx.cursor()

# use the local instance of the Epidata API
Epidata.BASE_URL = 'http://delphi_web_epidata/epidata/api.php'

# use the local instance of the epidata database
secrets.db.host = 'delphi_database_epidata'
secrets.db.epi = ('user', 'pass')

#Commonly used SQL commands:
self.viewSignalLatest = '''SELECT * FROM `signal_latest`'''
self.viewSignalHistory = '''SELECT * FROM `signal_history`'''
self.viewSignalDim = '''SELECT `source`, `signal` FROM `signal_dim`'''
self.viewGeoDim = '''SELECT `geo_type`,`geo_value` FROM `geo_dim`'''
Contributor: For easier maintainability, use e.g. Database.history_table instead of "signal_history" in these.
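For illustration, the setUp() queries could then be written roughly as follows (a sketch only: history_table is the attribute the reviewer names; a matching latest_table attribute is assumed by analogy):

# sketch: take table names from the Database class instead of repeating string literals
self.viewSignalLatest = f'SELECT * FROM `{Database.latest_table}`'   # latest_table assumed by analogy
self.viewSignalHistory = f'SELECT * FROM `{Database.history_table}`'  # attribute named in the review comment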

def tearDown(self):
"""Perform per-test teardown."""
self._db._cursor.close()
self._db._connection.close()

def test_signal_latest(self):

rows = [
CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa',
1.5, 2.5, 3.5, nmv, nmv, nmv, 20200414, 0)
]
self._db.insert_or_update_bulk(rows)
self._db.run_dbjobs()
#preview
self._db._cursor.execute('''SELECT * FROM `signal_history`''')
Collaborator: use self.viewSignalHistory?
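That is, the preview query above would presumably become:

self._db._cursor.execute(self.viewSignalHistory)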

totalRows = len(list(self._db._cursor.fetchall()))

#sanity check for adding dummy data
sql = '''SELECT `issue` FROM `signal_latest` where `time_value` = 20200414'''
self._db._cursor.execute(sql)
record = self._db._cursor.fetchall()
self.assertEqual(record[0][0], 20200414)
self.assertEqual(len(record), 1) #check 1 entry

#when uploading data patches (data in signal_load has an older issue than data in signal_latest)
#INSERT OLDER issue (does not end up in latest table)
#INSERT NEWER issue (ends up in latest table)
#when signal_load is older than signal_latest, we are patching old data (i.e. changing some old entries)
newRow = [
CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa',
4.4, 4.4, 4.4, nmv, nmv, nmv, 20200416, 2)] #should show up
self._db.insert_or_update_bulk(newRow)
self._db.run_dbjobs()

#check newer issue in signal_latest
sql = '''SELECT `issue` FROM `signal_latest` where `time_value` = 20200414 '''
self._db._cursor.execute(sql)
record = self._db._cursor.fetchall()
self.assertEqual(record[0][0], 20200416) #new data added
self.assertEqual(len(record), 1) #check 1 entry

updateRow = [
CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa',
6.5, 2.2, 11.5, nmv, nmv, nmv, 20200414, 2)] #should not show up (issue older than the one already in signal_latest)
self._db.insert_or_update_bulk(updateRow)
self._db.run_dbjobs()

#check that the older issue did not change signal_latest
sql = '''SELECT `issue` FROM `signal_latest` where `time_value` = 20200414 '''
self._db._cursor.execute(sql)
record2 = self._db._cursor.fetchall()
self.assertEqual(record, record2) #same as previous as is_latest did not change

#dynamic check for signal_history
self._db._cursor.execute('''SELECT `issue` FROM `signal_history`''')
record3 = self._db._cursor.fetchall()
self.assertEqual(2,totalRows + 1) #ensure 3 added (1 of which refreshed)
Collaborator: would it make more sense to check len(record3) here?
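For illustration, the assertion the reviewer appears to have in mind might read as follows (a sketch; it relies on the patch row above refreshing an existing signal_history row rather than adding a new one):

self.assertEqual(len(record3), totalRows + 1)  # one new history row (issue 20200416) on top of the original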

self.assertEqual(20200416,max(list(record3))[0]) #max of the outputs is 20200416 , extracting from tuple

#check older issue not inside latest, empty field
sql = '''SELECT * FROM `signal_latest` where `time_value` = 20200414 and `issue` = 20200414 '''
self._db._cursor.execute(sql)
emptyRecord = list(self._db._cursor.fetchall())
empty = []
self.assertEqual(empty, emptyRecord)

# We want to test src_sig to make sure rows are added to *_dim only when needed
#new src, sig (ensure that it is added into *_dim)
#old src, sig (ensure that it is NOT added into *_dim)
#new geo (added)
#old geo (not added)
def test_src_sig(self):
#BASE CASES
rows = [
CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa', #
Collaborator suggested change:
- CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa', #
+ CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa',

2, 2, 2, nmv, nmv, nmv, 20200414, 0),
CovidcastRow('src', 'sig', 'day', 'county', 20200414, '11111', # updating previous entry
Collaborator: that's not an update!

3, 3, 3, nmv, nmv, nmv, 20200414, 0)
]
self._db.insert_or_update_bulk(rows)
self._db.run_dbjobs()
self._db._cursor.execute(self.viewGeoDim)
record = self._db._cursor.fetchall()
geoDimRows = len(list(record))

self._db._cursor.execute(self.viewSignalDim)
record = self._db._cursor.fetchall()
sigDimRows = len(list(record))

self._db._cursor.execute(self.viewSignalLatest)
record = self._db._cursor.fetchall()
sigLatestRows = len(list(record))

#first, test that existing src/sig pairs are not re-added
with self.subTest(name='older src and sig not added into sig_dim'):
oldSrcSig = [
CovidcastRow('src', 'sig', 'day', 'state', 20211111, 'pa', #existing src and sig
99, 99, 99, nmv, nmv, nmv, 20211111, 1),
CovidcastRow('src', 'sig', 'day', 'county', 20211111, '11111', #existing src and sig, same geo
99, 99, 99, nmv, nmv, nmv, 20211111, 1)
]
self._db.insert_or_update_bulk(oldSrcSig)
self._db.run_dbjobs()

#testing src, sig
self._db._cursor.execute(self.viewSignalDim)
record = self._db._cursor.fetchall()
res = [('src','sig')] #output
self.assertEqual(res , list(record))

#ensure new entries are added in latest
self._db._cursor.execute(self.viewSignalLatest)
record = self._db._cursor.fetchall()
self.assertEqual(len(list(record)), sigLatestRows + 2) #2 original, 2 added
Contributor: A pattern like this will be easier to maintain -- we'll be able to add new tests without having to update all subsequent expressions.

Suggested change:
- self.assertEqual(len(list(record)), sigLatestRows + 2) #2 original, 2 added
+ record_length = len(list(record))
+ self.assertEqual(record_length, sigLatestRows + 2) #2 original, 2 added
+ sigLatestRows = record_length


#ensure nothing in geo
self._db._cursor.execute(self.viewGeoDim)
record = self._db._cursor.fetchall()
self.assertEqual(len(list(record)),geoDimRows)

with self.subTest(name='newer src and sig added in sig_dim'):
newSrcSig = [
CovidcastRow('new_src', 'sig', 'day', 'state', 20200414, 'pa', # new_src
2, 2, 2, nmv, nmv, nmv, 20200414, 0),
CovidcastRow('src', 'new_sig', 'day', 'state', 20200414, 'pa', # new_sig
2, 2, 2, nmv, nmv, nmv, 20200414, 0)
]
self._db.insert_or_update_bulk(newSrcSig)
self._db.run_dbjobs()

#testing src, sig
self._db._cursor.execute(self.viewSignalDim)
record = self._db._cursor.fetchall()
res = [('src', 'sig'), ('new_src', 'sig'), ('src', 'new_sig')]
self.assertEqual(res, list(record))
Contributor: Update sigDimRows here.

#ensure nothing in geo
self._db._cursor.execute(self.viewGeoDim)
record = self._db._cursor.fetchall()
self.assertEqual(len(list(record)),geoDimRows)

with self.subTest(name='old geo not added in geo_dim'):
repeatedGeoValues = [ #geo_type #geo_value
CovidcastRow('src', 'sig', 'day', 'state', 20200415, 'pa', # same geo_type, geo_value
2, 2, 2, nmv, nmv, nmv, 20200415, 0),
CovidcastRow('src', 'sig', 'day', 'county', 20200415, '11111', # same geo_type, geo_value
3, 3, 3, nmv, nmv, nmv, 20200415, 0),
]
self._db.insert_or_update_bulk(repeatedGeoValues)
self._db.run_dbjobs()

self._db._cursor.execute(self.viewGeoDim)
record = self._db._cursor.fetchall()
self.assertEqual(len(list(record)),geoDimRows) #geoDimRows unchanged

self._db._cursor.execute(self.viewSignalLatest)
record = self._db._cursor.fetchall()
self.assertEqual(len(list(record)),sigLatestRows + 6) #total entries = 2(initial) + 6(test)
Contributor: Update sigLatestRows here.


with self.subTest(name='newer geo added in geo_dim'):
newGeoValues = [ #geo_type #geo_value
CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'nj', # everything same except geo_value: state nj
2, 2, 2, nmv, nmv, nmv, 20200414, 0),
CovidcastRow('src', 'sig', 'day', 'county', 20200414, '15217', # everything same except geo_value: county 15217
3, 3, 3, nmv, nmv, nmv, 20200414, 0),
CovidcastRow('src', 'sig', 'day', 'county', 20200414, '15451', # everything same except geo_value: county 15451
3, 3, 3, nmv, nmv, nmv, 20200414, 0)
]
self._db.insert_or_update_bulk(newGeoValues)
self._db.run_dbjobs()

self._db._cursor.execute('''SELECT `geo_type`,`geo_value` FROM `geo_dim`''')
record = self._db._cursor.fetchall()
self.assertEqual(len(list(record)),geoDimRows + 3) #2 + 3 new
Contributor: Update geoDimRows here.


self._db._cursor.execute(self.viewSignalLatest)
record = self._db._cursor.fetchall()
self.assertEqual(len(list(record)),sigLatestRows + 6 + 3) #total entries = 2(initial) + 6(previous subtests) + 3(new geo rows)
Contributor: Update sigLatestRows here.



@unittest.skip("Having different (time_value,issue) pairs in one call to db pipeline does not happen in practice")
Collaborator: good catch!

Contributor: Can one of you clarify for me? I thought the whole point of batch issue uploads was that they included multiple issues.

Author: Oh! I was writing tests with multiple issues but it was not performing as intended, so I sought George out for some help. Would it be possible to find a time to sit down together to go through this quickly?

Contributor: I found it -- insert_or_update_bulk is called from within a loop, such that it is called separately for each CSV file. Each CSV contains only one time_value and only one issue.

for path, details in path_details:
  logger.info(event='handling',dest=path)
  path_src, filename = os.path.split(path)
  if not details:
    # file path or name was invalid, source is unknown
    archive_as_failed(path_src, filename, 'unknown',logger)
    continue
  (source, signal, time_type, geo_type, time_value, issue, lag) = details
  csv_rows = csv_importer_impl.load_csv(path, geo_type)
  cc_rows = CovidcastRow.fromCsvRows(csv_rows, source, signal, time_type, geo_type, time_value, issue, lag)
  rows_list = list(cc_rows)
  all_rows_valid = rows_list and all(r is not None for r in rows_list)
  if all_rows_valid:
    try:
      modified_row_count = database.insert_or_update_bulk(rows_list)

However, this test also includes run_dbjobs, which is not called as part of the csv_to_database script or as part of insert_or_update_bulk. We cannot therefore expect that run_dbjobs will only deal with one time_value and issue at a time. So it depends on what this test is trying to verify:

  • correct behavior of insert_or_update_bulk alone? --> then it should not call run_dbjobs
  • correct behavior of insert_or_update_bulk and run_dbjobs when run separately with each CSV file? --> then this test doesn't tell us whether run_dbjobs will run correctly as part of the intended load pipeline
  • correct behavior of insert_or_update_bulk when run separately with each CSV file, and of run_dbjobs when run after all available CSVs have been loaded into signal_load? --> then this test should run insert_or_update_bulk once for each time_value x issue configuration, and then run_dbjobs once at the end.
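A minimal sketch of this third arrangement, reusing the CovidcastRow pattern from earlier in this test (illustrative values only, not the PR's code):

# hypothetical: one insert_or_update_bulk call per CSV-equivalent batch, then dbjobs once at the end
batches = [
  [CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa',
                1.5, 2.5, 3.5, nmv, nmv, nmv, 20200414, 0)],   # one time_value x issue per batch
  [CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa',
                4.4, 4.4, 4.4, nmv, nmv, nmv, 20200416, 2)],   # later issue loaded as a separate batch
]
for batch in batches:
  self._db.insert_or_update_bulk(batch)   # mirrors one upload_archive() call per CSV
self._db.run_dbjobs()                     # run dbjobs only after all batches are in signal_load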

Collaborator: It is a flaw (for some definition of 'flaw') that we cannot properly process multiple issues per datapoint in a single run of insert_or_update[+dbjobs] if the issues are not provided in increasing order (or really, if the 'latest' issue is not last in the series). In practice, insert_or_update_bulk() is only called by csv_to_database:upload_archive() and each call only specifies a single issue (in fact, the only part of the composite key that differs is the geo_value). This is not going to cause us problems for the foreseeable future, but it is still worth discussing to determine if/how we want to account for possibilities later (adding/changing logic in insert_or_update (in the SQL or in the python), carrying some of those key-field arguments further down the call stack to prevent unintended consequences, refactoring the larger ingestion pipeline, or otherwise).


Contributor: Results of zoom discussion:
Options:

  • one CSV per insert, many CSVs per dbjobs
  • Decision: one CSV per insert+dbjobs. Have insert call dbjobs itself so csv_to_database doesn't have to bother about it (a rough sketch follows this comment).
    • if someone chooses to run dbjobs on an untrusted signal_load, it's their responsibility to ensure that issues are ordered correctly.
    • could add a guard to dbjobs to check if signal_load is sorted correctly but that check would take a long time so let's not. comment it obsessively though at the head of dbjobs.

Options for fixing at dbjobs time: none of them are good so let's not.

  • get max before loading into latest
  • sort while loading into latest

Batch issue uploads do traverse issues in the correct order, but since we're having insert call dbjobs we don't need to care.
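A rough sketch of what that decision could look like inside the Database class (the helper name insert_or_update_batch is an assumption here, not necessarily the merged implementation):

def insert_or_update_bulk(self, cc_rows):
  # hypothetical: load this CSV's rows, then run dbjobs immediately,
  # so csv_to_database no longer has to call run_dbjobs itself
  modified_row_count = self.insert_or_update_batch(cc_rows)  # assumed existing batch loader
  self.run_dbjobs()
  return modified_row_count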

def test_diff_timevalue_issue(self):
rows = [
CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa', # newer issue (20200417); should be the one seen in latest
17, 17, 17, nmv, nmv, nmv, 20200417, 3),
CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa', # older issue (20200416) arriving after the newer one; should not override latest
2, 2, 3, nmv, nmv, nmv, 20200416, 2)
]
self._db.insert_or_update_bulk(rows)
self._db.run_dbjobs()
self._db._cursor.execute('''SELECT `issue` FROM `signal_latest` ''')
record = self._db._cursor.fetchall()
self.assertEqual(record[0][0], 20200417) #20200416 != 20200417
Contributor suggested change:
- self.assertEqual(record[0][0], 20200417) #20200416 != 20200417
+ # Make sure the 4/17 issue is listed even though 4/16 was imported after it
+ self.assertEqual(record[0][0], 20200417)

1 change: 1 addition & 0 deletions requirements.txt
@@ -11,3 +11,4 @@ scipy==1.6.2
tenacity==7.0.0
newrelic
epiweeks==2.1.2
prettytable