From b34d5dd5898913ed5ac18f82f1ddc039319f80d4 Mon Sep 17 00:00:00 2001 From: Tara Lakdawala Date: Thu, 24 Jun 2021 12:55:55 -0500 Subject: [PATCH 01/14] initial changes, not done yet --- src/acquisition/covidcast/csv_importer.py | 28 +++++++++++--------- src/acquisition/covidcast/csv_to_database.py | 27 ++++++++++--------- 2 files changed, 29 insertions(+), 26 deletions(-) diff --git a/src/acquisition/covidcast/csv_importer.py b/src/acquisition/covidcast/csv_importer.py index 1098e54e9..a1fd6b70c 100644 --- a/src/acquisition/covidcast/csv_importer.py +++ b/src/acquisition/covidcast/csv_importer.py @@ -14,6 +14,8 @@ # first party from delphi_utils import Nans from delphi.utils.epiweek import delta_epiweeks +from delphi.epidata.acquisition.covidcast.logger import get_structured_logger + class CsvImporter: """Finds and parses covidcast CSV files.""" @@ -83,20 +85,20 @@ def is_sane_week(value): return value @staticmethod - def find_issue_specific_csv_files(scan_dir, glob=glob): + def find_issue_specific_csv_files(scan_dir, logger, glob=glob): for path in sorted(glob.glob(os.path.join(scan_dir, '*'))): issuedir_match = CsvImporter.PATTERN_ISSUE_DIR.match(path.lower()) if issuedir_match and os.path.isdir(path): issue_date_value = int(issuedir_match.group(2)) issue_date = CsvImporter.is_sane_day(issue_date_value) if issue_date: - print(' processing csv files from issue date: "' + str(issue_date) + '", directory', path) + logger.info(' processing csv files from issue date: "' + str(issue_date) + '", directory', path) yield from CsvImporter.find_csv_files(path, issue=(issue_date, epi.Week.fromdate(issue_date)), glob=glob) else: - print(' invalid issue directory day', issue_date_value) + logger.info(' invalid issue directory day', issue_date_value) @staticmethod - def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today())), glob=glob): + def find_csv_files(scan_dir, logger, issue=(date.today(), epi.Week.fromdate(date.today())), glob=glob): """Recursively search for and yield covidcast-format CSV files. 
scan_dir: the directory to scan (recursively) @@ -118,13 +120,13 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today() # safe to ignore this file continue - print('file:', path) + logger.info('file:', path) # match a daily or weekly naming pattern daily_match = CsvImporter.PATTERN_DAILY.match(path.lower()) weekly_match = CsvImporter.PATTERN_WEEKLY.match(path.lower()) if not daily_match and not weekly_match: - print(' invalid csv path/filename', path) + logger.error(' invalid csv path/filename', path) yield (path, None) continue @@ -135,7 +137,7 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today() match = daily_match time_value_day = CsvImporter.is_sane_day(time_value) if not time_value_day: - print(' invalid filename day', time_value) + logger.error(' invalid filename day', time_value) yield (path, None) continue issue_value=issue_day_value @@ -146,7 +148,7 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today() match = weekly_match time_value_week=CsvImporter.is_sane_week(time_value) if not time_value_week: - print(' invalid filename week', time_value) + logger.error(' invalid filename week', time_value) yield (path, None) continue issue_value=issue_epiweek_value @@ -155,7 +157,7 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today() # # extract and validate geographic resolution geo_type = match.group(3).lower() if geo_type not in CsvImporter.GEOGRAPHIC_RESOLUTIONS: - print(' invalid geo_type', geo_type) + logger.error(' invalid geo_type', geo_type) yield (path, None) continue @@ -163,7 +165,7 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today() source = match.group(1).lower() signal = match.group(4).lower() if len(signal) > 64: - print(' invalid signal name (64 char limit)',signal) + logger.error(' invalid signal name (64 char limit)',signal) yield (path, None) continue @@ -335,7 +337,7 @@ def extract_and_check_row(row, geo_type): return (row_values, None) @staticmethod - def load_csv(filepath, geo_type, pandas=pandas): + def load_csv(filepath, geo_type, logger, pandas=pandas): """Load, validate, and yield data as `RowValues` from a CSV file. 
filepath: the CSV file to be loaded @@ -349,14 +351,14 @@ def load_csv(filepath, geo_type, pandas=pandas): table = pandas.read_csv(filepath, dtype='str') if not CsvImporter.is_header_valid(table.columns): - print(' invalid header') + logger.error(' invalid header') yield None return for row in table.itertuples(index=False): row_values, error = CsvImporter.extract_and_check_row(row, geo_type) if error: - print(' invalid value for %s (%s)' % (str(row), error)) + logger.error(' invalid value for %s (%s)' % (str(row), error)) yield None continue yield row_values diff --git a/src/acquisition/covidcast/csv_to_database.py b/src/acquisition/covidcast/csv_to_database.py index cd0e5ba06..eb220514d 100644 --- a/src/acquisition/covidcast/csv_to_database.py +++ b/src/acquisition/covidcast/csv_to_database.py @@ -37,25 +37,25 @@ def get_argument_parser(): help="filename for log output (defaults to stdout)") return parser -def collect_files(data_dir, specific_issue_date, csv_importer_impl=CsvImporter): +def collect_files(data_dir, specific_issue_date, logger,csv_importer_impl=CsvImporter): """Fetch path and data profile details for each file to upload.""" if specific_issue_date: results = list(csv_importer_impl.find_issue_specific_csv_files(data_dir)) else: results = list(csv_importer_impl.find_csv_files(os.path.join(data_dir, 'receiving'))) - print(f'found {len(results)} files') + logger.info(f'found {len(results)} files') return results -def make_handlers(data_dir, specific_issue_date, file_archiver_impl=FileArchiver): +def make_handlers(data_dir, specific_issue_date, logger, file_archiver_impl=FileArchiver): if specific_issue_date: # issue-specific uploads are always one-offs, so we can leave all # files in place without worrying about cleaning up def handle_failed(path_src, filename, source): - print(f'leaving failed file alone - {source}') + logger.info(f'leaving failed file alone - {source}') def handle_successful(path_src, filename, source): - print('archiving as successful') + logger.info('archiving as successful') file_archiver_impl.archive_inplace(path_src, filename) else: # normal automation runs require some shuffling to remove files @@ -65,14 +65,14 @@ def handle_successful(path_src, filename, source): # helper to archive a failed file without compression def handle_failed(path_src, filename, source): - print('archiving as failed - '+source) + logger.info('archiving as failed - '+source) path_dst = os.path.join(archive_failed_dir, source) compress = False file_archiver_impl.archive_file(path_src, path_dst, filename, compress) # helper to archive a successful file with compression def handle_successful(path_src, filename, source): - print('archiving as successful') + logger.info('archiving as successful') path_dst = os.path.join(archive_successful_dir, source) compress = True file_archiver_impl.archive_file(path_src, path_dst, filename, compress) @@ -104,7 +104,7 @@ def upload_archive( # iterate over each file for path, details in path_details: - print('handling ', path) + logger.info(f'handling {path}') path_src, filename = os.path.split(path) if not details: @@ -130,7 +130,7 @@ def upload_archive( if all_rows_valid: try: modified_row_count = database.insert_or_update_bulk(rows_list) - print(f"insert_or_update_bulk {filename} returned {modified_row_count}") + logger.info(f"insert_or_update_bulk {filename} returned {modified_row_count}") logger.info( "Inserted database rows", row_count = modified_row_count, @@ -145,7 +145,7 @@ def upload_archive( database.commit() except Exception as e: 
all_rows_valid = False - print('exception while inserting rows:', e) + logger.exception('exception while inserting rows:', e) database.rollback() # archive the current file based on validation results @@ -168,7 +168,7 @@ def main( start_time = time.time() if args.is_wip_override and args.not_wip_override: - print('conflicting overrides for forcing WIP option! exiting...') + logger.error('conflicting overrides for forcing WIP option! exiting...') return wip_override = None if args.is_wip_override: @@ -179,7 +179,7 @@ def main( # shortcut escape without hitting db if nothing to do path_details = collect_files_impl(args.data_dir, args.specific_issue_date) if not path_details: - print('nothing to do; exiting...') + logger.info('nothing to do; exiting...') return logger.info("Ingesting CSVs", csv_count = len(path_details)) @@ -195,7 +195,8 @@ def main( logger, is_wip_override=wip_override) logger.info("Finished inserting database rows", row_count = modified_row_count) - print('inserted/updated %d rows' % modified_row_count) + # the following print statement serves the same function as the logger.info call above + # print('inserted/updated %d rows' % modified_row_count) finally: # unconditionally commit database changes since CSVs have been archived database.disconnect(True) From 899bab3120bdd705ad0703522c2660cabd7db278 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=84=B6=E7=84=B6?= <52230092+zhuoran-cheng@users.noreply.github.com> Date: Thu, 24 Jun 2021 15:14:51 -0400 Subject: [PATCH 02/14] modified others print(), pass the integration / unit test --- .../covidcast/covidcast_meta_cache_updater.py | 6 +++--- src/acquisition/covidcast/database.py | 13 +++++++------ src/acquisition/covidcast/file_archiver.py | 7 ++++++- .../covidcast/fill_is_latest_issue.py | 9 +++++++-- .../covidcast/generate_islatest_fix_sql.py | 17 ++++++++++------- 5 files changed, 33 insertions(+), 19 deletions(-) diff --git a/src/acquisition/covidcast/covidcast_meta_cache_updater.py b/src/acquisition/covidcast/covidcast_meta_cache_updater.py index 824cced93..624a1919b 100644 --- a/src/acquisition/covidcast/covidcast_meta_cache_updater.py +++ b/src/acquisition/covidcast/covidcast_meta_cache_updater.py @@ -48,10 +48,10 @@ def main(args, epidata_impl=Epidata, database_impl=Database): if len(metadata)==0: args = ("no results",-2) - print('covidcast_meta result: %s (code %d)' % args) + logger.info('covidcast_meta result: %s (code %d)' % args) if args[-1] != 1: - print('unable to cache epidata') + logger.error('unable to cache epidata') return False # update the cache @@ -59,7 +59,7 @@ def main(args, epidata_impl=Epidata, database_impl=Database): metadata_update_start_time = time.time() database.update_covidcast_meta_cache(metadata) metadata_update_interval_in_seconds = time.time() - metadata_update_start_time - print('successfully cached epidata') + logger.info('successfully cached epidata') finally: # no catch block so that an exception above will cause the program to # fail after the following cleanup diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py index 956cdd693..469fe46a8 100644 --- a/src/acquisition/covidcast/database.py +++ b/src/acquisition/covidcast/database.py @@ -16,6 +16,7 @@ # first party import delphi.operations.secrets as secrets +from delphi.epidata.acquisition.covidcast.logger import get_structured_logger class CovidcastRow(): """A container for all the values of a single covidcast row.""" @@ -246,7 +247,7 @@ def insert_or_update_batch(self, cc_rows, batch_size=2**20, 
commit_partial=False self._cursor.execute(drop_tmp_table_sql) return total - def compute_covidcast_meta(self, table_name='covidcast', use_index=True): + def compute_covidcast_meta(self, logger, table_name='covidcast', use_index=True): """Compute and return metadata on all non-WIP COVIDcast signals.""" index_hint = "" @@ -303,8 +304,8 @@ def compute_covidcast_meta(self, table_name='covidcast', use_index=True): meta = [] meta_lock = threading.Lock() - def worker(): - print("starting thread: " + threading.current_thread().name) + def worker(logger): + logger.info("starting thread: " + threading.current_thread().name) # set up new db connection for thread worker_dbc = Database() worker_dbc.connect(connector_impl=self._connector_impl) @@ -319,7 +320,7 @@ def worker(): )) srcsigs.task_done() except Empty: - print("no jobs left, thread terminating: " + threading.current_thread().name) + logger.info("no jobs left, thread terminating: " + threading.current_thread().name) finally: worker_dbc.disconnect(False) # cleanup @@ -330,10 +331,10 @@ def worker(): threads.append(t) srcsigs.join() - print("jobs complete") + logger.info("jobs complete") for t in threads: t.join() - print("threads terminated") + logger.info("threads terminated") # sort the metadata because threaded workers dgaf sorting_fields = "data_source signal time_type geo_type".split() diff --git a/src/acquisition/covidcast/file_archiver.py b/src/acquisition/covidcast/file_archiver.py index 75b4d5f80..4fbae5055 100644 --- a/src/acquisition/covidcast/file_archiver.py +++ b/src/acquisition/covidcast/file_archiver.py @@ -5,6 +5,8 @@ import os import shutil +# first party +from delphi.epidata.acquisition.covidcast.logger import get_structured_logger class FileArchiver: """Archives files by moving and compressing.""" @@ -40,6 +42,9 @@ def archive_file( file already exists, it will be overwritten. 
""" + logger =get_structured_logger( + "file_archiver" + ) src = os.path.join(path_src, filename) dst = os.path.join(path_dst, filename) @@ -51,7 +56,7 @@ def archive_file( if os.path.exists(dst): # warn that destination is about to be overwritten - print('destination exists, will overwrite (%s)' % dst) + logger.warning('destination exists, will overwrite (%s)' % dst) if compress: # make a compressed copy diff --git a/src/acquisition/covidcast/fill_is_latest_issue.py b/src/acquisition/covidcast/fill_is_latest_issue.py index 6973e76cb..86c555f7b 100644 --- a/src/acquisition/covidcast/fill_is_latest_issue.py +++ b/src/acquisition/covidcast/fill_is_latest_issue.py @@ -8,7 +8,7 @@ # first party import delphi.operations.secrets as secrets - +from delphi.epidata.acquisition.covidcast.logger import get_structured_logger # partition configuration ###PARTITION_VARIABLE = 'geo_value' @@ -34,6 +34,11 @@ def main(*, CLEAR_LATEST_BY_PARTITION=_CLEAR_LATEST_BY_PARTITION, FILTER_CONDITION=_FILTER_CONDITION): + + logger = get_structured_logger( + "fill_is_lastest_issue", + log_exceptions=False) + u, p = secrets.db.epi connection = mysql.connector.connect( host=secrets.db.host, @@ -94,7 +99,7 @@ def main(*, CLEAR_LATEST_BY_PARTITION=_CLEAR_LATEST_BY_PARTITION, FILTER_CONDITI commit = True except Exception as e: connection.rollback() - print("exception raised at partition %s (partition index #%s) of column `%s`" % (PARTITION_SPLITS[partition_index], partition_index, PARTITION_VARIABLE)) + logger.exception("exception raised at partition %s (partition index #%s) of column `%s`" % (PARTITION_SPLITS[partition_index], partition_index, PARTITION_VARIABLE)) raise e finally: cursor.close() diff --git a/src/acquisition/covidcast/generate_islatest_fix_sql.py b/src/acquisition/covidcast/generate_islatest_fix_sql.py index 939e1edff..85eff4e51 100644 --- a/src/acquisition/covidcast/generate_islatest_fix_sql.py +++ b/src/acquisition/covidcast/generate_islatest_fix_sql.py @@ -1,7 +1,10 @@ +from delphi.epidata.acquisition.covidcast.logger import get_structured_logger # what data to operate on base_where_clause = "WHERE `source`='jhu-csse' AND `time_type`='day'" ### base_where_clause = "WHERE `source`='src2' AND `time_type`='day'" ### +logger = get_structured_logger( + "generate_islatest_fix_sql") # signal name construction # NOTE: selecting these (unique) from the database takes 7-8 mins, so reconstructing here for efficiency @@ -22,7 +25,7 @@ ### PARTITION_SPLITS = [1,2] ### -print(''' +logger.info(''' -- -- run this as: -- python3 generate_islatest_fix_sql.py > islatest_fix.sql @@ -33,7 +36,7 @@ ''') # create temp table -print("CREATE TABLE `islatest_fix` (`latest_id` INT(11) NOT NULL, PRIMARY KEY (`latest_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;") +logger.info("CREATE TABLE `islatest_fix` (`latest_id` INT(11) NOT NULL, PRIMARY KEY (`latest_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;") # find latest issue by partition (and by signal) and save primary ids into temp table for partition_index in range(len(PARTITION_SPLITS)+1): @@ -44,7 +47,7 @@ for sig in signals: where_clause = base_where_clause + " AND `signal`='%s' AND %s" % (sig, partition_condition) - print(''' + logger.info(''' INSERT INTO `islatest_fix` SELECT id FROM ( SELECT `source`, `signal`, `time_type`, `geo_type`, `geo_value`, `time_value`, MAX(`issue`) AS `issue` FROM `covidcast` @@ -56,11 +59,11 @@ ''') # clear any current (potentially erroneous) is_latest_issue flags -print("UPDATE `covidcast` SET `is_latest_issue`=0 " + base_where_clause + " AND 
`is_latest_issue`=1;") +logger.info("UPDATE `covidcast` SET `is_latest_issue`=0 " + base_where_clause + " AND `is_latest_issue`=1;") # re-set proper is_latest_issue flags -print("UPDATE (SELECT `latest_id` FROM `islatest_fix`) xxx LEFT JOIN `covidcast` ON `xxx`.`latest_id`=`covidcast`.`id` SET `covidcast`.`is_latest_issue`=1;") +logger.info("UPDATE (SELECT `latest_id` FROM `islatest_fix`) xxx LEFT JOIN `covidcast` ON `xxx`.`latest_id`=`covidcast`.`id` SET `covidcast`.`is_latest_issue`=1;") # clean up temp table -print("-- TODO: drop this table") -print("-- DROP TABLE `islatest_fix`;") +logger.info("-- TODO: drop this table") +logger.info("-- DROP TABLE `islatest_fix`;") From 9fd0d0c9bc7c8b2afeca6970d40be6340dc0dea2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=84=B6=E7=84=B6?= <52230092+zhuoran-cheng@users.noreply.github.com> Date: Thu, 24 Jun 2021 15:33:30 -0400 Subject: [PATCH 03/14] fix the part of compute_covidcast_meta --- src/acquisition/covidcast/database.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py index 469fe46a8..e573f9072 100644 --- a/src/acquisition/covidcast/database.py +++ b/src/acquisition/covidcast/database.py @@ -247,9 +247,9 @@ def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False self._cursor.execute(drop_tmp_table_sql) return total - def compute_covidcast_meta(self, logger, table_name='covidcast', use_index=True): + def compute_covidcast_meta(self, table_name='covidcast', use_index=True): """Compute and return metadata on all non-WIP COVIDcast signals.""" - + logger=get_structured_logger("compute_covidcast_meta") index_hint = "" if use_index: index_hint = "USE INDEX (for_metadata)" @@ -304,7 +304,7 @@ def compute_covidcast_meta(self, logger, table_name='covidcast', use_index=True) meta = [] meta_lock = threading.Lock() - def worker(logger): + def worker(): logger.info("starting thread: " + threading.current_thread().name) # set up new db connection for thread worker_dbc = Database() @@ -320,7 +320,7 @@ def worker(logger): )) srcsigs.task_done() except Empty: - logger.info("no jobs left, thread terminating: " + threading.current_thread().name) + logger.exception("no jobs left, thread terminating: " + threading.current_thread().name) finally: worker_dbc.disconnect(False) # cleanup From fe783307986559d0e1cccbe986efc33c36902a5d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=84=B6=E7=84=B6?= <52230092+zhuoran-cheng@users.noreply.github.com> Date: Thu, 24 Jun 2021 17:59:07 -0400 Subject: [PATCH 04/14] modified the exception part --- src/acquisition/covidcast/fill_is_latest_issue.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/acquisition/covidcast/fill_is_latest_issue.py b/src/acquisition/covidcast/fill_is_latest_issue.py index 86c555f7b..2a6e5957e 100644 --- a/src/acquisition/covidcast/fill_is_latest_issue.py +++ b/src/acquisition/covidcast/fill_is_latest_issue.py @@ -36,8 +36,7 @@ def main(*, CLEAR_LATEST_BY_PARTITION=_CLEAR_LATEST_BY_PARTITION, FILTER_CONDITI logger = get_structured_logger( - "fill_is_lastest_issue", - log_exceptions=False) + "fill_is_lastest_issue", log_exceptions=False) u, p = secrets.db.epi connection = mysql.connector.connect( From 84bd4d6883fa7207ec2f9084a156134a1af413f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=84=B6=E7=84=B6?= <52230092+zhuoran-cheng@users.noreply.github.com> Date: Thu, 24 Jun 2021 18:11:01 -0400 Subject: [PATCH 05/14] Update of the csv_importer, csv_to_databse --- 
src/acquisition/covidcast/csv_importer.py | 21 ++++++++++---------- src/acquisition/covidcast/csv_to_database.py | 11 ++++++---- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/src/acquisition/covidcast/csv_importer.py b/src/acquisition/covidcast/csv_importer.py index a1fd6b70c..cb6a5c3ee 100644 --- a/src/acquisition/covidcast/csv_importer.py +++ b/src/acquisition/covidcast/csv_importer.py @@ -85,7 +85,8 @@ def is_sane_week(value): return value @staticmethod - def find_issue_specific_csv_files(scan_dir, logger, glob=glob): + def find_issue_specific_csv_files(scan_dir, glob=glob): + logger= get_structured_logger('find_issue_specific_csv_files') for path in sorted(glob.glob(os.path.join(scan_dir, '*'))): issuedir_match = CsvImporter.PATTERN_ISSUE_DIR.match(path.lower()) if issuedir_match and os.path.isdir(path): @@ -98,7 +99,7 @@ def find_issue_specific_csv_files(scan_dir, logger, glob=glob): logger.info(' invalid issue directory day', issue_date_value) @staticmethod - def find_csv_files(scan_dir, logger, issue=(date.today(), epi.Week.fromdate(date.today())), glob=glob): + def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today())), glob=glob): """Recursively search for and yield covidcast-format CSV files. scan_dir: the directory to scan (recursively) @@ -107,7 +108,7 @@ def find_csv_files(scan_dir, logger, issue=(date.today(), epi.Week.fromdate(date valid, details is a tuple of (source, signal, time_type, geo_type, time_value, issue, lag) (otherwise None). """ - + logger= get_structured_logger('find_csv_files') issue_day,issue_epiweek=issue issue_day_value=int(issue_day.strftime("%Y%m%d")) issue_epiweek_value=int(str(issue_epiweek)) @@ -126,7 +127,7 @@ def find_csv_files(scan_dir, logger, issue=(date.today(), epi.Week.fromdate(date daily_match = CsvImporter.PATTERN_DAILY.match(path.lower()) weekly_match = CsvImporter.PATTERN_WEEKLY.match(path.lower()) if not daily_match and not weekly_match: - logger.error(' invalid csv path/filename', path) + logger.info(' invalid csv path/filename', path) yield (path, None) continue @@ -137,7 +138,7 @@ def find_csv_files(scan_dir, logger, issue=(date.today(), epi.Week.fromdate(date match = daily_match time_value_day = CsvImporter.is_sane_day(time_value) if not time_value_day: - logger.error(' invalid filename day', time_value) + logger.info(' invalid filename day', time_value) yield (path, None) continue issue_value=issue_day_value @@ -148,7 +149,7 @@ def find_csv_files(scan_dir, logger, issue=(date.today(), epi.Week.fromdate(date match = weekly_match time_value_week=CsvImporter.is_sane_week(time_value) if not time_value_week: - logger.error(' invalid filename week', time_value) + logger.info(' invalid filename week', time_value) yield (path, None) continue issue_value=issue_epiweek_value @@ -157,7 +158,7 @@ def find_csv_files(scan_dir, logger, issue=(date.today(), epi.Week.fromdate(date # # extract and validate geographic resolution geo_type = match.group(3).lower() if geo_type not in CsvImporter.GEOGRAPHIC_RESOLUTIONS: - logger.error(' invalid geo_type', geo_type) + logger.info(' invalid geo_type', geo_type) yield (path, None) continue @@ -165,7 +166,7 @@ def find_csv_files(scan_dir, logger, issue=(date.today(), epi.Week.fromdate(date source = match.group(1).lower() signal = match.group(4).lower() if len(signal) > 64: - logger.error(' invalid signal name (64 char limit)',signal) + logger.info(' invalid signal name (64 char limit)',signal) yield (path, None) continue @@ -337,7 +338,7 @@ def 
extract_and_check_row(row, geo_type): return (row_values, None) @staticmethod - def load_csv(filepath, geo_type, logger, pandas=pandas): + def load_csv(filepath, geo_type, pandas=pandas): """Load, validate, and yield data as `RowValues` from a CSV file. filepath: the CSV file to be loaded @@ -346,7 +347,7 @@ def load_csv(filepath, geo_type, logger, pandas=pandas): In case of a validation error, `None` is yielded for the offending row, including the header. """ - + logger= get_structured_logger('load_csv') # don't use type inference, just get strings table = pandas.read_csv(filepath, dtype='str') diff --git a/src/acquisition/covidcast/csv_to_database.py b/src/acquisition/covidcast/csv_to_database.py index eb220514d..9287149da 100644 --- a/src/acquisition/covidcast/csv_to_database.py +++ b/src/acquisition/covidcast/csv_to_database.py @@ -37,9 +37,9 @@ def get_argument_parser(): help="filename for log output (defaults to stdout)") return parser -def collect_files(data_dir, specific_issue_date, logger,csv_importer_impl=CsvImporter): +def collect_files(data_dir, specific_issue_date,csv_importer_impl=CsvImporter): """Fetch path and data profile details for each file to upload.""" - + logger= get_structured_logger('collect_files') if specific_issue_date: results = list(csv_importer_impl.find_issue_specific_csv_files(data_dir)) else: @@ -47,14 +47,16 @@ def collect_files(data_dir, specific_issue_date, logger,csv_importer_impl=CsvImp logger.info(f'found {len(results)} files') return results -def make_handlers(data_dir, specific_issue_date, logger, file_archiver_impl=FileArchiver): +def make_handlers(data_dir, specific_issue_date, file_archiver_impl=FileArchiver): if specific_issue_date: # issue-specific uploads are always one-offs, so we can leave all # files in place without worrying about cleaning up def handle_failed(path_src, filename, source): + logger= get_structured_logger('handle_failed') logger.info(f'leaving failed file alone - {source}') def handle_successful(path_src, filename, source): + logger= get_structured_logger('handle_successful') logger.info('archiving as successful') file_archiver_impl.archive_inplace(path_src, filename) else: @@ -65,6 +67,7 @@ def handle_successful(path_src, filename, source): # helper to archive a failed file without compression def handle_failed(path_src, filename, source): + logger= get_structured_logger('handle_failed') logger.info('archiving as failed - '+source) path_dst = os.path.join(archive_failed_dir, source) compress = False @@ -72,6 +75,7 @@ def handle_failed(path_src, filename, source): # helper to archive a successful file with compression def handle_successful(path_src, filename, source): + logger= get_structured_logger('handle_successful') logger.info('archiving as successful') path_dst = os.path.join(archive_successful_dir, source) compress = True @@ -101,7 +105,6 @@ def upload_archive( """ archive_as_successful, archive_as_failed = handlers total_modified_row_count = 0 - # iterate over each file for path, details in path_details: logger.info(f'handling {path}') From 91ed202a2c91a0bed505829025fa8ca02110724d Mon Sep 17 00:00:00 2001 From: Tara Lakdawala Date: Thu, 24 Jun 2021 17:27:35 -0500 Subject: [PATCH 06/14] whitespace edits --- src/acquisition/covidcast/csv_importer.py | 2 +- src/acquisition/covidcast/csv_to_database.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/acquisition/covidcast/csv_importer.py b/src/acquisition/covidcast/csv_importer.py index cb6a5c3ee..1b83e85d5 100644 --- 
a/src/acquisition/covidcast/csv_importer.py +++ b/src/acquisition/covidcast/csv_importer.py @@ -347,7 +347,7 @@ def load_csv(filepath, geo_type, pandas=pandas): In case of a validation error, `None` is yielded for the offending row, including the header. """ - logger= get_structured_logger('load_csv') + logger = get_structured_logger('load_csv') # don't use type inference, just get strings table = pandas.read_csv(filepath, dtype='str') diff --git a/src/acquisition/covidcast/csv_to_database.py b/src/acquisition/covidcast/csv_to_database.py index 9287149da..94714e14d 100644 --- a/src/acquisition/covidcast/csv_to_database.py +++ b/src/acquisition/covidcast/csv_to_database.py @@ -75,7 +75,7 @@ def handle_failed(path_src, filename, source): # helper to archive a successful file with compression def handle_successful(path_src, filename, source): - logger= get_structured_logger('handle_successful') + logger = get_structured_logger('handle_successful') logger.info('archiving as successful') path_dst = os.path.join(archive_successful_dir, source) compress = True From f1f72b019ae2ca1e2932a75d88a828e4ffcd8e92 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=84=B6=E7=84=B6?= <52230092+zhuoran-cheng@users.noreply.github.com> Date: Thu, 24 Jun 2021 18:34:50 -0400 Subject: [PATCH 07/14] delete white space --- src/acquisition/covidcast/csv_importer.py | 19 +++++++++---------- src/acquisition/covidcast/database.py | 4 ++-- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/src/acquisition/covidcast/csv_importer.py b/src/acquisition/covidcast/csv_importer.py index cb6a5c3ee..411a52485 100644 --- a/src/acquisition/covidcast/csv_importer.py +++ b/src/acquisition/covidcast/csv_importer.py @@ -16,7 +16,6 @@ from delphi.utils.epiweek import delta_epiweeks from delphi.epidata.acquisition.covidcast.logger import get_structured_logger - class CsvImporter: """Finds and parses covidcast CSV files.""" @@ -93,10 +92,10 @@ def find_issue_specific_csv_files(scan_dir, glob=glob): issue_date_value = int(issuedir_match.group(2)) issue_date = CsvImporter.is_sane_day(issue_date_value) if issue_date: - logger.info(' processing csv files from issue date: "' + str(issue_date) + '", directory', path) + logger.info('processing csv files from issue date: "' + str(issue_date) + '", directory', path) yield from CsvImporter.find_csv_files(path, issue=(issue_date, epi.Week.fromdate(issue_date)), glob=glob) else: - logger.info(' invalid issue directory day', issue_date_value) + logger.error('invalid issue directory day', issue_date_value) @staticmethod def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today())), glob=glob): @@ -127,7 +126,7 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today() daily_match = CsvImporter.PATTERN_DAILY.match(path.lower()) weekly_match = CsvImporter.PATTERN_WEEKLY.match(path.lower()) if not daily_match and not weekly_match: - logger.info(' invalid csv path/filename', path) + logger.info('invalid csv path/filename', path) yield (path, None) continue @@ -138,7 +137,7 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today() match = daily_match time_value_day = CsvImporter.is_sane_day(time_value) if not time_value_day: - logger.info(' invalid filename day', time_value) + logger.info('invalid filename day', time_value) yield (path, None) continue issue_value=issue_day_value @@ -149,7 +148,7 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today() match = weekly_match 
time_value_week=CsvImporter.is_sane_week(time_value) if not time_value_week: - logger.info(' invalid filename week', time_value) + logger.info('invalid filename week', time_value) yield (path, None) continue issue_value=issue_epiweek_value @@ -158,7 +157,7 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today() # # extract and validate geographic resolution geo_type = match.group(3).lower() if geo_type not in CsvImporter.GEOGRAPHIC_RESOLUTIONS: - logger.info(' invalid geo_type', geo_type) + logger.info('invalid geo_type', geo_type) yield (path, None) continue @@ -166,7 +165,7 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today() source = match.group(1).lower() signal = match.group(4).lower() if len(signal) > 64: - logger.info(' invalid signal name (64 char limit)',signal) + logger.info('invalid signal name (64 char limit)',signal) yield (path, None) continue @@ -352,14 +351,14 @@ def load_csv(filepath, geo_type, pandas=pandas): table = pandas.read_csv(filepath, dtype='str') if not CsvImporter.is_header_valid(table.columns): - logger.error(' invalid header') + logger.error('invalid header') yield None return for row in table.itertuples(index=False): row_values, error = CsvImporter.extract_and_check_row(row, geo_type) if error: - logger.error(' invalid value for %s (%s)' % (str(row), error)) + logger.error('invalid value for %s (%s)' % (str(row), error)) yield None continue yield row_values diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py index e573f9072..c4cce3425 100644 --- a/src/acquisition/covidcast/database.py +++ b/src/acquisition/covidcast/database.py @@ -249,7 +249,7 @@ def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False def compute_covidcast_meta(self, table_name='covidcast', use_index=True): """Compute and return metadata on all non-WIP COVIDcast signals.""" - logger=get_structured_logger("compute_covidcast_meta") + logger=get_structured_logger("compute_covidcast_meta", log_exceptions=False) index_hint = "" if use_index: index_hint = "USE INDEX (for_metadata)" @@ -334,7 +334,7 @@ def worker(): logger.info("jobs complete") for t in threads: t.join() - logger.info("threads terminated") + logger.error("threads terminated") # sort the metadata because threaded workers dgaf sorting_fields = "data_source signal time_type geo_type".split() From 8c5d53c91134749e5968fc9506c4013f8503c116 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=84=B6=E7=84=B6?= <52230092+zhuoran-cheng@users.noreply.github.com> Date: Thu, 24 Jun 2021 18:36:43 -0400 Subject: [PATCH 08/14] delete white space --- src/acquisition/covidcast/database.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py index c4cce3425..4f855e0f7 100644 --- a/src/acquisition/covidcast/database.py +++ b/src/acquisition/covidcast/database.py @@ -249,7 +249,7 @@ def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False def compute_covidcast_meta(self, table_name='covidcast', use_index=True): """Compute and return metadata on all non-WIP COVIDcast signals.""" - logger=get_structured_logger("compute_covidcast_meta", log_exceptions=False) + logger = get_structured_logger("compute_covidcast_meta", log_exceptions=False) index_hint = "" if use_index: index_hint = "USE INDEX (for_metadata)" From af2d10aea40400c09573c5b5242ab8fccaf3f6c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=84=B6=E7=84=B6?= 
<52230092+zhuoran-cheng@users.noreply.github.com> Date: Fri, 25 Jun 2021 19:05:13 -0400 Subject: [PATCH 09/14] modified the problem included structure logging of warning --- src/acquisition/covidcast/csv_importer.py | 22 +++++++++---------- src/acquisition/covidcast/database.py | 4 ++-- src/acquisition/covidcast/file_archiver.py | 6 ++--- .../covidcast/fill_is_latest_issue.py | 3 +-- 4 files changed, 16 insertions(+), 19 deletions(-) diff --git a/src/acquisition/covidcast/csv_importer.py b/src/acquisition/covidcast/csv_importer.py index 2b8d237f7..ee295b61f 100644 --- a/src/acquisition/covidcast/csv_importer.py +++ b/src/acquisition/covidcast/csv_importer.py @@ -85,7 +85,7 @@ def is_sane_week(value): @staticmethod def find_issue_specific_csv_files(scan_dir, glob=glob): - logger= get_structured_logger('find_issue_specific_csv_files') + logger = get_structured_logger('find_issue_specific_csv_files') for path in sorted(glob.glob(os.path.join(scan_dir, '*'))): issuedir_match = CsvImporter.PATTERN_ISSUE_DIR.match(path.lower()) if issuedir_match and os.path.isdir(path): @@ -95,7 +95,7 @@ def find_issue_specific_csv_files(scan_dir, glob=glob): logger.info('processing csv files from issue date: "' + str(issue_date) + '", directory', path) yield from CsvImporter.find_csv_files(path, issue=(issue_date, epi.Week.fromdate(issue_date)), glob=glob) else: - logger.error('invalid issue directory day', issue_date_value) + logger.warning(event='invalid issue directory day', file=issue_date_value) @staticmethod def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today())), glob=glob): @@ -107,7 +107,7 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today() valid, details is a tuple of (source, signal, time_type, geo_type, time_value, issue, lag) (otherwise None). 
""" - logger= get_structured_logger('find_csv_files') + logger = get_structured_logger('find_csv_files') issue_day,issue_epiweek=issue issue_day_value=int(issue_day.strftime("%Y%m%d")) issue_epiweek_value=int(str(issue_epiweek)) @@ -120,13 +120,13 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today() # safe to ignore this file continue - logger.info('file:', path) + logger.info(event='file:', file=path) # match a daily or weekly naming pattern daily_match = CsvImporter.PATTERN_DAILY.match(path.lower()) weekly_match = CsvImporter.PATTERN_WEEKLY.match(path.lower()) if not daily_match and not weekly_match: - logger.info('invalid csv path/filename', path) + logger.warning(event='invalid csv path/filename', detail=path, file=path) yield (path, None) continue @@ -137,7 +137,7 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today() match = daily_match time_value_day = CsvImporter.is_sane_day(time_value) if not time_value_day: - logger.info('invalid filename day', time_value) + logger.warning(event='invalid filename day', detail=time_value, file=path) yield (path, None) continue issue_value=issue_day_value @@ -148,7 +148,7 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today() match = weekly_match time_value_week=CsvImporter.is_sane_week(time_value) if not time_value_week: - logger.info('invalid filename week', time_value) + logger.warning(event='invalid filename week', detail=time_value, file=path) yield (path, None) continue issue_value=issue_epiweek_value @@ -157,7 +157,7 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today() # # extract and validate geographic resolution geo_type = match.group(3).lower() if geo_type not in CsvImporter.GEOGRAPHIC_RESOLUTIONS: - logger.info('invalid geo_type', geo_type) + logger.warning(event='invalid geo_type', detail=geo_type, file=path) yield (path, None) continue @@ -165,7 +165,7 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today() source = match.group(1).lower() signal = match.group(4).lower() if len(signal) > 64: - logger.info('invalid signal name (64 char limit)',signal) + logger.warning(event='invalid signal name (64 char limit)',detail=signal, file=path) yield (path, None) continue @@ -351,14 +351,14 @@ def load_csv(filepath, geo_type, pandas=pandas): table = pandas.read_csv(filepath, dtype='str') if not CsvImporter.is_header_valid(table.columns): - logger.error('invalid header') + logger.warning(event='invalid header') yield None return for row in table.itertuples(index=False): row_values, error = CsvImporter.extract_and_check_row(row, geo_type) if error: - logger.error('invalid value for %s (%s)' % (str(row), error)) + logger.warning(event = 'invalid value for %s (%s)', file=(str(row), error)) yield None continue yield row_values diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py index 4f855e0f7..b8bf4f525 100644 --- a/src/acquisition/covidcast/database.py +++ b/src/acquisition/covidcast/database.py @@ -249,7 +249,7 @@ def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False def compute_covidcast_meta(self, table_name='covidcast', use_index=True): """Compute and return metadata on all non-WIP COVIDcast signals.""" - logger = get_structured_logger("compute_covidcast_meta", log_exceptions=False) + logger = get_structured_logger("compute_covidcast_meta") index_hint = "" if use_index: index_hint = "USE INDEX (for_metadata)" @@ -320,7 +320,7 @@ def 
worker(): )) srcsigs.task_done() except Empty: - logger.exception("no jobs left, thread terminating: " + threading.current_thread().name) + logger.info("no jobs left, thread terminating: " + threading.current_thread().name) finally: worker_dbc.disconnect(False) # cleanup diff --git a/src/acquisition/covidcast/file_archiver.py b/src/acquisition/covidcast/file_archiver.py index 4fbae5055..468cc47e8 100644 --- a/src/acquisition/covidcast/file_archiver.py +++ b/src/acquisition/covidcast/file_archiver.py @@ -42,9 +42,7 @@ def archive_file( file already exists, it will be overwritten. """ - logger =get_structured_logger( - "file_archiver" - ) + logger = get_structured_logger("file_archiver") src = os.path.join(path_src, filename) dst = os.path.join(path_dst, filename) @@ -56,7 +54,7 @@ def archive_file( if os.path.exists(dst): # warn that destination is about to be overwritten - logger.warning('destination exists, will overwrite (%s)' % dst) + logger.warning(event='destination exists, will overwrite (%s)', file=dst) if compress: # make a compressed copy diff --git a/src/acquisition/covidcast/fill_is_latest_issue.py b/src/acquisition/covidcast/fill_is_latest_issue.py index 2a6e5957e..378983f9e 100644 --- a/src/acquisition/covidcast/fill_is_latest_issue.py +++ b/src/acquisition/covidcast/fill_is_latest_issue.py @@ -35,8 +35,7 @@ def main(*, CLEAR_LATEST_BY_PARTITION=_CLEAR_LATEST_BY_PARTITION, FILTER_CONDITION=_FILTER_CONDITION): - logger = get_structured_logger( - "fill_is_lastest_issue", log_exceptions=False) + logger = get_structured_logger("fill_is_lastest_issue") u, p = secrets.db.epi connection = mysql.connector.connect( From 56aa1001df5d86c9c268e5bdbccfd7bf8841e39b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=84=B6=E7=84=B6?= <52230092+zhuoran-cheng@users.noreply.github.com> Date: Mon, 28 Jun 2021 11:16:40 -0400 Subject: [PATCH 10/14] add the argument logger to func handle_faild/handle_successful --- src/acquisition/covidcast/csv_to_database.py | 21 +++++++++----------- 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/src/acquisition/covidcast/csv_to_database.py b/src/acquisition/covidcast/csv_to_database.py index 94714e14d..bb666a85b 100644 --- a/src/acquisition/covidcast/csv_to_database.py +++ b/src/acquisition/covidcast/csv_to_database.py @@ -51,12 +51,11 @@ def make_handlers(data_dir, specific_issue_date, file_archiver_impl=FileArchiver if specific_issue_date: # issue-specific uploads are always one-offs, so we can leave all # files in place without worrying about cleaning up - def handle_failed(path_src, filename, source): - logger= get_structured_logger('handle_failed') + logger= get_structured_logger('h') + def handle_failed(path_src, filename, source, logger): logger.info(f'leaving failed file alone - {source}') - def handle_successful(path_src, filename, source): - logger= get_structured_logger('handle_successful') + def handle_successful(path_src, filename, source, logger): logger.info('archiving as successful') file_archiver_impl.archive_inplace(path_src, filename) else: @@ -66,16 +65,14 @@ def handle_successful(path_src, filename, source): archive_failed_dir = os.path.join(data_dir, 'archive', 'failed') # helper to archive a failed file without compression - def handle_failed(path_src, filename, source): - logger= get_structured_logger('handle_failed') - logger.info('archiving as failed - '+source) + def handle_failed(path_src, filename, source, logger): + logger.info('archiving as failed - ' + source) path_dst = os.path.join(archive_failed_dir, source) compress = 
False file_archiver_impl.archive_file(path_src, path_dst, filename, compress) # helper to archive a successful file with compression - def handle_successful(path_src, filename, source): - logger = get_structured_logger('handle_successful') + def handle_successful(path_src, filename, source, logger): logger.info('archiving as successful') path_dst = os.path.join(archive_successful_dir, source) compress = True @@ -112,7 +109,7 @@ def upload_archive( if not details: # file path or name was invalid, source is unknown - archive_as_failed(path_src, filename, 'unknown') + archive_as_failed(path_src, filename, 'unknown',logger) continue (source, signal, time_type, geo_type, time_value, issue, lag) = details @@ -153,9 +150,9 @@ def upload_archive( # archive the current file based on validation results if all_rows_valid: - archive_as_successful(path_src, filename, source) + archive_as_successful(path_src, filename, source, logger) else: - archive_as_failed(path_src, filename, source) + archive_as_failed(path_src, filename, source,logger) return total_modified_row_count From 3f5efbd569c05e763a849fc7c3c7ff9790399370 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=84=B6=E7=84=B6?= <52230092+zhuoran-cheng@users.noreply.github.com> Date: Mon, 28 Jun 2021 11:19:50 -0400 Subject: [PATCH 11/14] update --- src/acquisition/covidcast/csv_to_database.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/acquisition/covidcast/csv_to_database.py b/src/acquisition/covidcast/csv_to_database.py index bb666a85b..65ac76805 100644 --- a/src/acquisition/covidcast/csv_to_database.py +++ b/src/acquisition/covidcast/csv_to_database.py @@ -51,7 +51,6 @@ def make_handlers(data_dir, specific_issue_date, file_archiver_impl=FileArchiver if specific_issue_date: # issue-specific uploads are always one-offs, so we can leave all # files in place without worrying about cleaning up - logger= get_structured_logger('h') def handle_failed(path_src, filename, source, logger): logger.info(f'leaving failed file alone - {source}') From dd0aa561bb052087af9586f3b46eca2e1d896762 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=84=B6=E7=84=B6?= <52230092+zhuoran-cheng@users.noreply.github.com> Date: Mon, 28 Jun 2021 20:54:21 -0400 Subject: [PATCH 12/14] modified some structure logging --- src/acquisition/covidcast/csv_importer.py | 8 ++++---- src/acquisition/covidcast/csv_to_database.py | 10 +++++----- src/acquisition/covidcast/file_archiver.py | 2 +- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/acquisition/covidcast/csv_importer.py b/src/acquisition/covidcast/csv_importer.py index ee295b61f..cf38321c5 100644 --- a/src/acquisition/covidcast/csv_importer.py +++ b/src/acquisition/covidcast/csv_importer.py @@ -95,7 +95,7 @@ def find_issue_specific_csv_files(scan_dir, glob=glob): logger.info('processing csv files from issue date: "' + str(issue_date) + '", directory', path) yield from CsvImporter.find_csv_files(path, issue=(issue_date, epi.Week.fromdate(issue_date)), glob=glob) else: - logger.warning(event='invalid issue directory day', file=issue_date_value) + logger.warning(event='invalid issue directory day', detail=issue_date_value, file=path) @staticmethod def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today())), glob=glob): @@ -120,7 +120,7 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today() # safe to ignore this file continue - logger.info(event='file:', file=path) + logger.info(event='covidcast-format CSV file path :', file=path) # match a daily or weekly naming 
pattern daily_match = CsvImporter.PATTERN_DAILY.match(path.lower()) @@ -351,14 +351,14 @@ def load_csv(filepath, geo_type, pandas=pandas): table = pandas.read_csv(filepath, dtype='str') if not CsvImporter.is_header_valid(table.columns): - logger.warning(event='invalid header') + logger.warning(event='invalid header', detail=table.columns, file=filepath) yield None return for row in table.itertuples(index=False): row_values, error = CsvImporter.extract_and_check_row(row, geo_type) if error: - logger.warning(event = 'invalid value for %s (%s)', file=(str(row), error)) + logger.warning(event = 'invalid value for row', detail=(str(row), error), file=filepath) yield None continue yield row_values diff --git a/src/acquisition/covidcast/csv_to_database.py b/src/acquisition/covidcast/csv_to_database.py index 65ac76805..7a845134b 100644 --- a/src/acquisition/covidcast/csv_to_database.py +++ b/src/acquisition/covidcast/csv_to_database.py @@ -52,10 +52,10 @@ def make_handlers(data_dir, specific_issue_date, file_archiver_impl=FileArchiver # issue-specific uploads are always one-offs, so we can leave all # files in place without worrying about cleaning up def handle_failed(path_src, filename, source, logger): - logger.info(f'leaving failed file alone - {source}') + logger.info(event='leaving failed file alone', dest=source, file=filename) def handle_successful(path_src, filename, source, logger): - logger.info('archiving as successful') + logger.info(event='archiving as successful',file=filename) file_archiver_impl.archive_inplace(path_src, filename) else: # normal automation runs require some shuffling to remove files @@ -65,14 +65,14 @@ def handle_successful(path_src, filename, source, logger): # helper to archive a failed file without compression def handle_failed(path_src, filename, source, logger): - logger.info('archiving as failed - ' + source) + logger.info(event='archiving as failed - ', detail=source, file=filename) path_dst = os.path.join(archive_failed_dir, source) compress = False file_archiver_impl.archive_file(path_src, path_dst, filename, compress) # helper to archive a successful file with compression def handle_successful(path_src, filename, source, logger): - logger.info('archiving as successful') + logger.info(event='archiving as successful',file=filename) path_dst = os.path.join(archive_successful_dir, source) compress = True file_archiver_impl.archive_file(path_src, path_dst, filename, compress) @@ -103,7 +103,7 @@ def upload_archive( total_modified_row_count = 0 # iterate over each file for path, details in path_details: - logger.info(f'handling {path}') + logger.info(event='handling',dest=path) path_src, filename = os.path.split(path) if not details: diff --git a/src/acquisition/covidcast/file_archiver.py b/src/acquisition/covidcast/file_archiver.py index 468cc47e8..92686f3cf 100644 --- a/src/acquisition/covidcast/file_archiver.py +++ b/src/acquisition/covidcast/file_archiver.py @@ -54,7 +54,7 @@ def archive_file( if os.path.exists(dst): # warn that destination is about to be overwritten - logger.warning(event='destination exists, will overwrite (%s)', file=dst) + logger.warning(event='destination exists, will overwrite', file=dst) if compress: # make a compressed copy From 2447452a9faa23a6b656b4f8fe80c4ca98f1ce05 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=84=B6=E7=84=B6?= <52230092+zhuoran-cheng@users.noreply.github.com> Date: Tue, 29 Jun 2021 12:06:58 -0400 Subject: [PATCH 13/14] delete some unneccesary part of logger --- src/acquisition/covidcast/csv_importer.py | 4 +--- 
.../covidcast/generate_islatest_fix_sql.py | 20 +++++++------------ 2 files changed, 8 insertions(+), 16 deletions(-) diff --git a/src/acquisition/covidcast/csv_importer.py b/src/acquisition/covidcast/csv_importer.py index cf38321c5..2093839d7 100644 --- a/src/acquisition/covidcast/csv_importer.py +++ b/src/acquisition/covidcast/csv_importer.py @@ -107,7 +107,7 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today() valid, details is a tuple of (source, signal, time_type, geo_type, time_value, issue, lag) (otherwise None). """ - logger = get_structured_logger('find_csv_files') + logger = get_structured_logger('find_csv_files')jin issue_day,issue_epiweek=issue issue_day_value=int(issue_day.strftime("%Y%m%d")) issue_epiweek_value=int(str(issue_epiweek)) @@ -120,8 +120,6 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today() # safe to ignore this file continue - logger.info(event='covidcast-format CSV file path :', file=path) - # match a daily or weekly naming pattern daily_match = CsvImporter.PATTERN_DAILY.match(path.lower()) weekly_match = CsvImporter.PATTERN_WEEKLY.match(path.lower()) diff --git a/src/acquisition/covidcast/generate_islatest_fix_sql.py b/src/acquisition/covidcast/generate_islatest_fix_sql.py index 85eff4e51..115a7d131 100644 --- a/src/acquisition/covidcast/generate_islatest_fix_sql.py +++ b/src/acquisition/covidcast/generate_islatest_fix_sql.py @@ -1,10 +1,7 @@ -from delphi.epidata.acquisition.covidcast.logger import get_structured_logger # what data to operate on base_where_clause = "WHERE `source`='jhu-csse' AND `time_type`='day'" ### base_where_clause = "WHERE `source`='src2' AND `time_type`='day'" ### -logger = get_structured_logger( - "generate_islatest_fix_sql") # signal name construction # NOTE: selecting these (unique) from the database takes 7-8 mins, so reconstructing here for efficiency @@ -17,15 +14,13 @@ for typ in ('num', 'prop'): signals.append(case+period+count+typ) ### signals = ['sig2'] ### - - # variable to split on, 'time_value' is good because its high cardinality is suitable for chunking PARTITION_VARIABLE = 'time_value' PARTITION_SPLITS = [20200101 + i*100 for i in range(10)] # first day of the month for jan - oct 2020 in YYYYMMDD form ### PARTITION_SPLITS = [1,2] ### -logger.info(''' +print(''' -- -- run this as: -- python3 generate_islatest_fix_sql.py > islatest_fix.sql @@ -36,18 +31,17 @@ ''') # create temp table -logger.info("CREATE TABLE `islatest_fix` (`latest_id` INT(11) NOT NULL, PRIMARY KEY (`latest_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;") +print("CREATE TABLE `islatest_fix` (`latest_id` INT(11) NOT NULL, PRIMARY KEY (`latest_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;") # find latest issue by partition (and by signal) and save primary ids into temp table for partition_index in range(len(PARTITION_SPLITS)+1): ge_condition = 'TRUE' if partition_index == 0 else f'`{PARTITION_VARIABLE}` >= {PARTITION_SPLITS[partition_index - 1]}' l_condition = 'TRUE' if partition_index == len(PARTITION_SPLITS) else f'`{PARTITION_VARIABLE}` < {PARTITION_SPLITS[partition_index]}' partition_condition = f'({ge_condition}) AND ({l_condition})' - for sig in signals: where_clause = base_where_clause + " AND `signal`='%s' AND %s" % (sig, partition_condition) - logger.info(''' + print(''' INSERT INTO `islatest_fix` SELECT id FROM ( SELECT `source`, `signal`, `time_type`, `geo_type`, `geo_value`, `time_value`, MAX(`issue`) AS `issue` FROM `covidcast` @@ -59,11 +53,11 @@ ''') # clear any current (potentially 
erroneous) is_latest_issue flags -logger.info("UPDATE `covidcast` SET `is_latest_issue`=0 " + base_where_clause + " AND `is_latest_issue`=1;") +print("UPDATE `covidcast` SET `is_latest_issue`=0 " + base_where_clause + " AND `is_latest_issue`=1;") # re-set proper is_latest_issue flags -logger.info("UPDATE (SELECT `latest_id` FROM `islatest_fix`) xxx LEFT JOIN `covidcast` ON `xxx`.`latest_id`=`covidcast`.`id` SET `covidcast`.`is_latest_issue`=1;") +print("UPDATE (SELECT `latest_id` FROM `islatest_fix`) xxx LEFT JOIN `covidcast` ON `xxx`.`latest_id`=`covidcast`.`id` SET `covidcast`.`is_latest_issue`=1;") # clean up temp table -logger.info("-- TODO: drop this table") -logger.info("-- DROP TABLE `islatest_fix`;") +print("-- TODO: drop this table") +print("-- DROP TABLE `islatest_fix`;") \ No newline at end of file From 5b7ae8876b2eab80c3a3b9d3874cfe2079b6b668 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=84=B6=E7=84=B6?= <52230092+zhuoran-cheng@users.noreply.github.com> Date: Tue, 29 Jun 2021 12:18:25 -0400 Subject: [PATCH 14/14] delete some wrong typing --- src/acquisition/covidcast/csv_importer.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/acquisition/covidcast/csv_importer.py b/src/acquisition/covidcast/csv_importer.py index 2093839d7..55ec847cc 100644 --- a/src/acquisition/covidcast/csv_importer.py +++ b/src/acquisition/covidcast/csv_importer.py @@ -107,7 +107,7 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today() valid, details is a tuple of (source, signal, time_type, geo_type, time_value, issue, lag) (otherwise None). """ - logger = get_structured_logger('find_csv_files')jin + logger = get_structured_logger('find_csv_files') issue_day,issue_epiweek=issue issue_day_value=int(issue_day.strftime("%Y%m%d")) issue_epiweek_value=int(str(issue_epiweek)) @@ -119,7 +119,6 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today() if not path.lower().endswith('.csv'): # safe to ignore this file continue - # match a daily or weekly naming pattern daily_match = CsvImporter.PATTERN_DAILY.match(path.lower()) weekly_match = CsvImporter.PATTERN_WEEKLY.match(path.lower())
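
Note on the logging helper used throughout these patches: the hunks above replace ad-hoc print() calls with a shared get_structured_logger() imported from delphi.epidata.acquisition.covidcast.logger, plus a keyword-based calling convention at the new call sites (for example logger.warning(event='invalid geo_type', detail=geo_type, file=path)). The helper itself is not part of this diff, so the sketch below is only a minimal, hypothetical stand-in built on the Python standard library. It mimics the calling convention seen in the hunks so those call sites can be exercised outside the full codebase; the log_exceptions flag is accepted purely for signature compatibility, since its real behavior is not shown in the diff.

import json
import logging
import sys


class _StructuredAdapter:
    """Wraps a stdlib logger and renders the event plus keyword fields as one JSON line."""

    def __init__(self, logger):
        self._logger = logger

    def _log(self, level, event, fields, exc_info=False):
        if event is not None:
            fields = {"event": event, **fields}
        self._logger.log(level, json.dumps(fields, default=str), exc_info=exc_info)

    def info(self, event=None, **fields):
        self._log(logging.INFO, event, fields)

    def warning(self, event=None, **fields):
        self._log(logging.WARNING, event, fields)

    def error(self, event=None, **fields):
        self._log(logging.ERROR, event, fields)

    def exception(self, event=None, **fields):
        # like logging.exception(): also records the active exception's traceback
        self._log(logging.ERROR, event, fields, exc_info=True)


def get_structured_logger(name, filename=None, log_exceptions=True):
    # Hypothetical stand-in for the project helper: one JSON object per record,
    # written to stdout or appended to `filename`.  `log_exceptions` is accepted
    # only so the call sites in the patches type-check; it is ignored here.
    stream = open(filename, "a") if filename else sys.stdout
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s"))
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.handlers = [handler]  # avoid duplicate handlers if called repeatedly
    return _StructuredAdapter(logger)


if __name__ == "__main__":
    # mirrors the call patterns introduced by the patches above
    logger = get_structured_logger("csv_importer_demo")
    logger.info("found 3 files")
    logger.warning(event="invalid geo_type", detail="continent",
                   file="receiving/src/20200601_continent_sig.csv")
    try:
        1 / 0
    except ZeroDivisionError:
        logger.exception(event="exception while inserting rows")

Under this sketch, each call emits a single line containing the stdlib formatter's timestamp, logger name, and level, followed by a JSON object such as {"event": "invalid geo_type", "detail": "continent", "file": "receiving/src/20200601_continent_sig.csv"}; the real delphi-epidata helper may format records differently.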