"""Internal functions for creating Safegraph indicator."""
import datetime
import os
from typing import Iterator, List

import covidcast
import numpy as np
import pandas as pd

from delphi_utils import GeoMapper

from .constants import HOME_DWELL, COMPLETELY_HOME, FULL_TIME_WORK, PART_TIME_WORK, GEO_RESOLUTIONS
812
# Magic number for modular arithmetic; divides a 12-digit census block group
# (CBG) code down to its 5-digit county FIPS prefix.
MOD = 10000000

# Base file name for raw data CSVs.
CSV_NAME = 'social-distancing.csv.gz'
18+
def validate(df):
    """Confirm that a data frame contains exactly one date.

    Parameters
    ----------
    df: pd.DataFrame
        census block group-level frame with a 'date_range_start' column of
        timestamps of the form {YYYY}-{MM}-{DD}T...

    Raises
    ------
    ValueError
        If the frame holds zero or more than one distinct date.
        (An `assert` would be stripped when Python runs with `-O`, so the
        check is made with an explicit raise.)
    """
    timestamps = df['date_range_start'].apply(date_from_timestamp)
    n_dates = len(timestamps.unique())
    if n_dates != 1:
        raise ValueError(
            f'Expected exactly one date per input frame, found {n_dates}.')
23+
24+
def date_from_timestamp(timestamp) -> datetime.date:
    """Extract the date from a timestamp beginning with {YYYY}-{MM}-{DD}T."""
    date_part, _, _ = timestamp.partition('T')
    return datetime.date.fromisoformat(date_part)
28+
29+
def files_in_past_week(current_filename) -> Iterator[str]:
    """Yield file paths for the 6 days preceding the current file's date.

    This is a generator, so the return annotation is `Iterator[str]`
    (the previous `List[str]` annotation was misleading).

    Parameters
    ----------
    current_filename: str
        name of CSV file. Must be of the form
        {path}/{YYYY}/{MM}/{DD}/{YYYY}-{MM}-{DD}-{CSV_NAME}

    Yields
    ------
    str
        File names corresponding to the 6 days prior to YYYY-MM-DD,
        most recent first.
    """
    path, year, month, day, _ = current_filename.rsplit('/', 4)
    current_date = datetime.date(int(year), int(month), int(day))
    one_day = datetime.timedelta(days=1)
    for _ in range(6):
        current_date = current_date - one_day
        date_str = current_date.isoformat()
        # The directory layout mirrors the date: {YYYY}/{MM}/{DD}.
        date_path = date_str.replace('-', '/')
        yield f'{path}/{date_path}/{date_str}-{CSV_NAME}'
50+
51+
def add_suffix(signals, suffix):
    """Append `suffix` to every element of `signals`."""
    return list(map(lambda signal: signal + suffix, signals))
