66import unittest
77
88import pandas as pd
9- from pandas .core .frame import DataFrame
10- from sqlalchemy import text
11- import mysql .connector
9+ from sqlalchemy import create_engine
1210
1311# from flask.testing import FlaskClient
1412from delphi_utils import Nans
13+ from delphi .epidata .server .main import app
1514from delphi .epidata .server ._pandas import as_pandas
16- from delphi .epidata .server ._query import limit_query
15+ from delphi .epidata .server ._query import QueryBuilder
1716
1817# py3tester coverage target
1918__test_target__ = "delphi.epidata.server._query"
@@ -111,27 +110,26 @@ class UnitTests(unittest.TestCase):
111110
112111 def setUp (self ):
113112 """Perform per-test setup."""
113+ app .config ["TESTING" ] = True
114+ app .config ["WTF_CSRF_ENABLED" ] = False
115+ app .config ["DEBUG" ] = False
114116
115117 # connect to the `epidata` database and clear the `covidcast` table
116- cnx = mysql .connector .connect (user = "user" , password = "pass" , host = "delphi_database_epidata" , database = "epidata" )
117- cur = cnx .cursor ()
118- cur .execute ("truncate table covidcast" )
119- cur .execute ('update covidcast_meta_cache set timestamp = 0, epidata = ""' )
120- cnx .commit ()
121- cur .close ()
118+ engine = create_engine ('mysql://user:pass@delphi_database_epidata/epidata' )
119+ cnx = engine .connect ()
120+ cnx .execute ("truncate table covidcast" )
121+ cnx .execute ('update covidcast_meta_cache set timestamp = 0, epidata = ""' )
122122
123123 # make connection and cursor available to test cases
124124 self .cnx = cnx
125- self .cur = cnx .cursor ()
126125
127126 def tearDown (self ):
128127 """Perform per-test teardown."""
129- self .cur .close ()
130128 self .cnx .close ()
131129
132130 def _insert_rows (self , rows : Iterable [CovidcastRow ]):
133131 sql = ",\n " .join ((str (r ) for r in rows ))
134- self .cur .execute (
132+ self .cnx .execute (
135133 f"""
136134 INSERT INTO
137135 `covidcast` (`id`, `source`, `signal`, `time_type`, `geo_type`,
@@ -143,7 +141,6 @@ def _insert_rows(self, rows: Iterable[CovidcastRow]):
143141 { sql }
144142 """
145143 )
146- self .cnx .commit ()
147144 return rows
148145
149146 def _rows_to_df (self , rows : Iterable [CovidcastRow ]) -> pd .DataFrame :
@@ -160,17 +157,12 @@ def test_as_pandas(self):
160157 rows = [CovidcastRow (time_value = 20200401 + i , value = float (i )) for i in range (10 )]
161158 self ._insert_rows (rows )
162159
163- with self .subTest ("simple" ):
164- query = """select * from `covidcast`"""
165- params = {}
166- parse_dates = None
167- engine = self .cnx
168- df = pd .read_sql_query (str (query ), engine , params = params , parse_dates = parse_dates )
169- df = df .astype ({"is_latest_issue" : bool , "is_wip" : bool })
160+ with app .test_request_context ('/correlation' ):
161+ q = QueryBuilder ("covidcast" , "t" )
162+
163+ df = as_pandas (str (q ), params = {}, db_engine = self .cnx , parse_dates = None ).astype ({"is_latest_issue" : bool , "is_wip" : bool })
170164 expected_df = self ._rows_to_df (rows )
171165 pd .testing .assert_frame_equal (df , expected_df )
172- query = limit_query (query , 5 )
173- df = pd .read_sql_query (str (query ), engine , params = params , parse_dates = parse_dates )
174- df = df .astype ({"is_latest_issue" : bool , "is_wip" : bool })
166+ df = as_pandas (str (q ), params = {}, db_engine = self .cnx , parse_dates = None , limit_rows = 5 ).astype ({"is_latest_issue" : bool , "is_wip" : bool })
175167 expected_df = self ._rows_to_df (rows [:5 ])
176168 pd .testing .assert_frame_equal (df , expected_df )
0 commit comments