1+ """Integration tests for acquisition of rvdss data."""
2+ # standard library
3+ import unittest
4+ from unittest .mock import MagicMock
5+
6+ # first party
7+ from delphi .epidata .client .delphi_epidata import Epidata
8+ from delphi .epidata .acquisition .rvdss .database import update
9+ import delphi .operations .secrets as secrets
10+
11+ # third party
12+ import mysql .connector
13+
14+ # py3tester coverage target (equivalent to `import *`)
15+ # __test_target__ = 'delphi.epidata.acquisition.covid_hosp.facility.update'
16+
17+ NEWLINE = "\n "
18+
19+ class AcquisitionTests (unittest .TestCase ):
20+
21+ def setUp (self ):
22+ """Perform per-test setup."""
23+
24+ # configure test data
25+ # self.test_utils = UnitTestUtils(__file__)
26+
27+ # use the local instance of the Epidata API
28+ Epidata .BASE_URL = 'http://delphi_web_epidata/epidata'
29+ Epidata .auth = ('epidata' , 'key' )
30+
31+ # use the local instance of the epidata database
32+ secrets .db .host = 'delphi_database_epidata'
33+ secrets .db .epi = ('user' , 'pass' )
34+
35+ # clear relevant tables
36+ u , p = secrets .db .epi
37+ cnx = mysql .connector .connect (user = u , password = p , database = "epidata" )
38+ cur = cnx .cursor ()
39+
40+ cur .execute ('truncate table rvdss_repiratory_detections' )
41+ cur .execute ('truncate table rvdss_pct_positive' )
42+ cur .execute ('truncate table rvdss_detections_counts' )
43+ cur .execute ('delete from api_user' )
44+ cur .execute ('insert into api_user(api_key, email) values ("key", "emai")' )
45+
46+ def test_rvdss_repiratory_detections (self ):
47+ # make sure the data does not yet exist
48+ with self .subTest (name = 'no data yet' ):
49+ response = Epidata .rvdss_repiratory_detections (
50+ '450822' , Epidata .range (20200101 , 20210101 ))
51+ self .assertEqual (response ['result' ], - 2 , response )
52+
53+ # acquire sample data into local database
54+ with self .subTest (name = 'first acquisition' ):
55+ acquired = Update .run (network = mock_network )
56+ self .assertTrue (acquired )
57+
58+ # make sure the data now exists
59+ with self .subTest (name = 'initial data checks' ):
60+ expected_spotchecks = {
61+ "hospital_pk" : "450822" ,
62+ "collection_week" : 20201030 ,
63+ "publication_date" : 20210315 ,
64+ "previous_day_total_ed_visits_7_day_sum" : 536 ,
65+ "total_personnel_covid_vaccinated_doses_all_7_day_sum" : 18 ,
66+ "total_beds_7_day_avg" : 69.3 ,
67+ "previous_day_admission_influenza_confirmed_7_day_sum" : - 999999
68+ }
69+ response = Epidata .covid_hosp_facility (
70+ '450822' , Epidata .range (20200101 , 20210101 ))
71+ self .assertEqual (response ['result' ], 1 )
72+ self .assertEqual (len (response ['epidata' ]), 2 )
73+ row = response ['epidata' ][0 ]
74+ for k ,v in expected_spotchecks .items ():
75+ self .assertTrue (
76+ k in row ,
77+ f"no '{ k } ' in row:\n { NEWLINE .join (sorted (row .keys ()))} "
78+ )
79+ if isinstance (v , float ):
80+ self .assertAlmostEqual (row [k ], v , f"row[{ k } ] is { row [k ]} not { v } " )
81+ else :
82+ self .assertEqual (row [k ], v , f"row[{ k } ] is { row [k ]} not { v } " )
83+
84+ # expect 113 fields per row (114 database columns, except `id`)
85+ self .assertEqual (len (row ), 113 )
86+
87+ # re-acquisition of the same dataset should be a no-op
88+ with self .subTest (name = 'second acquisition' ):
89+ acquired = Update .run (network = mock_network )
90+ self .assertFalse (acquired )
91+
92+ # make sure the data still exists
93+ with self .subTest (name = 'final data checks' ):
94+ response = Epidata .covid_hosp_facility (
95+ '450822' , Epidata .range (20200101 , 20210101 ))
96+ self .assertEqual (response ['result' ], 1 )
97+ self .assertEqual (len (response ['epidata' ]), 2 )
98+
99+
0 commit comments