diff --git a/quidel_covidtest/delphi_quidel_covidtest/run.py b/quidel_covidtest/delphi_quidel_covidtest/run.py
index 314d8b567..3f9451d22 100644
--- a/quidel_covidtest/delphi_quidel_covidtest/run.py
+++ b/quidel_covidtest/delphi_quidel_covidtest/run.py
@@ -6,7 +6,7 @@
 """
 import atexit
 from datetime import datetime
-from multiprocessing import Manager, Pool, cpu_count, current_process
+from multiprocessing import cpu_count
 import time
 from typing import Dict, Any
@@ -15,6 +15,7 @@
     create_export_csv,
     get_structured_logger
 )
+from delphi_utils.logger import pool_and_threadedlogger
 from .constants import (END_FROM_TODAY_MINUS, SMOOTHED_POSITIVE, RAW_POSITIVE,
@@ -60,15 +61,11 @@ def generate_and_export_for_nonparent_geo(geo_groups, res_key, smooth, device,
                                           first_date, last_date, suffix, # generate args
                                           geo_res, sensor_name, export_dir,
                                           export_start_date, export_end_date, # export args
-                                          lock, log_filename, log_exceptions): # logger args
+                                          threaded_logger): # logger args
     """Generate sensors, create export CSV then return stats."""
-    # logger cannot be passed to child processes, so has to be recreated
-    with lock:
-        logger = get_structured_logger(__name__, log_filename, log_exceptions)
-        logger.info("Generating signal and exporting to CSV",
-                    geo_res=geo_res,
-                    sensor=sensor_name,
-                    pid=current_process().pid)
+    threaded_logger.info("Generating signal and exporting to CSV",
+                         geo_res=geo_res,
+                         sensor=sensor_name)
     res_df = generate_sensor_for_nonparent_geo(geo_groups, res_key, smooth, device,
                                                first_date, last_date, suffix)
     dates = create_export_csv(res_df, geo_res=geo_res,
@@ -81,15 +78,11 @@ def generate_and_export_for_parent_geo(geo_groups, geo_data, res_key, smooth, de
                                        first_date, last_date, suffix, # generate args
                                        geo_res, sensor_name, export_dir,
                                        export_start_date, export_end_date, # export args
-                                       lock, log_filename, log_exceptions): # logger args
+                                       threaded_logger): # logger args
     """Generate sensors, create export CSV then return stats."""
-    # logger cannot be passed to child processes, so has to be recreated
-    with lock:
-        logger = get_structured_logger(__name__, log_filename, log_exceptions)
-        logger.info("Generating signal and exporting to CSV",
-                    geo_res=geo_res,
-                    sensor=sensor_name,
-                    pid=current_process().pid)
+    threaded_logger.info("Generating signal and exporting to CSV",
+                         geo_res=geo_res,
+                         sensor=sensor_name)
     res_df = generate_sensor_for_parent_geo(geo_groups, geo_data, res_key, smooth, device,
                                             first_date, last_date, suffix)
     dates = create_export_csv(res_df, geo_res=geo_res,
@@ -168,72 +161,65 @@ def run_module(params: Dict[str, Any]):
                          prefix="wip_")
     smoothers = get_smooth_info(sensors, SMOOTHERS)
     n_cpu = min(8, cpu_count()) # for parallelization
-    with Manager() as manager:
+    with pool_and_threadedlogger(logger, n_cpu) as (pool, threaded_logger):
         # for using loggers in multiple threads
-        # disabled due to a Pylint bug, resolved by version bump (#1886)
-        lock = manager.Lock() # pylint: disable=no-member
         logger.info("Parallelizing sensor generation", n_cpu=n_cpu)
-        with Pool(n_cpu) as pool:
-            pool_results = []
-            for geo_res in NONPARENT_GEO_RESOLUTIONS:
-                geo_data, res_key = geo_map(geo_res, data)
-                geo_groups = geo_data.groupby(res_key)
-                for agegroup in AGE_GROUPS:
-                    for sensor in sensors:
-                        if agegroup == "total":
-                            sensor_name = sensor
-                        else:
-                            sensor_name = "_".join([sensor, agegroup])
-                        pool_results.append(
-                            pool.apply_async(
-                                generate_and_export_for_nonparent_geo,
-                                args=(
-                                    # generate_sensors_for_parent_geo
-                                    geo_groups, res_key,
-                                    smoothers[sensor][1], smoothers[sensor][0],
-                                    first_date, last_date, agegroup,
-                                    # create_export_csv
-                                    geo_res, sensor_name, export_dir,
-                                    export_start_date, export_end_date,
-                                    # logger params
-                                    lock,
-                                    params["common"].get("log_filename"),
-                                    params["common"].get("log_exceptions", True)
-                                )
+        pool_results = []
+        for geo_res in NONPARENT_GEO_RESOLUTIONS:
+            geo_data, res_key = geo_map(geo_res, data)
+            geo_groups = geo_data.groupby(res_key)
+            for agegroup in AGE_GROUPS:
+                for sensor in sensors:
+                    if agegroup == "total":
+                        sensor_name = sensor
+                    else:
+                        sensor_name = "_".join([sensor, agegroup])
+                    pool_results.append(
+                        pool.apply_async(
+                            generate_and_export_for_nonparent_geo,
+                            args=(
+                                # generate_sensors_for_parent_geo
+                                geo_groups, res_key,
+                                smoothers[sensor][1], smoothers[sensor][0],
+                                first_date, last_date, agegroup,
+                                # create_export_csv
+                                geo_res, sensor_name, export_dir,
+                                export_start_date, export_end_date,
+                                # logger
+                                threaded_logger
                             )
                         )
-            assert geo_res == "state" # Make sure geo_groups is for state level
-            # County/HRR/MSA level
-            for geo_res in PARENT_GEO_RESOLUTIONS:
-                geo_data, res_key = geo_map(geo_res, data) # using the last geo_groups
-                for agegroup in AGE_GROUPS:
-                    for sensor in sensors:
-                        if agegroup == "total":
-                            sensor_name = sensor
-                        else:
-                            sensor_name = "_".join([sensor, agegroup])
-                        pool_results.append(
-                            pool.apply_async(
-                                generate_and_export_for_parent_geo,
-                                args=(
-                                    # generate_sensors_for_parent_geo
-                                    geo_groups, geo_data, res_key,
-                                    smoothers[sensor][1], smoothers[sensor][0],
-                                    first_date, last_date, agegroup,
-                                    # create_export_csv
-                                    geo_res, sensor_name, export_dir,
-                                    export_start_date, export_end_date,
-                                    # logger params
-                                    lock,
-                                    params["common"].get("log_filename"),
-                                    params["common"].get("log_exceptions", True)
-                                )
+                    )
+        assert geo_res == "state" # Make sure geo_groups is for state level
+        # County/HRR/MSA level
+        for geo_res in PARENT_GEO_RESOLUTIONS:
+            geo_data, res_key = geo_map(geo_res, data) # using the last geo_groups
+            for agegroup in AGE_GROUPS:
+                for sensor in sensors:
+                    if agegroup == "total":
+                        sensor_name = sensor
+                    else:
+                        sensor_name = "_".join([sensor, agegroup])
+                    pool_results.append(
+                        pool.apply_async(
+                            generate_and_export_for_parent_geo,
+                            args=(
+                                # generate_sensors_for_parent_geo
+                                geo_groups, geo_data, res_key,
+                                smoothers[sensor][1], smoothers[sensor][0],
+                                first_date, last_date, agegroup,
+                                # create_export_csv
+                                geo_res, sensor_name, export_dir,
+                                export_start_date, export_end_date,
+                                # logger
+                                threaded_logger
                            )
                        )
-            pool_results = [proc.get() for proc in pool_results]
-            for dates in pool_results:
-                if len(dates) > 0:
-                    stats.append((max(dates), len(dates)))
+                    )
+        pool_results = [proc.get() for proc in pool_results]
+        for dates in pool_results:
+            if len(dates) > 0:
+                stats.append((max(dates), len(dates)))
     # Export the cache file if the pipeline runs successfully.
     # Otherwise, don't update the cache file
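
Note on why the worker functions can now take the logger directly: `pool_and_threadedlogger` yields a pool together with a logger object that is safe to ship through `pool.apply_async`, while the real structured logger stays in the parent process. The sketch below illustrates the general pattern presumably at work (a small picklable proxy pushing records onto a queue that a parent-side thread drains); it is not the actual `delphi_utils.logger` implementation, and the names `pool_and_queue_logger` and `_QueueLogger` are invented for this example.

```python
# Illustrative only: a queue-backed logger proxy that can cross process
# boundaries, drained by a listener thread in the parent process.
from contextlib import contextmanager
from dataclasses import dataclass
from multiprocessing import Manager, Pool
from threading import Thread
from typing import Any


@dataclass
class _QueueLogger:
    """Picklable stand-in for the real logger: workers push records onto a queue."""
    queue: Any  # multiprocessing.Manager().Queue() proxy

    def info(self, event, **kwargs):
        self.queue.put(("info", event, kwargs))


@contextmanager
def pool_and_queue_logger(logger, n_cpu):
    """Yield (Pool, queue-backed logger proxy); the parent replays queued records."""
    with Manager() as manager:
        queue = manager.Queue()

        def drain():
            # Runs in the parent: forward worker records to the real logger.
            for level, event, kwargs in iter(queue.get, None):
                getattr(logger, level)(event, **kwargs)

        listener = Thread(target=drain, daemon=True)
        listener.start()
        try:
            with Pool(n_cpu) as pool:
                yield pool, _QueueLogger(queue)
        finally:
            queue.put(None)  # sentinel stops the listener thread
            listener.join()
```

With a proxy of this kind, `pool.apply_async(..., args=(..., threaded_logger))` works as in the diff: only the small proxy is pickled into each worker, and the log records are ultimately emitted by the parent's structured logger.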