From 0d4188b95bde9c4e162c7ec902b7c6c0ff9f569e Mon Sep 17 00:00:00 2001
From: george haff
Date: Mon, 29 Aug 2022 16:27:03 -0400
Subject: [PATCH 1/6] cli option for specifying # of metadata worker threads

---
 .../covidcast/covidcast_meta_cache_updater.py | 5 ++++-
 src/acquisition/covidcast/database.py | 10 ++++++----
 2 files changed, 10 insertions(+), 5 deletions(-)

diff --git a/src/acquisition/covidcast/covidcast_meta_cache_updater.py b/src/acquisition/covidcast/covidcast_meta_cache_updater.py
index 624a1919b..1f98faf51 100644
--- a/src/acquisition/covidcast/covidcast_meta_cache_updater.py
+++ b/src/acquisition/covidcast/covidcast_meta_cache_updater.py
@@ -15,6 +15,7 @@ def get_argument_parser():
   parser = argparse.ArgumentParser()
   parser.add_argument("--log_file", help="filename for log output")
+  parser.add_argument("--num_threads", type=int, help="number of worker threads to spawn for processing of each source/signal pair")
   return parser
@@ -24,8 +25,10 @@ def main(args, epidata_impl=Epidata, database_impl=Database):
   `args`: parsed command-line arguments
   """
   log_file = None
+  num_threads = None
   if (args):
     log_file = args.log_file
+    num_threads = args.num_threads

   logger = get_structured_logger(
     "metadata_cache_updater",
@@ -37,7 +40,7 @@ def main(args, epidata_impl=Epidata, database_impl=Database):
   # fetch metadata
   try:
     metadata_calculation_start_time = time.time()
-    metadata = database.compute_covidcast_meta()
+    metadata = database.compute_covidcast_meta(n_threads=num_threads)
     metadata_calculation_interval_in_seconds = time.time() - metadata_calculation_start_time
   except:
     # clean up before failing
diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py
index b93514712..5336b2f2d 100644
--- a/src/acquisition/covidcast/database.py
+++ b/src/acquisition/covidcast/database.py
@@ -476,16 +476,18 @@ def split_list(lst, n):

     return total

-  def compute_covidcast_meta(self, table_name=None):
+  def compute_covidcast_meta(self, table_name=None, n_threads=None):
     """Compute and return metadata on all COVIDcast signals."""
     logger = get_structured_logger("compute_covidcast_meta")

     if table_name is None:
       table_name = self.latest_view

-    n_threads = max(1, cpu_count()*9//10) # aka number of concurrent db connections, which [sh|c]ould be ~<= 90% of the #cores available to SQL server
-    # NOTE: this may present a small problem if this job runs on different hardware than the db,
-    # but we should not run into that issue in prod.
+    if n_threads is None:
+      logger.info("n_threads unspecified, automatically choosing based on number of detected cores...")
+      n_threads = max(1, cpu_count()*9//10) # aka number of concurrent db connections, which [sh|c]ould be ~<= 90% of the #cores available to SQL server
+      # NOTE: this may present a small problem if this job runs on different hardware than the db,
+      # which is why this value can be overridden by an optional argument.
     logger.info(f"using {n_threads} workers")

     srcsigs = Queue() # multi-consumer threadsafe!
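
For context on what `--num_threads` controls: compute_covidcast_meta pushes every source/signal pair onto the multi-consumer `srcsigs` queue and drains it with a pool of worker threads, each holding its own database connection, hence the default cap near 90% of the SQL server's cores. Below is a minimal sketch of that fan-out pattern; fetch_meta_for() is a hypothetical stand-in for the real per-pair metadata query (the actual worker logic lives in database.py and is not reproduced here):

    import threading
    from queue import Queue, Empty

    def fetch_meta_for(source, signal):
        # hypothetical stand-in for one per-source/signal metadata query
        return {"source": source, "signal": signal}

    def compute_meta(srcsig_pairs, n_threads):
        srcsigs = Queue()  # multi-consumer threadsafe, as in the real code
        for pair in srcsig_pairs:
            srcsigs.put(pair)
        results = []
        lock = threading.Lock()

        def worker():
            while True:
                try:
                    source, signal = srcsigs.get_nowait()
                except Empty:
                    return  # queue drained; this worker exits
                row = fetch_meta_for(source, signal)
                with lock:  # guard the shared results list
                    results.append(row)
                srcsigs.task_done()

        threads = [threading.Thread(target=worker) for _ in range(n_threads)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        return results

    if __name__ == "__main__":
        pairs = [("some-source", f"signal_{i}") for i in range(10)]
        print(compute_meta(pairs, n_threads=4))

In the job itself, --num_threads simply overrides the cpu_count()-based default; that matters because cpu_count() reflects the acquisition host, not necessarily the database server.
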
From 5e3eb51c005720a9da1e52ee78fb1cbfc0339516 Mon Sep 17 00:00:00 2001 From: george haff Date: Mon, 29 Aug 2022 20:14:11 -0400 Subject: [PATCH 2/6] added flag to Makefile for new percona db image (and some whitespace fixes) --- dev/local/Makefile | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/dev/local/Makefile b/dev/local/Makefile index 52c9e98f0..9d6c52313 100644 --- a/dev/local/Makefile +++ b/dev/local/Makefile @@ -10,19 +10,19 @@ # Creates all prereq images (delphi_database, delphi_python) only if they don't # exist. If you need to rebuild a prereq, you're probably doing something # complicated, and can figure out the rebuild command on your own. -# -# +# +# # Commands: -# +# # web: Stops currently-running delphi_web_epidata instances, if any. # Rebuilds delphi_web_epidata image. # Runs image in the background and pipes stdout to a log file. -# +# # db: Stops currently-running delphi_database_epidata instances, if any. # Rebuilds delphi_database_epidata image. # Runs image in the background and pipes stdout to a log file. # Blocks until database is ready to receive connections. -# +# # python: Rebuilds delphi_web_python image. You shouldn't need to do this # often; only if you are installing a new environment, or have # made changes to delphi-epidata/dev/docker/python/Dockerfile. @@ -35,7 +35,7 @@ # # clean: Cleans up dangling Docker images. # -# +# # Optional arguments: # pdb=1 Drops you into debug mode upon test failure, if running tests. # test= Only runs tests in the directories provided here, e.g. @@ -105,6 +105,7 @@ db: @# Run the database @docker run --rm -p 127.0.0.1:13306:3306 \ --network delphi-net --name delphi_database_epidata \ + --cap-add=sys_nice \ delphi_database_epidata >$(LOG_DB) 2>&1 & @# Block until DB is ready @@ -127,7 +128,7 @@ py: all: web db py .PHONY=test -test: +test: @docker run -i --rm --network delphi-net \ --mount type=bind,source=$(CWD)repos/delphi/delphi-epidata,target=/usr/src/app/repos/delphi/delphi-epidata,readonly \ --mount type=bind,source=$(CWD)repos/delphi/delphi-epidata/src,target=/usr/src/app/delphi/epidata,readonly \ From b9ab3f3fe09fb6c015baece120e68fa591c06a4b Mon Sep 17 00:00:00 2001 From: george haff Date: Mon, 29 Aug 2022 20:15:06 -0400 Subject: [PATCH 3/6] converted JOINs in VIEWs to 'USING' instead of 'ON' --- src/ddl/migrations/v4_renaming.sql | 12 ++++-------- src/ddl/v4_schema.sql | 13 ++++--------- 2 files changed, 8 insertions(+), 17 deletions(-) diff --git a/src/ddl/migrations/v4_renaming.sql b/src/ddl/migrations/v4_renaming.sql index 9bfc7a145..c0edf8e96 100644 --- a/src/ddl/migrations/v4_renaming.sql +++ b/src/ddl/migrations/v4_renaming.sql @@ -54,10 +54,8 @@ CREATE OR REPLACE VIEW epimetric_full_v AS `t1`.`signal_key_id` AS `signal_key_id`, `t1`.`geo_key_id` AS `geo_key_id` FROM `epimetric_full` `t1` - JOIN `signal_dim` `t2` - ON `t1`.`signal_key_id` = `t2`.`signal_key_id` - JOIN `geo_dim` `t3` - ON `t1`.`geo_key_id` = `t3`.`geo_key_id`; + JOIN `signal_dim` `t2` USING (`signal_key_id`) + JOIN `geo_dim` `t3` USING (`geo_key_id`); CREATE OR REPLACE VIEW epimetric_latest_v AS SELECT 1 AS `is_latest_issue`, -- provides column-compatibility to match `covidcast` table @@ -85,10 +83,8 @@ CREATE OR REPLACE VIEW epimetric_latest_v AS `t1`.`signal_key_id` AS `signal_key_id`, `t1`.`geo_key_id` AS `geo_key_id` FROM `epimetric_latest` `t1` - JOIN `signal_dim` `t2` - ON `t1`.`signal_key_id` = `t2`.`signal_key_id` - JOIN `geo_dim` `t3` - ON `t1`.`geo_key_id` = `t3`.`geo_key_id`; + JOIN `signal_dim` `t2` 
USING (`signal_key_id`) + JOIN `geo_dim` `t3` USING (`geo_key_id`); -- re-create `epidata` alias VIEWs diff --git a/src/ddl/v4_schema.sql b/src/ddl/v4_schema.sql index cc4f8294e..7551707f6 100644 --- a/src/ddl/v4_schema.sql +++ b/src/ddl/v4_schema.sql @@ -8,7 +8,6 @@ CREATE TABLE geo_dim ( UNIQUE INDEX `geo_dim_index` (`geo_type`, `geo_value`) ) ENGINE=InnoDB; - CREATE TABLE signal_dim ( `signal_key_id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, `source` VARCHAR(32) NOT NULL, @@ -124,10 +123,8 @@ CREATE OR REPLACE VIEW epimetric_full_v AS `t1`.`signal_key_id` AS `signal_key_id`, `t1`.`geo_key_id` AS `geo_key_id` FROM `epimetric_full` `t1` - JOIN `signal_dim` `t2` - ON `t1`.`signal_key_id` = `t2`.`signal_key_id` - JOIN `geo_dim` `t3` - ON `t1`.`geo_key_id` = `t3`.`geo_key_id`; + JOIN `signal_dim` `t2` USING (`signal_key_id`) + JOIN `geo_dim` `t3` USING (`geo_key_id`); CREATE OR REPLACE VIEW epimetric_latest_v AS SELECT @@ -156,10 +153,8 @@ CREATE OR REPLACE VIEW epimetric_latest_v AS `t1`.`signal_key_id` AS `signal_key_id`, `t1`.`geo_key_id` AS `geo_key_id` FROM `epimetric_latest` `t1` - JOIN `signal_dim` `t2` - ON `t1`.`signal_key_id` = `t2`.`signal_key_id` - JOIN `geo_dim` `t3` - ON `t1`.`geo_key_id` = `t3`.`geo_key_id`; + JOIN `signal_dim` `t2` USING (`signal_key_id`) + JOIN `geo_dim` `t3` USING (`geo_key_id`); CREATE TABLE `covidcast_meta_cache` ( From a61a5db5b830cc6c0c124c6ee3374264970c1976 Mon Sep 17 00:00:00 2001 From: george haff Date: Mon, 29 Aug 2022 20:16:13 -0400 Subject: [PATCH 4/6] added ANALYZE TABLE to the end of acquisition runs (and some whitespace fixes) --- src/acquisition/covidcast/csv_to_database.py | 17 ++++++++--------- src/acquisition/covidcast/database.py | 11 +++++++++++ 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/src/acquisition/covidcast/csv_to_database.py b/src/acquisition/covidcast/csv_to_database.py index 2b82fc29c..8b816fc9e 100644 --- a/src/acquisition/covidcast/csv_to_database.py +++ b/src/acquisition/covidcast/csv_to_database.py @@ -77,12 +77,12 @@ def upload_archive( csv_importer_impl=CsvImporter): """Upload CSVs to the database and archive them using the specified handlers. 
- :path_details: output from CsvImporter.find*_csv_files - + :path_details: output from CsvImporter.find*_csv_files + :database: an open connection to the epidata database :handlers: functions for archiving (successful, failed) files - + :return: the number of modified rows """ archive_as_successful, archive_as_failed = handlers @@ -130,7 +130,7 @@ def upload_archive( archive_as_successful(path_src, filename, source, logger) else: archive_as_failed(path_src, filename, source,logger) - + return total_modified_row_count @@ -149,7 +149,7 @@ def main( if not path_details: logger.info('nothing to do; exiting...') return - + logger.info("Ingesting CSVs", csv_count = len(path_details)) database = database_impl() @@ -161,13 +161,12 @@ def main( database, make_handlers(args.data_dir, args.specific_issue_date), logger) - logger.info("Finished inserting database rows", row_count = modified_row_count) - # the following print statement serves the same function as the logger.info call above - # print('inserted/updated %d rows' % modified_row_count) + logger.info("Finished inserting/updating database rows", row_count = modified_row_count) finally: + database.do_analyze() # unconditionally commit database changes since CSVs have been archived database.disconnect(True) - + logger.info( "Ingested CSVs into database", total_runtime_in_seconds=round(time.time() - start_time, 2)) diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py index 5336b2f2d..01f52d236 100644 --- a/src/acquisition/covidcast/database.py +++ b/src/acquisition/covidcast/database.py @@ -138,6 +138,17 @@ def _reset_load_table_ai_counter(self): '1', '1', '1', '1', '1', 1, 1, 1, 1);""") self._cursor.execute(f'DELETE FROM epimetric_load') + def do_analyze(self): + """performs and stores key distribution analyses, used for join order and index selection""" + # TODO: consider expanding this to update columns' histograms + # https://dev.mysql.com/doc/refman/8.0/en/analyze-table.html#analyze-table-histogram-statistics-analysis + self._cursor.execute( + f'''ANALYZE TABLE + signal_dim, geo_dim, + {self.load_table}, {self.history_table}, {self.latest_table}''') + output = [self._cursor.column_names] + self._cursor.fetchall() + get_structured_logger('do_analyze').info("ANALYZE results: "+str(output)) + def insert_or_update_bulk(self, cc_rows): return self.insert_or_update_batch(cc_rows) From ab773f75124cb9feb06e484a567a42618b996e5b Mon Sep 17 00:00:00 2001 From: melange396 Date: Tue, 30 Aug 2022 14:49:03 -0400 Subject: [PATCH 5/6] help string clarification Co-authored-by: Katie Mazaitis --- src/acquisition/covidcast/covidcast_meta_cache_updater.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/acquisition/covidcast/covidcast_meta_cache_updater.py b/src/acquisition/covidcast/covidcast_meta_cache_updater.py index 1f98faf51..a46345b62 100644 --- a/src/acquisition/covidcast/covidcast_meta_cache_updater.py +++ b/src/acquisition/covidcast/covidcast_meta_cache_updater.py @@ -15,7 +15,7 @@ def get_argument_parser(): parser = argparse.ArgumentParser() parser.add_argument("--log_file", help="filename for log output") - parser.add_argument("--num_threads", type=int, help="number of worker threads to spawn for processing of each source/signal pair") + parser.add_argument("--num_threads", type=int, help="number of worker threads to spawn for processing source/signal pairs") return parser From 037e2118102b26ded722bebb9a3292b597f3fcd4 Mon Sep 17 00:00:00 2001 From: Katie Mazaitis Date: Tue, 30 Aug 2022 
15:10:20 -0400 Subject: [PATCH 6/6] en-structure logging statement --- src/acquisition/covidcast/database.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py index 01f52d236..edfe262ae 100644 --- a/src/acquisition/covidcast/database.py +++ b/src/acquisition/covidcast/database.py @@ -147,7 +147,7 @@ def do_analyze(self): signal_dim, geo_dim, {self.load_table}, {self.history_table}, {self.latest_table}''') output = [self._cursor.column_names] + self._cursor.fetchall() - get_structured_logger('do_analyze').info("ANALYZE results: "+str(output)) + get_structured_logger('do_analyze').info("ANALYZE results", results=str(output)) def insert_or_update_bulk(self, cc_rows): return self.insert_or_update_batch(cc_rows)
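
The final patch replaces string concatenation with a structured logging field. Assuming get_structured_logger behaves like a structlog JSON logger (a reading consistent with its keyword-argument use throughout these patches, though the wrapper itself is not shown here), the difference is easy to demonstrate with structlog directly; the sample rows mimic ANALYZE TABLE's (Table, Op, Msg_type, Msg_text) result set:

    import structlog

    structlog.configure(processors=[structlog.processors.JSONRenderer()])
    log = structlog.get_logger()

    output = [("Table", "Op", "Msg_type", "Msg_text"),
              ("epidata.signal_dim", "analyze", "status", "OK"),
              ("epidata.geo_dim", "analyze", "status", "OK")]

    # before: the payload is baked into one opaque message string
    log.info("ANALYZE results: " + str(output))

    # after: 'results' arrives as a first-class key in the emitted JSON record
    log.info("ANALYZE results", results=str(output))

With the structured form, log aggregators can filter or extract the results field directly instead of regex-parsing the message text.
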