# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd

from delphi_epidata import Epidata
from delphi_utils import read_params

from .geo import FIPS_TO_STATE

# Magic number for modular arithmetic; CBG -> FIPS
MOD = 10000000
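# For example, a 12-digit census block group id such as 420030001001,
# integer-divided by MOD (10**7), yields the 5-digit county FIPS code 42003,
# which is zero-padded to the string '42003' in construct_signals() below.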


# Add prefix to the signal name, if needed
def signal_name(signal_names, wip_signal, prefix):
    if wip_signal is True:
        # Prefix every signal that is not yet public.
        return [prefix + signal if epidata_signal(signal) else signal
                for signal in signal_names]
    if isinstance(wip_signal, list):
        # Prefix only the requested signals that are not yet public.
        for signal in wip_signal:
            if epidata_signal(signal):
                signal_names.remove(signal)
                signal_names.append(prefix + signal)
        return signal_names
    # wip_signal is None or False: leave the names unchanged.
    return signal_names


# Return True if the signal is not yet listed in the public covidcast
# metadata, i.e. it is still work-in-progress.
def epidata_signal(signal_):
    meta = Epidata.covidcast_meta()
    for row in meta['epidata']:
        if row.get('signal') == signal_:
            return False
    return True
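
# A minimal usage sketch of signal_name() (assumes 'wip_' is the
# work-in-progress prefix used by the exporter):
#
#   signal_name(['completely_home_prop'], wip_signal=True, prefix='wip_')
#   # -> ['wip_completely_home_prop'] if the signal is not yet in the public
#   #    covidcast metadata, otherwise ['completely_home_prop']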


def construct_signals(cbg_df, signal_names):
    """Construct Census-block level signals.

    In its current form, we prepare the following signals in addition to those
    already available in raw form from Safegraph:

    - completely_home_prop, defined as:
        completely_home_device_count / device_count
    - full_time_work_prop, defined as:
        full_time_work_behavior_devices / device_count
    - part_time_work_prop, defined as:
        part_time_work_behavior_devices / device_count

    Documentation for the social distancing metrics:
    https://docs.safegraph.com/docs/social-distancing-metrics

    Parameters
    ----------
    cbg_df: pd.DataFrame
        Census block group-level dataframe with raw social distancing
        indicators from Safegraph.
    signal_names: List[str]
        Names of signals to be exported.

    Returns
    -------
    pd.DataFrame
        Dataframe with columns: timestamp, county_fips, and
        {each signal described above}.
    """
    # signal_names[0] is expected to be present in the raw Safegraph data;
    # the remaining three entries name the derived proportions computed below.
    COMPLETELY_HOME = signal_names[1]
    FULL_TIME_WORK = signal_names[2]
    PART_TIME_WORK = signal_names[3]

    # Preparation
    cbg_df['timestamp'] = cbg_df['date_range_start'].apply(
        lambda x: str(x).split('T')[0])
    cbg_df['county_fips'] = (cbg_df['origin_census_block_group'] // MOD).apply(
        lambda x: f'{int(x):05d}')

    # Transformation: create signals not available in raw data
    cbg_df[COMPLETELY_HOME] = (cbg_df['completely_home_device_count']
                               / cbg_df['device_count'])
    cbg_df[FULL_TIME_WORK] = (cbg_df['full_time_work_behavior_devices']
                              / cbg_df['device_count'])
    cbg_df[PART_TIME_WORK] = (cbg_df['part_time_work_behavior_devices']
                              / cbg_df['device_count'])

    # Subsetting
    return cbg_df[['timestamp', 'county_fips'] + signal_names]

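# A minimal sketch of the expected input to construct_signals() (column names
# follow the Safegraph social-distancing schema linked in the docstring; the
# values and the leading 'median_home_dwell_time' entry, assumed here to be the
# raw signal occupying signal_names[0], are for illustration only):
#
#   toy = pd.DataFrame({
#       'origin_census_block_group': [420030001001],
#       'date_range_start': ['2020-06-12T00:00:00-04:00'],
#       'median_home_dwell_time': [120],
#       'device_count': [100],
#       'completely_home_device_count': [45],
#       'full_time_work_behavior_devices': [10],
#       'part_time_work_behavior_devices': [20],
#   })
#   construct_signals(toy, ['median_home_dwell_time', 'completely_home_prop',
#                           'full_time_work_prop', 'part_time_work_prop'])
#   # -> timestamp '2020-06-12', county_fips '42003',
#   #    completely_home_prop 0.45, full_time_work_prop 0.10,
#   #    part_time_work_prop 0.20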

def aggregate(df, signal_names, geo_resolution='county'):
    '''Aggregate signals to appropriate resolution and produce standard errors.

    Parameters
    ----------
    df: pd.DataFrame
        Census block group-level dataframe with prepared signals
        (the output of construct_signals()).
    signal_names: List[str]
        Names of signals to be exported.
    geo_resolution: str
        One of ('county', 'state').

    Returns
    -------
    pd.DataFrame:
        DataFrame with one row per (geo_id, timestamp), containing the mean,
        standard deviation, standard error, and sample size of each signal.
    '''
    # Prepare geo resolution
    GEO_RESOLUTION = ('county', 'state')
    if geo_resolution == 'county':
        df['geo_id'] = df['county_fips']
    elif geo_resolution == 'state':
        df['geo_id'] = df['county_fips'].apply(lambda x: FIPS_TO_STATE[x[:2]])
    else:
        raise ValueError(f'`geo_resolution` must be one of {GEO_RESOLUTION}.')

    # Aggregation and signal creation
    df_mean = df.groupby(['geo_id', 'timestamp'])[signal_names].mean()
    df_sd = df.groupby(['geo_id', 'timestamp'])[signal_names].std()
    df_n = df.groupby(['geo_id', 'timestamp'])[signal_names].count()
    agg_df = df_mean.join(df_sd, lsuffix='_mean', rsuffix='_sd')
    agg_df = agg_df.join(df_n.rename({
        signal: signal + '_n' for signal in signal_names
    }, axis=1))
    for signal in signal_names:
        agg_df[f'{signal}_se'] = (agg_df[f'{signal}_sd']
                                  / np.sqrt(agg_df[f'{signal}_n']))
    return agg_df.reset_index()
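
# Note on the standard error above: each geo-level estimate is the mean of the
# CBG-level values, so its standard error is the sample standard deviation
# divided by the square root of the sample size. For example, sd = 0.12 across
# n = 400 census block groups gives se = 0.12 / sqrt(400) = 0.006.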


def process(fname, signal_names, geo_resolutions, export_dir):
    '''Process an input census block group-level CSV and export it. Assumes
    that the input file has _only_ one date of data.

    Parameters
    ----------
    fname: str
        Path to the input CSV file.
    signal_names: List[str]
        Names of signals to be exported.
    geo_resolutions: List[str]
        List of geo resolutions to export the data.
    export_dir: str
        Directory in which to write the exported CSV files.

    Returns
    -------
    None
    '''
    cbg_df = construct_signals(pd.read_csv(fname), signal_names)
    unique_date = cbg_df['timestamp'].unique()
    if len(unique_date) != 1:
        raise ValueError(f'More than one timestamp found in input file {fname}.')
    date = unique_date[0].replace('-', '')
    for geo_res in geo_resolutions:
        df = aggregate(cbg_df, signal_names, geo_res)
        for signal in signal_names:
            df_export = df[
                ['geo_id']
                + [f'{signal}_{x}' for x in ('mean', 'se', 'n')]
            ].rename({
                f'{signal}_mean': 'val',
                f'{signal}_se': 'se',
                f'{signal}_n': 'sample_size',
            }, axis=1)
            df_export.to_csv(f'{export_dir}/{date}_{geo_res}_{signal}.csv',
                             na_rep='NA',
                             index=False)
    return
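
# A minimal driver sketch (not part of this module): the file name, params key,
# signal list, and the 'wip_' prefix below are illustrative assumptions only.
#
#   if __name__ == '__main__':
#       params = read_params()
#       signals = signal_name(
#           ['median_home_dwell_time', 'completely_home_prop',
#            'full_time_work_prop', 'part_time_work_prop'],
#           wip_signal=params.get('wip_signal'),
#           prefix='wip_')
#       process('social-distancing.csv', signals,
#               geo_resolutions=['county', 'state'],
#               export_dir='./receiving')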