Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions src/acquisition/covidcast/covidcast_meta_cache_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,38 @@
# standard library
import argparse
import sys
import time

# first party
from delphi.epidata.acquisition.covidcast.database import Database
from delphi.epidata.acquisition.covidcast.logger import get_structured_logger
from delphi.epidata.client.delphi_epidata import Epidata


def get_argument_parser():
    """Define and return the command-line argument parser.

    Returns
    -------
    argparse.ArgumentParser
        Parser with a single optional flag:
        --log_file: filename for log output; when omitted, `log_file`
        parses as None (logging then goes to the stream handler only).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--log_file", help="filename for log output")
    # NOTE: the diff artifact `return argparse.ArgumentParser()` left the
    # add_argument call above unreachable; return the configured parser.
    return parser


def main(args, epidata_impl=Epidata, database_impl=Database):
"""Update the covidcast metadata cache.

`args`: parsed command-line arguments
"""
logger = get_structured_logger(
"metadata_cache_updater",
filename=args.log_file)
start_time = time.time()
database = database_impl()
database.connect()

# fetch metadata
try:
metadata_calculation_start_time = time.time()
metadata = database.get_covidcast_meta()
metadata_calculation_interval_in_seconds = metadata_calculation_start_time - start_time
except:
# clean up before failing
database.disconnect(True)
Expand All @@ -44,13 +52,22 @@ def main(args, epidata_impl=Epidata, database_impl=Database):

# update the cache
try:
metadata_update_start_time = time.time()
database.update_covidcast_meta_cache(metadata)
metadata_update_interval_in_seconds = time.time() - metadata_update_start_time
print('successfully cached epidata')
finally:
# no catch block so that an exception above will cause the program to
# fail after the following cleanup
database.disconnect(True)

logger.info(
"Generated and updated covidcast metadata",
metadata_calculation_interval_in_seconds=round(
metadata_calculation_interval_in_seconds, 2),
metadata_update_interval_in_seconds=round(
metadata_update_interval_in_seconds, 2),
total_runtime_in_seconds=round(time.time() - start_time, 2))
return True


Expand Down
92 changes: 92 additions & 0 deletions src/acquisition/covidcast/logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
"""Structured logger utility for creating JSON logs in Delphi pipelines."""
import logging
import sys
import threading
import structlog


def handle_exceptions(logger):
    """Route uncaught exceptions to *logger*.

    Installs ``sys.excepthook`` (main thread) and ``threading.excepthook``
    (worker threads) so that any otherwise-unhandled exception is recorded
    via ``logger.exception`` instead of only printing to stderr.
    """
    def _log_uncaught(exc_type, exc_value, exc_traceback):
        # Single shared formatter for every uncaught exception.
        logger.exception("Top-level exception occurred",
                         exc_info=(exc_type, exc_value, exc_traceback))

    def _log_thread_uncaught(hook_args):
        # threading.excepthook receives a single args object; unpack it and
        # reuse the main-thread handler.
        _log_uncaught(hook_args.exc_type, hook_args.exc_value,
                      hook_args.exc_traceback)

    sys.excepthook = _log_uncaught
    threading.excepthook = _log_thread_uncaught


def get_structured_logger(name=__name__,
                          filename=None,
                          log_exceptions=True):
    """Create a new structlog logger emitting JSON log lines.

    Use the returned logger with the standard wrapper calls, e.g.:

        logger = get_structured_logger(__name__)
        logger.warning("Error", type="Signal too low")

    Output is rendered as JSON so it can be consumed easily by log
    processors. See the structlog documentation for details.

    Parameters
    ---------
    name: Name to use for logger (included in log lines); __name__ from the
        caller is a good choice.
    filename: An (optional) file to write log output.
    log_exceptions: When True (the default), also install process-wide
        excepthooks so uncaught exceptions are logged here.
    """
    # Underlying stdlib logging: always log to the stream handler, and to a
    # file as well when a filename was supplied.
    log_handlers = [logging.StreamHandler()]
    if filename:
        log_handlers.append(logging.FileHandler(filename))

    logging.basicConfig(format="%(message)s",
                        level=logging.INFO,
                        handlers=log_handlers)

    # Processor chain follows the standard suggestions from the structlog
    # documentation, ending in a JSON renderer.
    processor_chain = [
        # Drop events below the configured level.
        structlog.stdlib.filter_by_level,
        # Include the logger name in the output.
        structlog.stdlib.add_logger_name,
        # Include the log level in the output.
        structlog.stdlib.add_log_level,
        # Support printf-style arguments, e.g. logger.info("Hello, %s", name).
        structlog.stdlib.PositionalArgumentsFormatter(),
        # ISO-format timestamps.
        structlog.processors.TimeStamper(fmt="iso"),
        # Match the standard logger's exception/stack-info support.
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        # Decode unicode characters.
        structlog.processors.UnicodeDecoder(),
        # Render the final event dict as JSON.
        structlog.processors.JSONRenderer(),
    ]

    structlog.configure(
        processors=processor_chain,
        # Plain dict for bound context.
        context_class=dict,
        # Delegate the actual emit to a stdlib logger.
        logger_factory=structlog.stdlib.LoggerFactory(),
        # Standard wrapper class providing log.warning() etc.
        wrapper_class=structlog.stdlib.BoundLogger,
        # Cache the logger on first use.
        cache_logger_on_first_use=True,
    )

    structured_logger = structlog.get_logger(name)

    if log_exceptions:
        handle_exceptions(structured_logger)

    return structured_logger
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def test_main_successful(self):
'epidata': [{'foo': 'bar'}],
}

args = None
args = MagicMock(log_file="log")
mock_epidata_impl = MagicMock()
mock_epidata_impl.covidcast_meta.return_value = api_response
mock_database = MagicMock()
Expand Down Expand Up @@ -64,7 +64,7 @@ def test_main_failure(self):
'message': 'no',
}

args = None
args = MagicMock(log_file="log")
mock_database = MagicMock()
mock_database.get_covidcast_meta.return_value = list()
fake_database_impl = lambda: mock_database
Expand Down