From 0e89cabce4d6a4bf5f2a0ecd04e7639b018a1e62 Mon Sep 17 00:00:00 2001
From: Kathryn M Mazaitis
Date: Tue, 28 Jun 2022 10:49:43 -0400
Subject: [PATCH 1/2] Repair runtime problems with merged_dim acquisition.

* Switch to two quick joins when setting is_latest_issue instead of one slow join
* Add merged_key_id indexes
* Fix bad syntax when logging exceptions
* Automatically call dbjobs after insert_or_update_batch
---
 src/acquisition/covidcast/csv_to_database.py |  2 +-
 src/acquisition/covidcast/database.py        | 15 ++++++++++-----
 src/ddl/v4_schema.sql                        | 10 +++++-----
 3 files changed, 16 insertions(+), 11 deletions(-)

diff --git a/src/acquisition/covidcast/csv_to_database.py b/src/acquisition/covidcast/csv_to_database.py
index 9828564d0..2b82fc29c 100644
--- a/src/acquisition/covidcast/csv_to_database.py
+++ b/src/acquisition/covidcast/csv_to_database.py
@@ -122,7 +122,7 @@ def upload_archive(
         database.commit()
       except Exception as e:
         all_rows_valid = False
-        logger.exception('exception while inserting rows:', e)
+        logger.exception('exception while inserting rows', exc_info=e)
         database.rollback()
 
   # archive the current file based on validation results
diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py
index 7bf57e9f6..08a9fbd20 100644
--- a/src/acquisition/covidcast/database.py
+++ b/src/acquisition/covidcast/database.py
@@ -128,10 +128,9 @@ def count_insertstatus_rows(self):
   def insert_or_update_bulk(self, cc_rows):
     return self.insert_or_update_batch(cc_rows)
 
-  def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False):
+  def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False, suppress_jobs=False):
     """
-    Insert new rows into the load table.
-    After completing this step, run `self.run_dbjobs()` to move data into the live tables.
+    Insert new rows into the load table and dispatch into dimension and fact tables.
     """
 
     # NOTE: `value_update_timestamp` is hardcoded to "NOW" (which is appropriate) and
@@ -153,8 +152,12 @@ def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False
     # if an entry *IS* in both load and latest tables, but latest table issue is newer, unmark is_latest_issue in load.
     fix_is_latest_issue_sql = f'''
         UPDATE
-            `{self.load_table}` JOIN `{self.latest_view}`
-                USING (`source`, `signal`, `geo_type`, `geo_value`, `time_type`, `time_value`)
+            `{self.load_table}`
+            JOIN `merged_dim` USING (`source`, `signal`, `geo_type`, `geo_value`)
+            JOIN `{self.latest_view}` ON
+                `{self.latest_view}`.`merged_key_id` = `merged_dim`.`merged_key_id` AND
+                `{self.latest_view}`.`time_type` = `{self.load_table}`.`time_type` AND
+                `{self.latest_view}`.`time_value` = `{self.load_table}`.`time_value`
         SET `{self.load_table}`.`is_latest_issue`=0
         WHERE `{self.load_table}`.`issue` < `{self.latest_view}`.`issue`
     '''
@@ -204,6 +207,8 @@ def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False
       except Exception as e:
         # rollback is handled in csv_to_database; if you're calling this yourself, handle your own rollback
         raise e
+    if not suppress_jobs:
+      self.run_dbjobs()
     return total
 
   def run_dbjobs(self):
diff --git a/src/ddl/v4_schema.sql b/src/ddl/v4_schema.sql
index d51319968..bca133df5 100644
--- a/src/ddl/v4_schema.sql
+++ b/src/ddl/v4_schema.sql
@@ -61,13 +61,15 @@ CREATE TABLE signal_history (
     `missing_sample_size` INT(1) NULL DEFAULT '0',
 
     PRIMARY KEY (`signal_data_id`) USING BTREE,
-    UNIQUE INDEX `value_key` (`signal_key_id`,`geo_key_id`,`issue`,`time_type`,`time_value`) USING BTREE
+    UNIQUE INDEX `value_key` (`signal_key_id`,`geo_key_id`,`issue`,`time_type`,`time_value`) USING BTREE,
+    UNIQUE INDEX `value_key_merged` (`merged_key_id`,`issue`,`time_type`,`time_value`) USING BTREE
 ) ENGINE=InnoDB;
 
 CREATE TABLE signal_latest (
 
     PRIMARY KEY (`signal_data_id`) USING BTREE,
-    UNIQUE INDEX `value_key` (`signal_key_id`,`geo_key_id`,`time_type`,`time_value`) USING BTREE
+    UNIQUE INDEX `value_key` (`signal_key_id`,`geo_key_id`,`time_type`,`time_value`) USING BTREE,
+    UNIQUE INDEX `value_key_merged` (`merged_key_id`,`time_type`,`time_value`) USING BTREE
 ) ENGINE=InnoDB
 SELECT * FROM signal_history;
@@ -109,9 +111,7 @@ CREATE TABLE signal_load (
     `process_status` VARCHAR(2) DEFAULT 'l', -- using codes: 'i' (I) for "inserting", 'l' (L) for "loaded", and 'b' for "batching"
     -- TODO: change `process_status` default to 'i' (I) "inserting" or even 'x'/'u' "undefined" ?
 
-    PRIMARY KEY (`signal_data_id`) USING BTREE,
-    INDEX `comp_signal_key` (`compressed_signal_key`) USING BTREE,
-    INDEX `comp_geo_key` (`compressed_geo_key`) USING BTREE
+    PRIMARY KEY (`signal_data_id`) USING BTREE
 ) ENGINE=InnoDB AUTO_INCREMENT=4000000001;

From 57a7a94b4c72fa54cee40e8a0ff27dc0b5a49177 Mon Sep 17 00:00:00 2001
From: Kathryn M Mazaitis
Date: Tue, 28 Jun 2022 11:04:41 -0400
Subject: [PATCH 2/2] Better repair.
Knocks out an additional unnecessary step in the query plan; we don't need
the view to join merged_dim for us because we already did that.
---
 src/acquisition/covidcast/database.py | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py
index 08a9fbd20..ce7b73176 100644
--- a/src/acquisition/covidcast/database.py
+++ b/src/acquisition/covidcast/database.py
@@ -154,12 +154,12 @@ def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False
         UPDATE
             `{self.load_table}`
             JOIN `merged_dim` USING (`source`, `signal`, `geo_type`, `geo_value`)
-            JOIN `{self.latest_view}` ON
-                `{self.latest_view}`.`merged_key_id` = `merged_dim`.`merged_key_id` AND
-                `{self.latest_view}`.`time_type` = `{self.load_table}`.`time_type` AND
-                `{self.latest_view}`.`time_value` = `{self.load_table}`.`time_value`
+            JOIN `{self.latest_table}` ON
+                `{self.latest_table}`.`merged_key_id` = `merged_dim`.`merged_key_id` AND
+                `{self.latest_table}`.`time_type` = `{self.load_table}`.`time_type` AND
+                `{self.latest_table}`.`time_value` = `{self.load_table}`.`time_value`
         SET `{self.load_table}`.`is_latest_issue`=0
-        WHERE `{self.load_table}`.`issue` < `{self.latest_view}`.`issue`
+        WHERE `{self.load_table}`.`issue` < `{self.latest_table}`.`issue`
     '''
 
     # TODO: consider handling cc_rows as a generator instead of a list
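
Usage note (not part of the patches): with the automatic run_dbjobs() call
introduced in PATCH 1/2, a caller loading many files can pass
suppress_jobs=True to stage everything in the load table and dispatch once
at the end. A minimal sketch, assuming the Database connection helpers
(connect/commit/rollback/disconnect) behave as elsewhere in this module;
load_all and row_batches are hypothetical names:

    from delphi.epidata.acquisition.covidcast.database import Database

    def load_all(row_batches):
        db = Database()
        db.connect()
        try:
            for cc_rows in row_batches:
                # suppress_jobs=True skips the automatic run_dbjobs() call,
                # leaving these rows staged in the load table
                db.insert_or_update_batch(cc_rows, suppress_jobs=True)
            # a single dbjobs pass then dispatches everything into the
            # dimension and fact tables at once
            db.run_dbjobs()
            db.commit()
        except Exception:
            # mirror csv_to_database: roll back the load on any failure
            db.rollback()
            raise
        finally:
            db.disconnect(False)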