|
14 | 14 |
|
15 | 15 | # first party |
16 | 16 | import delphi.operations.secrets as secrets |
17 | | -from delphi.epidata.common.logger import get_structured_logger |
| 17 | +from delphi_utils import get_structured_logger |
18 | 18 | from delphi.epidata.common.covidcast_row import CovidcastRow |
19 | 19 |
|
20 | 20 |
|
@@ -117,28 +117,28 @@ def insert_or_update_batch(self, cc_rows: List[CovidcastRow], batch_size=2**20, |
117 | 117 | get_structured_logger("insert_or_update_batch").fatal(err_msg) |
118 | 118 | raise DBLoadStateException(err_msg) |
119 | 119 |
|
120 | | - # NOTE: `value_update_timestamp` is hardcoded to "NOW" (which is appropriate) and |
| 120 | + # NOTE: `value_update_timestamp` is hardcoded to "NOW" (which is appropriate) and |
121 | 121 | # `is_latest_issue` is hardcoded to 1 (which is temporary and addressed later in this method) |
122 | 122 | insert_into_loader_sql = f''' |
123 | 123 | INSERT INTO `{self.load_table}` |
124 | 124 | (`source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`, |
125 | | - `value_updated_timestamp`, `value`, `stderr`, `sample_size`, `issue`, `lag`, |
| 125 | + `value_updated_timestamp`, `value`, `stderr`, `sample_size`, `issue`, `lag`, |
126 | 126 | `is_latest_issue`, `missing_value`, `missing_stderr`, `missing_sample_size`) |
127 | 127 | VALUES |
128 | | - (%s, %s, %s, %s, %s, %s, |
129 | | - UNIX_TIMESTAMP(NOW()), %s, %s, %s, %s, %s, |
| 128 | + (%s, %s, %s, %s, %s, %s, |
| 129 | + UNIX_TIMESTAMP(NOW()), %s, %s, %s, %s, %s, |
130 | 130 | 1, %s, %s, %s) |
131 | 131 | ''' |
132 | 132 |
|
133 | 133 | # all load table entries are already marked "is_latest_issue". |
134 | 134 | # if an entry in the load table is NOT in the latest table, it is clearly now the latest value for that key (so we do nothing (thanks to INNER join)). |
135 | 135 | # if an entry *IS* in both load and latest tables, but latest table issue is newer, unmark is_latest_issue in load. |
136 | 136 | fix_is_latest_issue_sql = f''' |
137 | | - UPDATE |
138 | | - `{self.load_table}` JOIN `{self.latest_view}` |
139 | | - USING (`source`, `signal`, `geo_type`, `geo_value`, `time_type`, `time_value`) |
140 | | - SET `{self.load_table}`.`is_latest_issue`=0 |
141 | | - WHERE `{self.load_table}`.`issue` < `{self.latest_view}`.`issue` |
| 137 | + UPDATE |
| 138 | + `{self.load_table}` JOIN `{self.latest_view}` |
| 139 | + USING (`source`, `signal`, `geo_type`, `geo_value`, `time_type`, `time_value`) |
| 140 | + SET `{self.load_table}`.`is_latest_issue`=0 |
| 141 | + WHERE `{self.load_table}`.`issue` < `{self.latest_view}`.`issue` |
142 | 142 | ''' |
143 | 143 |
|
144 | 144 | # TODO: consider handling cc_rows as a generator instead of a list |
|
0 commit comments