1111
1212PACIFIC_TIMEZONE = "US/Pacific"
1313
14+
1415def _query_sql_with_params (query_template : str , search_criteria : dict , as_df : bool ) -> pd .DataFrame :
1516 # TODO: update query_sql to accept parameterized queries and use that instead
1617 search_conditions = ""
@@ -31,6 +32,7 @@ def _query_sql_with_params(query_template: str, search_criteria: dict, as_df: bo
3132
3233 return result
3334
35+
3436def localize_timestamp_col (df : dd .DataFrame , timestamp_col : Union [str , list ]) -> dd .DataFrame :
3537 """
3638 RT vehicle timestamps are given in UTC.
@@ -66,17 +68,20 @@ def get_schedule_gtfs_dataset_key(date: str, get_df: bool = True, **kwargs) -> p
6668 project = kwargs .get ("project" , "cal-itp-data-infra" )
6769 dataset = kwargs .get ("dataset" , "mart_gtfs" )
6870
69- return query_sql (f"""
71+ return query_sql (
72+ f"""
7073 SELECT gtfs_dataset_key, feed_key FROM { project } .{ dataset } .fct_daily_feed_scheduled_service_summary
7174 WHERE service_date = '{ date } '
72- """ , as_df = get_df )
75+ """ ,
76+ as_df = get_df ,
77+ )
7378
7479
7580def filter_dim_gtfs_datasets (
7681 keep_cols : list [str ] = ["key" , "name" , "type" , "regional_feed_type" , "uri" , "base64_url" ],
7782 custom_filtering : dict = None ,
7883 get_df : bool = True ,
79- ** kwargs
84+ ** kwargs ,
8085) -> pd .DataFrame :
8186 """
8287 Filter mart_transit_database.dim_gtfs_dataset table
@@ -146,12 +151,15 @@ def get_organization_id(
146151 project = kwargs .get ("project" , "cal-itp-data-infra" )
147152 dataset = kwargs .get ("dataset" , "mart_transit_database" )
148153
149- dim_provider_gtfs_data = query_sql (f"""
154+ dim_provider_gtfs_data = query_sql (
155+ f"""
150156 SELECT DISTINCT *
151157 FROM { project } .{ dataset } .dim_provider_gtfs_data
152158 WHERE DATETIME(_valid_from, '{ PACIFIC_TIMEZONE } ') <= DATETIME('{ date } ')
153159 AND DATETIME(_valid_to, '{ PACIFIC_TIMEZONE } ') >= DATETIME('{ date } ')
154- """ , as_df = True )
160+ """ ,
161+ as_df = True ,
162+ )
155163
156164 sorting = [True for c in merge_cols ]
157165 keep_cols = ["organization_source_record_id" ]
@@ -188,16 +196,19 @@ def filter_dim_county_geography(
188196 project = kwargs .get ("project" , "cal-itp-data-infra" )
189197 dataset = kwargs .get ("dataset" , "mart_transit_database" )
190198
191- df = query_sql (f"""
192- SELECT
199+ df = query_sql (
200+ f"""
201+ SELECT
193202 bohcg.organization_name,
194203 CONCAT(LPAD(CAST(dmg.caltrans_district AS STRING), 2, '0'), ' - ', dmg.caltrans_district_name) AS caltrans_district,
195204 { ',' .join (keep_cols )}
196205 FROM { project } .{ dataset } .bridge_organizations_x_headquarters_county_geography AS bohcg
197206 INNER JOIN { project } .{ dataset } .dim_county_geography AS dmg ON dmg.key = bohcg.county_geography_key
198207 WHERE DATETIME(bohcg._valid_from, '{ PACIFIC_TIMEZONE } ') <= DATETIME('{ date } ')
199208 AND DATETIME(bohcg._valid_to, '{ PACIFIC_TIMEZONE } ') >= DATETIME('{ date } ')
200- """ , as_df = True )
209+ """ ,
210+ as_df = True ,
211+ )
201212
202213 return df [["organization_name" , "caltrans_district" ] + keep_cols ].drop_duplicates ().reset_index (drop = True )
203214
@@ -316,4 +327,3 @@ def sample_gtfs_dataset_key_to_organization_crosswalk(
316327 feeds_with_district = pd .merge (feeds_with_org_info , district , on = "organization_name" )
317328
318329 return feeds_with_district
319-
0 commit comments