diff --git a/src/acquisition/covidcast/covidcast_meta_cache_updater.py b/src/acquisition/covidcast/covidcast_meta_cache_updater.py
index 824cced93..624a1919b 100644
--- a/src/acquisition/covidcast/covidcast_meta_cache_updater.py
+++ b/src/acquisition/covidcast/covidcast_meta_cache_updater.py
@@ -48,10 +48,10 @@ def main(args, epidata_impl=Epidata, database_impl=Database):
     if len(metadata)==0:
       args = ("no results",-2)
 
-    print('covidcast_meta result: %s (code %d)' % args)
+    logger.info('covidcast_meta result: %s (code %d)' % args)
 
     if args[-1] != 1:
-      print('unable to cache epidata')
+      logger.error('unable to cache epidata')
       return False
 
     # update the cache
@@ -59,7 +59,7 @@ def main(args, epidata_impl=Epidata, database_impl=Database):
     metadata_update_start_time = time.time()
     database.update_covidcast_meta_cache(metadata)
     metadata_update_interval_in_seconds = time.time() - metadata_update_start_time
-    print('successfully cached epidata')
+    logger.info('successfully cached epidata')
   finally:
     # no catch block so that an exception above will cause the program to
     # fail after the following cleanup
""" - + logger = get_structured_logger('find_csv_files') issue_day,issue_epiweek=issue issue_day_value=int(issue_day.strftime("%Y%m%d")) issue_epiweek_value=int(str(issue_epiweek)) @@ -117,14 +119,11 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today() if not path.lower().endswith('.csv'): # safe to ignore this file continue - - print('file:', path) - # match a daily or weekly naming pattern daily_match = CsvImporter.PATTERN_DAILY.match(path.lower()) weekly_match = CsvImporter.PATTERN_WEEKLY.match(path.lower()) if not daily_match and not weekly_match: - print(' invalid csv path/filename', path) + logger.warning(event='invalid csv path/filename', detail=path, file=path) yield (path, None) continue @@ -135,7 +134,7 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today() match = daily_match time_value_day = CsvImporter.is_sane_day(time_value) if not time_value_day: - print(' invalid filename day', time_value) + logger.warning(event='invalid filename day', detail=time_value, file=path) yield (path, None) continue issue_value=issue_day_value @@ -146,7 +145,7 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today() match = weekly_match time_value_week=CsvImporter.is_sane_week(time_value) if not time_value_week: - print(' invalid filename week', time_value) + logger.warning(event='invalid filename week', detail=time_value, file=path) yield (path, None) continue issue_value=issue_epiweek_value @@ -155,7 +154,7 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today() # # extract and validate geographic resolution geo_type = match.group(3).lower() if geo_type not in CsvImporter.GEOGRAPHIC_RESOLUTIONS: - print(' invalid geo_type', geo_type) + logger.warning(event='invalid geo_type', detail=geo_type, file=path) yield (path, None) continue @@ -163,7 +162,7 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today() source = match.group(1).lower() signal = match.group(4).lower() if len(signal) > 64: - print(' invalid signal name (64 char limit)',signal) + logger.warning(event='invalid signal name (64 char limit)',detail=signal, file=path) yield (path, None) continue @@ -344,19 +343,19 @@ def load_csv(filepath, geo_type, pandas=pandas): In case of a validation error, `None` is yielded for the offending row, including the header. 
""" - + logger = get_structured_logger('load_csv') # don't use type inference, just get strings table = pandas.read_csv(filepath, dtype='str') if not CsvImporter.is_header_valid(table.columns): - print(' invalid header') + logger.warning(event='invalid header', detail=table.columns, file=filepath) yield None return for row in table.itertuples(index=False): row_values, error = CsvImporter.extract_and_check_row(row, geo_type) if error: - print(' invalid value for %s (%s)' % (str(row), error)) + logger.warning(event = 'invalid value for row', detail=(str(row), error), file=filepath) yield None continue yield row_values diff --git a/src/acquisition/covidcast/csv_to_database.py b/src/acquisition/covidcast/csv_to_database.py index cd0e5ba06..7a845134b 100644 --- a/src/acquisition/covidcast/csv_to_database.py +++ b/src/acquisition/covidcast/csv_to_database.py @@ -37,25 +37,25 @@ def get_argument_parser(): help="filename for log output (defaults to stdout)") return parser -def collect_files(data_dir, specific_issue_date, csv_importer_impl=CsvImporter): +def collect_files(data_dir, specific_issue_date,csv_importer_impl=CsvImporter): """Fetch path and data profile details for each file to upload.""" - + logger= get_structured_logger('collect_files') if specific_issue_date: results = list(csv_importer_impl.find_issue_specific_csv_files(data_dir)) else: results = list(csv_importer_impl.find_csv_files(os.path.join(data_dir, 'receiving'))) - print(f'found {len(results)} files') + logger.info(f'found {len(results)} files') return results def make_handlers(data_dir, specific_issue_date, file_archiver_impl=FileArchiver): if specific_issue_date: # issue-specific uploads are always one-offs, so we can leave all # files in place without worrying about cleaning up - def handle_failed(path_src, filename, source): - print(f'leaving failed file alone - {source}') + def handle_failed(path_src, filename, source, logger): + logger.info(event='leaving failed file alone', dest=source, file=filename) - def handle_successful(path_src, filename, source): - print('archiving as successful') + def handle_successful(path_src, filename, source, logger): + logger.info(event='archiving as successful',file=filename) file_archiver_impl.archive_inplace(path_src, filename) else: # normal automation runs require some shuffling to remove files @@ -64,15 +64,15 @@ def handle_successful(path_src, filename, source): archive_failed_dir = os.path.join(data_dir, 'archive', 'failed') # helper to archive a failed file without compression - def handle_failed(path_src, filename, source): - print('archiving as failed - '+source) + def handle_failed(path_src, filename, source, logger): + logger.info(event='archiving as failed - ', detail=source, file=filename) path_dst = os.path.join(archive_failed_dir, source) compress = False file_archiver_impl.archive_file(path_src, path_dst, filename, compress) # helper to archive a successful file with compression - def handle_successful(path_src, filename, source): - print('archiving as successful') + def handle_successful(path_src, filename, source, logger): + logger.info(event='archiving as successful',file=filename) path_dst = os.path.join(archive_successful_dir, source) compress = True file_archiver_impl.archive_file(path_src, path_dst, filename, compress) @@ -101,15 +101,14 @@ def upload_archive( """ archive_as_successful, archive_as_failed = handlers total_modified_row_count = 0 - # iterate over each file for path, details in path_details: - print('handling ', path) + 
diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py
index 956cdd693..b8bf4f525 100644
--- a/src/acquisition/covidcast/database.py
+++ b/src/acquisition/covidcast/database.py
@@ -16,6 +16,7 @@
 
 # first party
 import delphi.operations.secrets as secrets
+from delphi.epidata.acquisition.covidcast.logger import get_structured_logger
 
 class CovidcastRow():
   """A container for all the values of a single covidcast row."""
@@ -248,7 +249,7 @@ def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False
 
   def compute_covidcast_meta(self, table_name='covidcast', use_index=True):
     """Compute and return metadata on all non-WIP COVIDcast signals."""
-
+    logger = get_structured_logger("compute_covidcast_meta")
     index_hint = ""
     if use_index:
       index_hint = "USE INDEX (for_metadata)"
@@ -304,7 +305,7 @@ def compute_covidcast_meta(self, table_name='covidcast', use_index=True):
     meta_lock = threading.Lock()
 
     def worker():
-      print("starting thread: " + threading.current_thread().name)
+      logger.info("starting thread: " + threading.current_thread().name)
       # set up new db connection for thread
       worker_dbc = Database()
       worker_dbc.connect(connector_impl=self._connector_impl)
@@ -319,7 +320,7 @@ def worker():
          ))
          srcsigs.task_done()
      except Empty:
-        print("no jobs left, thread terminating: " + threading.current_thread().name)
+        logger.info("no jobs left, thread terminating: " + threading.current_thread().name)
      finally:
        worker_dbc.disconnect(False) # cleanup
@@ -330,10 +331,10 @@ def worker():
      threads.append(t)
 
    srcsigs.join()
-    print("jobs complete")
+    logger.info("jobs complete")
 
    for t in threads:
      t.join()
-    print("threads terminated")
+    logger.info("threads terminated")
 
    # sort the metadata because threaded workers dgaf
    sorting_fields = "data_source signal time_type geo_type".split()
diff --git a/src/acquisition/covidcast/file_archiver.py b/src/acquisition/covidcast/file_archiver.py
index 75b4d5f80..92686f3cf 100644
--- a/src/acquisition/covidcast/file_archiver.py
+++ b/src/acquisition/covidcast/file_archiver.py
@@ -5,6 +5,8 @@
 import os
 import shutil
 
+# first party
+from delphi.epidata.acquisition.covidcast.logger import get_structured_logger
 
 class FileArchiver:
   """Archives files by moving and compressing."""
@@ -40,6 +42,7 @@ def archive_file(
     file already exists, it will be overwritten.
     """
 
+    logger = get_structured_logger("file_archiver")
     src = os.path.join(path_src, filename)
     dst = os.path.join(path_dst, filename)
 
@@ -51,7 +54,7 @@ def archive_file(
 
     if os.path.exists(dst):
       # warn that destination is about to be overwritten
-      print('destination exists, will overwrite (%s)' % dst)
+      logger.warning(event='destination exists, will overwrite', file=dst)
 
     if compress:
       # make a compressed copy
diff --git a/src/acquisition/covidcast/fill_is_latest_issue.py b/src/acquisition/covidcast/fill_is_latest_issue.py
index 6973e76cb..378983f9e 100644
--- a/src/acquisition/covidcast/fill_is_latest_issue.py
+++ b/src/acquisition/covidcast/fill_is_latest_issue.py
@@ -8,7 +8,7 @@
 
 # first party
 import delphi.operations.secrets as secrets
-
+from delphi.epidata.acquisition.covidcast.logger import get_structured_logger
 
 # partition configuration
 ###PARTITION_VARIABLE = 'geo_value'
@@ -34,6 +34,9 @@
 
 
 def main(*, CLEAR_LATEST_BY_PARTITION=_CLEAR_LATEST_BY_PARTITION, FILTER_CONDITION=_FILTER_CONDITION):
+
+  logger = get_structured_logger("fill_is_latest_issue")
+
   u, p = secrets.db.epi
   connection = mysql.connector.connect(
     host=secrets.db.host,
@@ -94,7 +97,7 @@ def main(*, CLEAR_LATEST_BY_PARTITION=_CLEAR_LATEST_BY_PARTITION, FILTER_CONDITI
       commit = True
   except Exception as e:
     connection.rollback()
-    print("exception raised at partition %s (partition index #%s) of column `%s`" % (PARTITION_SPLITS[partition_index], partition_index, PARTITION_VARIABLE))
+    logger.exception("exception raised at partition %s (partition index #%s) of column `%s`" % (PARTITION_SPLITS[partition_index], partition_index, PARTITION_VARIABLE))
     raise e
   finally:
     cursor.close()
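One behavior worth noting about the `logger.exception(...)` calls introduced in `csv_to_database.py` and `fill_is_latest_issue.py` above: they attach the active traceback automatically, but only when invoked from inside an `except` block, so the exception object no longer needs to be formatted into the message. A small illustrative sketch (the `do_partition_update` helper is hypothetical, standing in for the real work being wrapped):

```python
# Illustrative sketch only; do_partition_update() is a made-up stand-in for the
# partition UPDATE / bulk insert that the real code wraps.
from delphi.epidata.acquisition.covidcast.logger import get_structured_logger

logger = get_structured_logger("exception_logging_demo")

def do_partition_update():
    raise RuntimeError("simulated failure")

try:
    do_partition_update()
except Exception:
    # logger.exception() logs at ERROR level and appends the current traceback,
    # so there is no need to interpolate the exception into the message
    logger.exception("exception raised during partition update")
    # the real code then re-raises so the caller still sees the failure
```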
diff --git a/src/acquisition/covidcast/generate_islatest_fix_sql.py b/src/acquisition/covidcast/generate_islatest_fix_sql.py
index 939e1edff..115a7d131 100644
--- a/src/acquisition/covidcast/generate_islatest_fix_sql.py
+++ b/src/acquisition/covidcast/generate_islatest_fix_sql.py
@@ -14,8 +14,6 @@
     for typ in ('num', 'prop'):
       signals.append(case+period+count+typ)
 ### signals = ['sig2'] ###
-
-
 # variable to split on, 'time_value' is good because its high cardinality is suitable for chunking
 PARTITION_VARIABLE = 'time_value'
 PARTITION_SPLITS = [20200101 + i*100 for i in range(10)] # first day of the month for jan - oct 2020 in YYYYMMDD form
@@ -40,7 +38,6 @@
 
   ge_condition = 'TRUE' if partition_index == 0 else f'`{PARTITION_VARIABLE}` >= {PARTITION_SPLITS[partition_index - 1]}'
   l_condition = 'TRUE' if partition_index == len(PARTITION_SPLITS) else f'`{PARTITION_VARIABLE}` < {PARTITION_SPLITS[partition_index]}'
   partition_condition = f'({ge_condition}) AND ({l_condition})'
-
   for sig in signals:
     where_clause = base_where_clause + " AND `signal`='%s' AND %s" % (sig, partition_condition)
@@ -63,4 +60,4 @@
 
 # clean up temp table
 print("-- TODO: drop this table")
-print("-- DROP TABLE `islatest_fix`;")
+print("-- DROP TABLE `islatest_fix`;")
\ No newline at end of file
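Every file touched above imports `get_structured_logger` from `delphi.epidata.acquisition.covidcast.logger`, but that module itself is outside this diff. For orientation only, here is a minimal sketch of what a helper with that name and calling convention could look like, assuming structlog layered over the standard library (the repository's actual implementation may differ):

```python
# Hedged sketch only -- not the repository's logger.py. It assumes structlog
# wrapping the standard library logging module and rendering events as JSON.
import logging
import sys

import structlog


def get_structured_logger(name=__name__, filename=None):
    """Return a logger whose keyword arguments become structured JSON fields."""
    # send stdlib log records to stdout, or to a file if one is requested
    handlers = [logging.FileHandler(filename)] if filename else [logging.StreamHandler(sys.stdout)]
    logging.basicConfig(format="%(message)s", level=logging.INFO, handlers=handlers)

    structlog.configure(
        processors=[
            structlog.stdlib.filter_by_level,         # respect the stdlib log level
            structlog.stdlib.add_logger_name,         # record which module logged the event
            structlog.stdlib.add_log_level,           # record the severity as a field
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,     # attach tracebacks for logger.exception()
            structlog.processors.JSONRenderer(),
        ],
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )
    return structlog.get_logger(name)
```

With a configuration of this shape, a call such as `logger.info('successfully cached epidata')` becomes a single JSON object on stdout, and the `event=`/`detail=`/`file=` keywords used throughout the diff appear as additional fields on that object.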