55+
56+
1257def add_prefix (signal_names , wip_signal , prefix : str ):
1358 """Adds prefix to signal if there is a WIP signal
1459 Parameters
@@ -43,7 +88,7 @@ def add_prefix(signal_names, wip_signal, prefix: str):
4388 ]
4489 raise ValueError ("Supply True | False or '' or [] | list()" )
4590
46- # Check if the signal name is public
91+
4792def public_signal (signal_ ):
4893 """Checks if the signal name is already public using COVIDcast
4994 Parameters
@@ -90,32 +135,29 @@ def construct_signals(cbg_df, signal_names):
90135 """
91136
92137 # Preparation
93- cbg_df ['timestamp' ] = cbg_df ['date_range_start' ].apply (
94- lambda x : str (x ).split ('T' )[0 ])
95138 cbg_df ['county_fips' ] = (cbg_df ['origin_census_block_group' ] // MOD ).apply (
96139 lambda x : f'{ int (x ):05d} ' )
97140
98141 # Transformation: create signal not available in raw data
99142 for signal in signal_names :
100- if signal . endswith ( FULL_TIME_WORK ) :
143+ if FULL_TIME_WORK in signal :
101144 cbg_df [signal ] = (cbg_df ['full_time_work_behavior_devices' ]
102145 / cbg_df ['device_count' ])
103- elif signal . endswith ( COMPLETELY_HOME ) :
146+ elif COMPLETELY_HOME in signal :
104147 cbg_df [signal ] = (cbg_df ['completely_home_device_count' ]
105148 / cbg_df ['device_count' ])
106- elif signal . endswith ( PART_TIME_WORK ) :
149+ elif PART_TIME_WORK in signal :
107150 cbg_df [signal ] = (cbg_df ['part_time_work_behavior_devices' ]
108151 / cbg_df ['device_count' ])
109- elif signal . endswith ( HOME_DWELL ) :
152+ elif HOME_DWELL in signal :
110153 cbg_df [signal ] = (cbg_df ['median_home_dwell_time' ])
111154
112-
113155 # Subsetting
114- return cbg_df [['timestamp' , ' county_fips' ] + signal_names ]
156+ return cbg_df [['county_fips' ] + signal_names ]
115157
116158
117159def aggregate (df , signal_names , geo_resolution = 'county' ):
118- ''' Aggregate signals to appropriate resolution and produce standard errors.
160+ """ Aggregate signals to appropriate resolution and produce standard errors.
119161 Parameters
120162 ----------
121163 df: pd.DataFrame
@@ -130,9 +172,8 @@ def aggregate(df, signal_names, geo_resolution='county'):
130172 pd.DataFrame:
131173 DataFrame with one row per geo_id, with columns for the individual
132174 signals, standard errors, and sample sizes.
133- '''
175+ """
134176 # Prepare geo resolution
135- GEO_RESOLUTION = ('county' , 'state' )
136177 if geo_resolution == 'county' :
137178 df ['geo_id' ] = df ['county_fips' ]
138179 elif geo_resolution == 'state' :
@@ -144,18 +185,14 @@ def aggregate(df, signal_names, geo_resolution='county'):
144185 new_col = 'geo_id' ,
145186 dropna = False )
146187 else :
147- raise ValueError (f'`geo_resolution` must be one of { GEO_RESOLUTION } .' )
188+ raise ValueError (
189+ f'`geo_resolution` must be one of { GEO_RESOLUTIONS } .' )
148190
149191 # Aggregation and signal creation
150- df_mean = df .groupby (['geo_id' , 'timestamp' ])[
151- signal_names
152- ].mean ()
153- df_sd = df .groupby (['geo_id' , 'timestamp' ])[
154- signal_names
155- ].std ()
156- df_n = df .groupby (['geo_id' , 'timestamp' ])[
157- signal_names
158- ].count ()
192+ grouped_df = df .groupby (['geo_id' ])[signal_names ]
193+ df_mean = grouped_df .mean ()
194+ df_sd = grouped_df .std ()
195+ df_n = grouped_df .count ()
159196 agg_df = pd .DataFrame .join (df_mean , df_sd ,
160197 lsuffix = '_mean' , rsuffix = '_sd' )
161198 agg_df = pd .DataFrame .join (agg_df , df_n .rename ({
@@ -167,39 +204,96 @@ def aggregate(df, signal_names, geo_resolution='county'):
167204 return agg_df .reset_index ()
168205
169206
170- def process (fname , signal_names , geo_resolutions , export_dir ):
171- '''Process an input census block group-level CSV and export it. Assumes
172- that the input file has _only_ one date of data.
def process_window(df_list: List[pd.DataFrame],
                   signal_names: List[str],
                   geo_resolutions: List[str],
                   export_dir: str):
    """Process a list of census block group-level frames as a single data
    set and export it.

    Assumes each data frame has _only_ one date of data.

    Parameters
    ----------
    df_list: List[pd.DataFrame]
        list of census block group-level frames.
        (Docstring previously mis-named this parameter `cbg_df`.)
    signal_names: List[str]
        signal names to be processed
    geo_resolutions: List[str]
        List of geo resolutions to export the data.
    export_dir: str
        path where the output files are saved

    Returns
    -------
    None. One file is written per (signal, resolution) pair containing the
    aggregated data from `df_list`.
    """
    for df in df_list:
        validate(df)
    # All frames share one date by assumption; take it from the first row
    # of the first frame for the export file names.
    date = date_from_timestamp(df_list[0].at[0, 'date_range_start'])
    cbg_df = pd.concat(construct_signals(df, signal_names) for df in df_list)
    for geo_res in geo_resolutions:
        aggregated_df = aggregate(cbg_df, signal_names, geo_res)
        for signal in signal_names:
            # Select this signal's mean/se/n columns and rename them to the
            # standard export schema (val/se/sample_size).
            df_export = aggregated_df[
                ['geo_id']
                + [f'{signal}_{x}' for x in ('mean', 'se', 'n')]
            ].rename({
                f'{signal}_mean': 'val',
                f'{signal}_se': 'se',
                f'{signal}_n': 'sample_size',
            }, axis=1)
            df_export.to_csv(f'{export_dir}/{date}_{geo_res}_{signal}.csv',
                             na_rep='NA',
                             index=False)
247+
248+
def process(current_filename: str,
            previous_filenames: List[str],
            signal_names: List[str],
            wip_signal,
            geo_resolutions: List[str],
            export_dir: str):
    """Create and export signals for a single day and for the trailing week.

    Parameters
    ----------
    current_filename: str
        path to file holding the target date's data.
    previous_filenames: List[str]
        paths to files holding data from each day in the week preceding the
        target date.
    signal_names: List[str]
        signal names to be processed for a single date.
        A second version of each such signal named {SIGNAL}_7d_avg will be
        created averaging {SIGNAL} over the past 7 days.
    wip_signal : List[str] or bool
        a list of wip signals: [], OR
        all signals in the registry: True OR
        only signals that have never been published: False
    geo_resolutions: List[str]
        List of geo resolutions to export the data.
    export_dir
        path where the output files are saved.

    Returns
    -------
    None. For each (signal, resolution) pair, one file is written for the
    single date values to {export_dir}/{date}_{resolution}_{signal}.csv and
    one for the data averaged over the previous week to
    {export_dir}/{date}_{resolution}_{signal}_7d_avg.csv.
    """
    current_df = pd.read_csv(current_filename)
    # Window = target date plus whichever of the previous days' files exist.
    past_week = [current_df]
    past_week.extend(pd.read_csv(fname)
                     for fname in previous_filenames
                     if os.path.exists(fname))

    # Single-day signals from the current file alone...
    process_window([current_df],
                   add_prefix(signal_names, wip_signal, 'wip_'),
                   geo_resolutions,
                   export_dir)
    # ...then 7-day averages over the whole window.
    avg_signal_names = add_prefix(add_suffix(signal_names, '_7d_avg'),
                                  wip_signal,
                                  'wip_')
    process_window(past_week, avg_signal_names, geo_resolutions, export_dir)
0 commit comments