6 changes: 3 additions & 3 deletions src/acquisition/covidcast/covidcast_meta_cache_updater.py
@@ -48,18 +48,18 @@ def main(args, epidata_impl=Epidata, database_impl=Database):
if len(metadata)==0:
args = ("no results",-2)

print('covidcast_meta result: %s (code %d)' % args)
logger.info('covidcast_meta result: %s (code %d)' % args)

if args[-1] != 1:
print('unable to cache epidata')
logger.error('unable to cache epidata')
return False

# update the cache
try:
metadata_update_start_time = time.time()
database.update_covidcast_meta_cache(metadata)
metadata_update_interval_in_seconds = time.time() - metadata_update_start_time
print('successfully cached epidata')
logger.info('successfully cached epidata')
finally:
# no catch block so that an exception above will cause the program to
# fail after the following cleanup
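The `get_structured_logger` helper imported throughout this PR lives in `src/acquisition/covidcast/logger.py`, which is outside this diff. Below is a minimal sketch of what such a helper could look like, assuming a structlog-backed implementation; the processor choices and the optional `filename` parameter are illustrative assumptions, not the repo's actual code.

```python
# Hypothetical sketch only -- assumes structlog; the real logger module may differ.
import logging
import sys

import structlog


def get_structured_logger(name=__name__, filename=None):
    """Return a logger that emits one key-value event dict per call."""
    # route stdlib logging to stdout or, if given, to a file (assumed option)
    handler = logging.FileHandler(filename) if filename else logging.StreamHandler(sys.stdout)
    logging.basicConfig(format="%(message)s", level=logging.INFO, handlers=[handler])
    structlog.configure(
        processors=[
            structlog.stdlib.add_log_level,               # add a "level" key
            structlog.processors.TimeStamper(fmt="iso"),  # add an ISO "timestamp" key
            structlog.processors.JSONRenderer(),          # render the event dict as JSON
        ],
        logger_factory=structlog.stdlib.LoggerFactory(),
        cache_logger_on_first_use=True,
    )
    return structlog.get_logger(name)
```

With a configuration like this, a call such as `logger.warning(event='invalid geo_type', detail=geo_type, file=path)` from `csv_importer.py` renders as a single structured record instead of a bare `print`.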
27 changes: 13 additions & 14 deletions src/acquisition/covidcast/csv_importer.py
Expand Up @@ -14,6 +14,7 @@
# first party
from delphi_utils import Nans
from delphi.utils.epiweek import delta_epiweeks
from delphi.epidata.acquisition.covidcast.logger import get_structured_logger

class CsvImporter:
"""Finds and parses covidcast CSV files."""
@@ -84,16 +85,17 @@ def is_sane_week(value):

@staticmethod
def find_issue_specific_csv_files(scan_dir, glob=glob):
logger = get_structured_logger('find_issue_specific_csv_files')
for path in sorted(glob.glob(os.path.join(scan_dir, '*'))):
issuedir_match = CsvImporter.PATTERN_ISSUE_DIR.match(path.lower())
if issuedir_match and os.path.isdir(path):
issue_date_value = int(issuedir_match.group(2))
issue_date = CsvImporter.is_sane_day(issue_date_value)
if issue_date:
print(' processing csv files from issue date: "' + str(issue_date) + '", directory', path)
logger.info(event='processing csv files from issue date', detail=str(issue_date), file=path)
yield from CsvImporter.find_csv_files(path, issue=(issue_date, epi.Week.fromdate(issue_date)), glob=glob)
else:
print(' invalid issue directory day', issue_date_value)
logger.warning(event='invalid issue directory day', detail=issue_date_value, file=path)

@staticmethod
def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today())), glob=glob):
@@ -105,7 +107,7 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today()
valid, details is a tuple of (source, signal, time_type, geo_type,
time_value, issue, lag) (otherwise None).
"""

logger = get_structured_logger('find_csv_files')
issue_day,issue_epiweek=issue
issue_day_value=int(issue_day.strftime("%Y%m%d"))
issue_epiweek_value=int(str(issue_epiweek))
@@ -117,14 +119,11 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today()
if not path.lower().endswith('.csv'):
# safe to ignore this file
continue

print('file:', path)

# match a daily or weekly naming pattern
daily_match = CsvImporter.PATTERN_DAILY.match(path.lower())
weekly_match = CsvImporter.PATTERN_WEEKLY.match(path.lower())
if not daily_match and not weekly_match:
print(' invalid csv path/filename', path)
logger.warning(event='invalid csv path/filename', detail=path, file=path)
yield (path, None)
continue

@@ -135,7 +134,7 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today()
match = daily_match
time_value_day = CsvImporter.is_sane_day(time_value)
if not time_value_day:
print(' invalid filename day', time_value)
logger.warning(event='invalid filename day', detail=time_value, file=path)
yield (path, None)
continue
issue_value=issue_day_value
@@ -146,7 +145,7 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today()
match = weekly_match
time_value_week=CsvImporter.is_sane_week(time_value)
if not time_value_week:
print(' invalid filename week', time_value)
logger.warning(event='invalid filename week', detail=time_value, file=path)
yield (path, None)
continue
issue_value=issue_epiweek_value
@@ -155,15 +154,15 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today()
# # extract and validate geographic resolution
geo_type = match.group(3).lower()
if geo_type not in CsvImporter.GEOGRAPHIC_RESOLUTIONS:
print(' invalid geo_type', geo_type)
logger.warning(event='invalid geo_type', detail=geo_type, file=path)
yield (path, None)
continue

# extract additional values, lowercased for consistency
source = match.group(1).lower()
signal = match.group(4).lower()
if len(signal) > 64:
print(' invalid signal name (64 char limit)',signal)
logger.warning(event='invalid signal name (64 char limit)', detail=signal, file=path)
yield (path, None)
continue

@@ -344,19 +343,19 @@ def load_csv(filepath, geo_type, pandas=pandas):
In case of a validation error, `None` is yielded for the offending row,
including the header.
"""

logger = get_structured_logger('load_csv')
# don't use type inference, just get strings
table = pandas.read_csv(filepath, dtype='str')

if not CsvImporter.is_header_valid(table.columns):
print(' invalid header')
logger.warning(event='invalid header', detail=table.columns, file=filepath)
yield None
return

for row in table.itertuples(index=False):
row_values, error = CsvImporter.extract_and_check_row(row, geo_type)
if error:
print(' invalid value for %s (%s)' % (str(row), error))
logger.warning(event='invalid value for row', detail=(str(row), error), file=filepath)
yield None
continue
yield row_values
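For reference, the `event=` / `detail=` / `file=` keyword style adopted above turns each former `print` into a single structured record. A small illustration of the call shape and the kind of output it produces; the file name and the rendered output are illustrative, assuming the structlog-style sketch shown earlier.

```python
# Illustration only: the structured-warning call shape used in csv_importer.py.
from delphi.epidata.acquisition.covidcast.logger import get_structured_logger

logger = get_structured_logger('csv_importer_demo')

# a malformed "day" in a CSV filename would be reported like this:
logger.warning(event='invalid filename day', detail=20219999,
               file='receiving/src/src_sig_20219999_county_sig.csv')
# example rendering (JSON renderer assumed):
# {"event": "invalid filename day", "detail": 20219999,
#  "file": "receiving/src/src_sig_20219999_county_sig.csv",
#  "level": "warning", "timestamp": "2021-..."}
```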
42 changes: 21 additions & 21 deletions src/acquisition/covidcast/csv_to_database.py
@@ -37,25 +37,25 @@ def get_argument_parser():
help="filename for log output (defaults to stdout)")
return parser

def collect_files(data_dir, specific_issue_date, csv_importer_impl=CsvImporter):
def collect_files(data_dir, specific_issue_date,csv_importer_impl=CsvImporter):
"""Fetch path and data profile details for each file to upload."""

logger = get_structured_logger('collect_files')
if specific_issue_date:
results = list(csv_importer_impl.find_issue_specific_csv_files(data_dir))
else:
results = list(csv_importer_impl.find_csv_files(os.path.join(data_dir, 'receiving')))
print(f'found {len(results)} files')
logger.info(f'found {len(results)} files')
return results

def make_handlers(data_dir, specific_issue_date, file_archiver_impl=FileArchiver):
if specific_issue_date:
# issue-specific uploads are always one-offs, so we can leave all
# files in place without worrying about cleaning up
def handle_failed(path_src, filename, source):
print(f'leaving failed file alone - {source}')
def handle_failed(path_src, filename, source, logger):
logger.info(event='leaving failed file alone', dest=source, file=filename)

def handle_successful(path_src, filename, source):
print('archiving as successful')
def handle_successful(path_src, filename, source, logger):
logger.info(event='archiving as successful', file=filename)
file_archiver_impl.archive_inplace(path_src, filename)
else:
# normal automation runs require some shuffling to remove files
@@ -64,15 +64,15 @@ def handle_successful(path_src, filename, source):
archive_failed_dir = os.path.join(data_dir, 'archive', 'failed')

# helper to archive a failed file without compression
def handle_failed(path_src, filename, source):
print('archiving as failed - '+source)
def handle_failed(path_src, filename, source, logger):
logger.info(event='archiving as failed', detail=source, file=filename)
path_dst = os.path.join(archive_failed_dir, source)
compress = False
file_archiver_impl.archive_file(path_src, path_dst, filename, compress)

# helper to archive a successful file with compression
def handle_successful(path_src, filename, source):
print('archiving as successful')
def handle_successful(path_src, filename, source, logger):
logger.info(event='archiving as successful', file=filename)
path_dst = os.path.join(archive_successful_dir, source)
compress = True
file_archiver_impl.archive_file(path_src, path_dst, filename, compress)
@@ -101,15 +101,14 @@ def upload_archive(
"""
archive_as_successful, archive_as_failed = handlers
total_modified_row_count = 0

# iterate over each file
for path, details in path_details:
print('handling ', path)
logger.info(event='handling', dest=path)
path_src, filename = os.path.split(path)

if not details:
# file path or name was invalid, source is unknown
archive_as_failed(path_src, filename, 'unknown')
archive_as_failed(path_src, filename, 'unknown', logger)
continue

(source, signal, time_type, geo_type, time_value, issue, lag) = details
@@ -130,7 +129,7 @@ def upload_archive(
if all_rows_valid:
try:
modified_row_count = database.insert_or_update_bulk(rows_list)
print(f"insert_or_update_bulk {filename} returned {modified_row_count}")
logger.info(f"insert_or_update_bulk {filename} returned {modified_row_count}")
logger.info(
"Inserted database rows",
row_count = modified_row_count,
@@ -145,14 +144,14 @@ def upload_archive(
database.commit()
except Exception as e:
all_rows_valid = False
print('exception while inserting rows:', e)
logger.exception('exception while inserting rows: %s' % e)
database.rollback()

# archive the current file based on validation results
if all_rows_valid:
archive_as_successful(path_src, filename, source)
archive_as_successful(path_src, filename, source, logger)
else:
archive_as_failed(path_src, filename, source)
archive_as_failed(path_src, filename, source, logger)

return total_modified_row_count

@@ -168,7 +167,7 @@ def main(
start_time = time.time()

if args.is_wip_override and args.not_wip_override:
print('conflicting overrides for forcing WIP option! exiting...')
logger.error('conflicting overrides for forcing WIP option! exiting...')
return
wip_override = None
if args.is_wip_override:
@@ -179,7 +178,7 @@ def main(
# shortcut escape without hitting db if nothing to do
path_details = collect_files_impl(args.data_dir, args.specific_issue_date)
if not path_details:
print('nothing to do; exiting...')
logger.info('nothing to do; exiting...')
return

logger.info("Ingesting CSVs", csv_count = len(path_details))
@@ -195,7 +194,8 @@ def main(
logger,
is_wip_override=wip_override)
logger.info("Finished inserting database rows", row_count = modified_row_count)
print('inserted/updated %d rows' % modified_row_count)
# the following print statement serves the same function as the logger.info call above
# print('inserted/updated %d rows' % modified_row_count)
finally:
# unconditionally commit database changes since CSVs have been archived
database.disconnect(True)
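Because `make_handlers` now returns callbacks that take the logger as a trailing argument, and `upload_archive` expects the logger explicitly, callers have to wire one through. A minimal sketch of that wiring follows; the data directory, the logger name, and the default `Database.connect()` call are assumptions for illustration, not production configuration.

```python
# Sketch of exercising the updated signatures; directory, logger name, and
# connection defaults are assumptions for illustration only.
from delphi.epidata.acquisition.covidcast.csv_to_database import (
    collect_files, make_handlers, upload_archive)
from delphi.epidata.acquisition.covidcast.database import Database
from delphi.epidata.acquisition.covidcast.logger import get_structured_logger

logger = get_structured_logger('csv_ingestion_demo')
data_dir = '/common/covidcast'  # assumed data directory

path_details = collect_files(data_dir, specific_issue_date=None)
handlers = make_handlers(data_dir, specific_issue_date=None)  # (successful, failed) pair

database = Database()
database.connect()  # assumes the default connector is acceptable here
try:
    modified_row_count = upload_archive(
        path_details, database, handlers, logger, is_wip_override=None)
    logger.info("Finished inserting database rows", row_count=modified_row_count)
finally:
    database.disconnect(True)
```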
11 changes: 6 additions & 5 deletions src/acquisition/covidcast/database.py
@@ -16,6 +16,7 @@
# first party
import delphi.operations.secrets as secrets

from delphi.epidata.acquisition.covidcast.logger import get_structured_logger

class CovidcastRow():
"""A container for all the values of a single covidcast row."""
@@ -248,7 +249,7 @@ def insert_or_update_batch(self, cc_rows, batch_size=2**20, commit_partial=False

def compute_covidcast_meta(self, table_name='covidcast', use_index=True):
"""Compute and return metadata on all non-WIP COVIDcast signals."""

logger = get_structured_logger("compute_covidcast_meta")
index_hint = ""
if use_index:
index_hint = "USE INDEX (for_metadata)"
@@ -304,7 +305,7 @@ def compute_covidcast_meta(self, table_name='covidcast', use_index=True):
meta_lock = threading.Lock()

def worker():
print("starting thread: " + threading.current_thread().name)
logger.info("starting thread: " + threading.current_thread().name)
# set up new db connection for thread
worker_dbc = Database()
worker_dbc.connect(connector_impl=self._connector_impl)
@@ -319,7 +320,7 @@ def worker():
))
srcsigs.task_done()
except Empty:
print("no jobs left, thread terminating: " + threading.current_thread().name)
logger.info("no jobs left, thread terminating: " + threading.current_thread().name)
finally:
worker_dbc.disconnect(False) # cleanup

@@ -330,10 +331,10 @@ def worker():
threads.append(t)

srcsigs.join()
print("jobs complete")
logger.info("jobs complete")
for t in threads:
t.join()
print("threads terminated")
logger.error("threads terminated")

# sort the metadata because threaded workers dgaf
sorting_fields = "data_source signal time_type geo_type".split()
5 changes: 4 additions & 1 deletion src/acquisition/covidcast/file_archiver.py
@@ -5,6 +5,8 @@
import os
import shutil

# first party
from delphi.epidata.acquisition.covidcast.logger import get_structured_logger

class FileArchiver:
"""Archives files by moving and compressing."""
@@ -40,6 +42,7 @@ def archive_file(
file already exists, it will be overwritten.
"""

logger = get_structured_logger("file_archiver")
src = os.path.join(path_src, filename)
dst = os.path.join(path_dst, filename)

@@ -51,7 +54,7 @@

if os.path.exists(dst):
# warn that destination is about to be overwritten
print('destination exists, will overwrite (%s)' % dst)
logger.warning(event='destination exists, will overwrite', file=dst)

if compress:
# make a compressed copy
7 changes: 5 additions & 2 deletions src/acquisition/covidcast/fill_is_latest_issue.py
@@ -8,7 +8,7 @@

# first party
import delphi.operations.secrets as secrets

from delphi.epidata.acquisition.covidcast.logger import get_structured_logger

# partition configuration
###PARTITION_VARIABLE = 'geo_value'
@@ -34,6 +34,9 @@

def main(*, CLEAR_LATEST_BY_PARTITION=_CLEAR_LATEST_BY_PARTITION, FILTER_CONDITION=_FILTER_CONDITION):


logger = get_structured_logger("fill_is_lastest_issue")

u, p = secrets.db.epi
connection = mysql.connector.connect(
host=secrets.db.host,
@@ -94,7 +97,7 @@ def main(*, CLEAR_LATEST_BY_PARTITION=_CLEAR_LATEST_BY_PARTITION, FILTER_CONDITI
commit = True
except Exception as e:
connection.rollback()
print("exception raised at partition %s (partition index #%s) of column `%s`" % (PARTITION_SPLITS[partition_index], partition_index, PARTITION_VARIABLE))
logger.exception("exception raised at partition %s (partition index #%s) of column `%s`" % (PARTITION_SPLITS[partition_index], partition_index, PARTITION_VARIABLE))
raise e
finally:
cursor.close()
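One behavioral gain over the replaced `print`: `logger.exception` is meant to be called from an `except` block and attaches the active traceback to the record, so the partition context and the stack trace travel together. A tiny standalone illustration with made-up partition values:

```python
# Illustration of the rollback-and-log pattern above; the failure is simulated
# and the partition values are made up.
from delphi.epidata.acquisition.covidcast.logger import get_structured_logger

logger = get_structured_logger("fill_is_latest_issue_demo")

partition_value, partition_index, partition_variable = 20200401, 3, "time_value"
try:
    raise RuntimeError("simulated database failure")
except Exception as e:
    # connection.rollback() would go here in the real script
    logger.exception(
        "exception raised at partition %s (partition index #%s) of column `%s`"
        % (partition_value, partition_index, partition_variable))
    raise e
```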
5 changes: 1 addition & 4 deletions src/acquisition/covidcast/generate_islatest_fix_sql.py
@@ -14,8 +14,6 @@
for typ in ('num', 'prop'):
signals.append(case+period+count+typ)
### signals = ['sig2'] ###


# variable to split on, 'time_value' is good because its high cardinality is suitable for chunking
PARTITION_VARIABLE = 'time_value'
PARTITION_SPLITS = [20200101 + i*100 for i in range(10)] # first day of the month for jan - oct 2020 in YYYYMMDD form
@@ -40,7 +38,6 @@
ge_condition = 'TRUE' if partition_index == 0 else f'`{PARTITION_VARIABLE}` >= {PARTITION_SPLITS[partition_index - 1]}'
l_condition = 'TRUE' if partition_index == len(PARTITION_SPLITS) else f'`{PARTITION_VARIABLE}` < {PARTITION_SPLITS[partition_index]}'
partition_condition = f'({ge_condition}) AND ({l_condition})'

for sig in signals:
where_clause = base_where_clause + " AND `signal`='%s' AND %s" % (sig, partition_condition)

@@ -63,4 +60,4 @@

# clean up temp table
print("-- TODO: drop this table")
print("-- DROP TABLE `islatest_fix`;")
print("-- DROP TABLE `islatest_fix`;")