11"""
22Generate EMR-hosp sensors.
3-
43Author: Maria Jahja
54Created: 2020-06-01
65"""
7-
86# standard packages
97import logging
108from datetime import timedelta
119from multiprocessing import Pool , cpu_count
10+ import covidcast
11+ from delphi_utils import read_params
1212
# third party
import covidcast
import numpy as np
import pandas as pd

# first party
from delphi_utils import GeoMapper, read_params
from .config import Config, Constants
from .geo_maps import GeoMaps
from .load_data import load_combined_data
from .sensor import EMRHospSensor
from .weekday import Weekday
from .constants import SIGNALS, SMOOTHED, SMOOTHED_ADJ, HRR, NA, FIPS

def write_to_csv(output_dict, write_se, out_name, output_path="."):
    """Write sensor values to csv.

    Args:
        output_dict: dictionary containing sensor rates, se, unique dates, and unique geo_id
        write_se: boolean to write out standard errors; if true, use an obfuscated name
        out_name: name of the output file
        output_path: outfile path to write the csv (default is current directory)
    """
    if write_se:
        logging.info(f"========= WARNING: WRITING SEs TO {out_name} =========")
    geo_level = output_dict["geo_level"]
    dates = output_dict["dates"]
    geo_ids = output_dict["geo_ids"]
    all_rates = output_dict["rates"]
    all_se = output_dict["se"]
    all_include = output_dict["include"]
    out_n = 0
    for i, d in enumerate(dates):
        filename = "%s/%s_%s_%s.csv" % (
            output_path,
            (d + Config.DAY_SHIFT).strftime("%Y%m%d"),
            geo_level,
            out_name,
        )
        with open(filename, "w") as outfile:
            outfile.write("geo_id,val,se,direction,sample_size\n")
            for geo_id in geo_ids:
                sensor = all_rates[geo_id][i]
                se = all_se[geo_id][i]
                if all_include[geo_id][i]:
                    assert not np.isnan(sensor), "value for included sensor is nan"
                    assert not np.isnan(se), "se for included sensor is nan"
                    if sensor > 90:
                        logging.warning(f"value suspiciously high, {geo_id}: {sensor}")
                    assert se < 5, f"se suspiciously high, {geo_id}: {se}"
                    if write_se:
                        assert sensor > 0 and se > 0, "p=0, std_err=0 invalid"
                        outfile.write(
                            "%s,%f,%s,%s,%s\n" % (geo_id, sensor, se, NA, NA))
                    else:
                        # for privacy reasons we will not report the standard error
                        outfile.write(
                            "%s,%f,%s,%s,%s\n" % (geo_id, sensor, NA, NA, NA)
                        )
                    out_n += 1
    logging.debug(f"wrote {out_n} rows for {len(geo_ids)} {geo_level}")

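# For example, a run with out_name="smoothed_covid19" at geo_level="state"
# writes one file per date, named like
#     <output_path>/20200601_state_smoothed_covid19.csv
# with rows such as "pa,1.514430,NA,NA,NA" (hypothetical values, assuming the
# NA constant renders as the string "NA"; the se column is only populated when
# write_se is set).
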
def add_prefix(signal_names, wip_signal, prefix="wip_"):
    """Add the given prefix to signal names that are work-in-progress (WIP).

    Parameters
    ----------
    signal_names: List[str]
        Names of signals to be exported
    wip_signal : List[str] or bool
        a list of signals to treat as WIP: [], OR
        all signals in the registry: True, OR
        only signals that have never been published: False
    prefix : str
        prefix for new/non-public signals (default "wip_")

    Returns
    -------
    List of signal names
        wip/non-wip signals for further computation
    """
    if wip_signal is True:
        return [prefix + signal for signal in signal_names]
    if isinstance(wip_signal, list):
        make_wip = set(wip_signal)
        return [
            prefix + signal if signal in make_wip else signal
            for signal in signal_names
        ]
    if wip_signal in {False, ""}:
        return [
            signal if public_signal(signal)
            else prefix + signal
            for signal in signal_names
        ]
    raise ValueError("Supply True | False or '' or [] | list()")
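
# Illustrative behavior of add_prefix (the signal names are hypothetical):
#
#     add_prefix(["smoothed_covid19"], wip_signal=True)
#         -> ["wip_smoothed_covid19"]
#     add_prefix(["smoothed_covid19", "smoothed_adj_covid19"],
#                wip_signal=["smoothed_adj_covid19"])
#         -> ["smoothed_covid19", "wip_smoothed_adj_covid19"]
#     add_prefix(["smoothed_covid19"], wip_signal=False)
#         -> prefixes only the names that public_signal() cannot find upstream.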


def public_signal(signal_):
    """Check if the signal name is already public on the COVIDcast API.

    Parameters
    ----------
    signal_ : str
        Name of the signal

    Returns
    -------
    bool
        True if the signal is present in the COVIDcast metadata,
        False if it is not
    """
    epidata_df = covidcast.metadata()
    # look for the name anywhere in the metadata's signal column
    return bool((epidata_df["signal"] == signal_).any())
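
# Note: covidcast.metadata() downloads the full metadata table on every call,
# so checking many names through public_signal() refetches it each time. A
# cached variant (a hypothetical helper, not part of this module) could be:
#
#     from functools import lru_cache
#
#     @lru_cache(maxsize=1)
#     def _public_signals():
#         return frozenset(covidcast.metadata()["signal"])
#
#     def public_signal_cached(signal_):
#         return signal_ in _public_signals()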


class EMRHospSensorUpdator:

    def __init__(self,
                 startdate,
                 enddate,
                 dropdate,
                 geo,
                 parallel,
                 weekday,
                 se):
        """Init Sensor Updator.

        Args:
            startdate: first sensor date (YYYY-mm-dd)
            enddate: last sensor date (YYYY-mm-dd)
            dropdate: data drop date (YYYY-mm-dd)
            geo: geographic resolution, one of ["county", "state", "msa", "hrr"]
            parallel: boolean to run the sensor update in parallel
            weekday: boolean to adjust for weekday effects
            se: boolean to write out standard errors; if true, use an obfuscated name
        """
        self.startdate, self.enddate, self.dropdate = [pd.to_datetime(t) for t in (startdate, enddate, dropdate)]
        # handle dates
        assert (self.startdate > (Config.FIRST_DATA_DATE + Config.BURN_IN_PERIOD)
                ), f"not enough data to produce estimates starting {self.startdate}"
        assert self.startdate < self.enddate, "start date >= end date"
        assert self.enddate <= self.dropdate, "end date > drop date"
        self.geo, self.parallel, self.weekday, self.se = geo.lower(), parallel, weekday, se

        # output signal naming: keep only the signal produced by this run
        signals = SIGNALS.copy()
        signals.remove(SMOOTHED if self.weekday else SMOOTHED_ADJ)
        signal_names = add_prefix(
            signals,
            wip_signal=read_params()["wip_signal"])
        self.updated_signal_names = signal_names
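        # e.g. with weekday=True and wip_signal=True this yields
        # ["wip_smoothed_adj_covid19"], assuming SIGNALS holds the smoothed and
        # weekday-adjusted signal names from .constants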

    def shift_dates(self):
        """Shift estimates forward to account for time lag; compute burn-in and sensor dates."""
        drange = lambda s, e: pd.date_range(start=s, periods=(e - s).days, freq='D')
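        # drange(s, e) is a half-open daily range: it yields s, s+1, ..., e-1
        # ((e - s).days periods), so the end date itself is excluded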
        self.startdate = self.startdate - Config.DAY_SHIFT
        self.burnindate = self.startdate - Config.BURN_IN_PERIOD
        self.fit_dates = drange(Config.FIRST_DATA_DATE, self.dropdate)
        self.burn_in_dates = drange(self.burnindate, self.dropdate)
        self.sensor_dates = drange(self.startdate, self.enddate)
        return True

    def geo_reindex(self, data):
        """Reindex based on geography, including all date/geo_id pairs.

        Args:
            data: dataframe, the output of load_combined_data

        Returns:
            dataframe
        """
        # get the geography to reindex by
        geo = self.geo
        # ... (county/state/msa/hrr mapping branches elided) ...
        else:
            logging.error(f"{geo} is invalid, pick one of 'county', 'state', 'msa', 'hrr'")
            return False
        self.unique_geo_ids = pd.unique(data_frame[geo])
        data_frame.set_index([geo, 'date'], inplace=True)
        # for each location, fill in all missing dates with 0 values
        multiindex = pd.MultiIndex.from_product((self.unique_geo_ids, self.fit_dates),
                                                names=[geo, "date"])
        assert (len(multiindex) <= (Constants.MAX_GEO[geo] * len(self.fit_dates))
                ), "more loc-date pairs than maximum number of geographies x number of dates"
        # fill dataframe with missing dates using 0
        data_frame = data_frame.reindex(multiindex, fill_value=0)
        data_frame.fillna(0, inplace=True)
        return data_frame

    def update_sensor(self,
                      emr_filepath,
                      claims_filepath,
                      outpath,
                      staticpath):
181215 """Generate sensor values, and write to csv format.
182-
183216 Args:
184217 emr_filepath: path to the aggregated EMR data
185218 claims_filepath: path to the aggregated claims data
186219 outpath: output path for the csv results
187220 staticpath: path for the static geographic files
188221 """
189-
190222 self .shift_dates ()
191223 final_sensor_idxs = (self .burn_in_dates >= self .startdate ) & (self .burn_in_dates <= self .enddate )
192224
193225 # load data
194226 ## JS: If the data is in fips then can we also put it into hrr?
195227 base_geo = "hrr" if self .geo == "hrr" else "fips"
228+ base_geo = HRR if self .geo == HRR else FIPS
196229 data = load_combined_data (emr_filepath , claims_filepath , self .dropdate , base_geo )
197230
198231 data .reset_index (inplace = True )
199232 data_frame = self .geo_reindex (data )
        # handle if we need to adjust by weekday
        wd_params = Weekday.get_params(data_frame) if self.weekday else None

        # run sensor fitting code (maybe in parallel)
        sensor_rates = {}
        sensor_se = {}
        sensor_include = {}
        if not self.parallel:
            for geo_id, sub_data in data_frame.groupby(level=0):
                sub_data.reset_index(level=0, inplace=True)
                if self.weekday:
                    sub_data = Weekday.calc_adjustment(wd_params, sub_data)
                res = EMRHospSensor.fit(sub_data, self.burnindate, geo_id)
                res = pd.DataFrame(res)
                sensor_rates[geo_id] = np.array(res.loc[final_sensor_idxs, "rate"])
                sensor_se[geo_id] = np.array(res.loc[final_sensor_idxs, "se"])
                sensor_include[geo_id] = np.array(res.loc[final_sensor_idxs, "incl"])
        else:
            n_cpu = min(10, cpu_count())
            logging.debug(f"starting pool with {n_cpu} workers")
            with Pool(n_cpu) as pool:
                pool_results = []
                for geo_id, sub_data in data_frame.groupby(level=0, as_index=False):
                    sub_data.reset_index(level=0, inplace=True)
                    if self.weekday:
                        sub_data = Weekday.calc_adjustment(wd_params, sub_data)
                    pool_results.append(
                        pool.apply_async(
                            EMRHospSensor.fit, args=(sub_data, self.burnindate, geo_id,),
                        )
                    )
                pool_results = [proc.get() for proc in pool_results]
                for res in pool_results:
                    geo_id = res["geo_id"]
                    res = pd.DataFrame(res)
                    sensor_rates[geo_id] = np.array(res.loc[final_sensor_idxs, "rate"])
                    sensor_se[geo_id] = np.array(res.loc[final_sensor_idxs, "se"])
                    sensor_include[geo_id] = np.array(res.loc[final_sensor_idxs, "incl"])
        unique_geo_ids = list(sensor_rates.keys())
        output_dict = {
            "rates": sensor_rates,
            "se": sensor_se,
            "dates": self.burn_in_dates[final_sensor_idxs],
            "geo_ids": unique_geo_ids,
            "geo_level": self.geo,
            "include": sensor_include,
        }

        # write out results, one file set per exported signal name
        for signal in self.updated_signal_names:
            write_to_csv(output_dict, self.se, signal, outpath)
        logging.debug(f"wrote files to {outpath}")
        return True
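
# A minimal usage sketch (dates and paths here are hypothetical; the package's
# real entry point wires these values up from its params file, and __init__
# itself reads wip_signal via read_params()):
#
#     updater = EMRHospSensorUpdator(
#         "2020-06-15", "2020-07-04", "2020-07-06",  # startdate, enddate, dropdate
#         geo="state", parallel=False, weekday=True, se=False,
#     )
#     updater.update_sensor(
#         "emr_agg.csv", "claims_agg.csv", "./receiving", "./static",
#     )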