src/acquisition/covidcast/csv_to_database.py (1 addition, 1 deletion)
@@ -122,7 +122,7 @@ def upload_archive(
         database.commit()
       except Exception as e:
         all_rows_valid = False
-        logger.exception('exception while inserting rows:', e)
+        logger.exception('exception while inserting rows', exc_info=e)
         database.rollback()

   # archive the current file based on validation results
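Why the one-line change above matters: with %-style logging, a stray positional argument is consumed as a format value for the message string, and since 'exception while inserting rows:' contains no placeholder, formatting the record fails and the traceback gets mangled. Passing `exc_info=e` attaches the exception cleanly. A minimal stdlib-`logging` sketch of both behaviors (the project's structured logger is assumed to follow the same call convention):

import logging

logging.basicConfig()
logger = logging.getLogger("demo")

try:
  raise ValueError("bad row")
except Exception as e:
  # Broken: `e` is consumed as a %-format argument, but the message has no
  # placeholder, so logging prints "--- Logging error ---" to stderr and the
  # intended message/traceback pairing is lost.
  logger.exception('exception while inserting rows:', e)
  # Fixed: `exc_info=e` attaches the exception and its traceback to the record.
  logger.exception('exception while inserting rows', exc_info=e)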
src/acquisition/covidcast/database.py (11 additions, 6 deletions)
@@ -128,10 +128,9 @@ def count_insertstatus_rows(self):
   def insert_or_update_bulk(self, cc_rows):
     return self.insert_or_update_batch(cc_rows)

-  def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False):
+  def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False, suppress_jobs=False):
     """
-    Insert new rows into the load table.
-    After completing this step, run `self.run_dbjobs()` to move data into the live tables.
+    Insert new rows into the load table and, unless `suppress_jobs` is set, dispatch them into the dimension and fact tables.
     """

     # NOTE: `value_update_timestamp` is hardcoded to "NOW" (which is appropriate) and
@@ -153,10 +152,14 @@ def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False
     # if an entry *IS* in both load and latest tables, but latest table issue is newer, unmark is_latest_issue in load.
     fix_is_latest_issue_sql = f'''
       UPDATE
-        `{self.load_table}` JOIN `{self.latest_view}`
-          USING (`source`, `signal`, `geo_type`, `geo_value`, `time_type`, `time_value`)
+        `{self.load_table}`
+          JOIN `merged_dim` USING (`source`, `signal`, `geo_type`, `geo_value`)
+          JOIN `{self.latest_table}` ON
+            `{self.latest_table}`.`merged_key_id` = `merged_dim`.`merged_key_id` AND
+            `{self.latest_table}`.`time_type` = `{self.load_table}`.`time_type` AND
+            `{self.latest_table}`.`time_value` = `{self.load_table}`.`time_value`
       SET `{self.load_table}`.`is_latest_issue`=0
-      WHERE `{self.load_table}`.`issue` < `{self.latest_view}`.`issue`
+      WHERE `{self.load_table}`.`issue` < `{self.latest_table}`.`issue`
     '''

     # TODO: consider handling cc_rows as a generator instead of a list
@@ -204,6 +207,8 @@ def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False
       except Exception as e:
         # rollback is handled in csv_to_database; if you're calling this yourself, handle your own rollback
         raise e
+    if not suppress_jobs:
+      self.run_dbjobs()
     return total

   def run_dbjobs(self):
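An editor's sketch of how the new `suppress_jobs` flag composes with `run_dbjobs()`: a caller loading several chunks can defer the dimension/fact dispatch until the end instead of paying for it on every batch. Here `db` and `chunks` are illustrative names, not part of the PR:

# Hypothetical usage sketch: `db` is a connected Database instance and
# `chunks` is an iterable of CovidcastRow lists.
total = 0
for chunk in chunks:
  # Rows accumulate in the load table; suppress_jobs=True skips the
  # implicit run_dbjobs() call at the end of each batch.
  total += db.insert_or_update_batch(chunk, suppress_jobs=True)

# One explicit dispatch then moves everything loaded above into the live tables.
db.run_dbjobs()
db.commit()
print(f"inserted/updated {total} rows")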
src/ddl/v4_schema.sql (5 additions, 5 deletions)
@@ -61,13 +61,15 @@ CREATE TABLE signal_history (
   `missing_sample_size` INT(1) NULL DEFAULT '0',

   PRIMARY KEY (`signal_data_id`) USING BTREE,
-  UNIQUE INDEX `value_key` (`signal_key_id`,`geo_key_id`,`issue`,`time_type`,`time_value`) USING BTREE
+  UNIQUE INDEX `value_key` (`signal_key_id`,`geo_key_id`,`issue`,`time_type`,`time_value`) USING BTREE,
+  UNIQUE INDEX `value_key_merged` (`merged_key_id`,`issue`,`time_type`,`time_value`) USING BTREE
 ) ENGINE=InnoDB;


 CREATE TABLE signal_latest (
   PRIMARY KEY (`signal_data_id`) USING BTREE,
-  UNIQUE INDEX `value_key` (`signal_key_id`,`geo_key_id`,`time_type`,`time_value`) USING BTREE
+  UNIQUE INDEX `value_key` (`signal_key_id`,`geo_key_id`,`time_type`,`time_value`) USING BTREE,
+  UNIQUE INDEX `value_key_merged` (`merged_key_id`,`time_type`,`time_value`) USING BTREE
 ) ENGINE=InnoDB SELECT * FROM signal_history;


@@ -109,9 +111,7 @@ CREATE TABLE signal_load (
   `process_status` VARCHAR(2) DEFAULT 'l', -- using codes: 'i' (I) for "inserting", 'l' (L) for "loaded", and 'b' for "batching"
   -- TODO: change `process_status` default to 'i' (I) "inserting" or even 'x'/'u' "undefined" ?

-  PRIMARY KEY (`signal_data_id`) USING BTREE,
-  INDEX `comp_signal_key` (`compressed_signal_key`) USING BTREE,
-  INDEX `comp_geo_key` (`compressed_geo_key`) USING BTREE
+  PRIMARY KEY (`signal_data_id`) USING BTREE
 ) ENGINE=InnoDB AUTO_INCREMENT=4000000001;
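The two `value_key_merged` indexes added above back the rewritten latest-issue UPDATE in database.py, which now probes `signal_latest` by `(merged_key_id, time_type, time_value)` instead of the split signal/geo keys. A hedged way to confirm the optimizer actually picks the new index; `cursor` is assumed to come from a live MySQL connection to the epidata database, and the literal key/time values are placeholders:

# Hypothetical index check against the new unique index.
cursor.execute('''
  EXPLAIN SELECT `issue`
  FROM `signal_latest`
  WHERE `merged_key_id` = 42
    AND `time_type` = 'day'
    AND `time_value` = 20200401
''')
for row in cursor.fetchall():
  print(row)  # the `key` column should report `value_key_merged`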

