From d9ca20f00e58ab63371bc2b693507f2adc9a46fe Mon Sep 17 00:00:00 2001 From: george haff Date: Tue, 8 Aug 2023 13:52:13 -0400 Subject: [PATCH 1/4] logger.py updates --- src/common/logger.py | 135 ++++++++++++++++++++++++++++++++++++++----- 1 file changed, 119 insertions(+), 16 deletions(-) diff --git a/src/common/logger.py b/src/common/logger.py index 909e47fb7..c97f17123 100644 --- a/src/common/logger.py +++ b/src/common/logger.py @@ -1,24 +1,43 @@ +"""Structured logger utility for creating JSON logs.""" + +# the Delphi group uses two ~identical versions of this file. +# try to keep them in sync with edits, for sanity. +# https://github.com/cmu-delphi/covidcast-indicators/blob/main/_delphi_utils_python/delphi_utils/logger.py +# https://github.com/cmu-delphi/delphi-epidata/blob/dev/src/common/logger.py + import logging import os +import structlog import sys import threading -import structlog +import traceback def handle_exceptions(logger): """Handle exceptions using the provided logger.""" - def exception_handler(etype, value, traceback): - logger.exception("Top-level exception occurred", exc_info=(etype, value, traceback)) + def exception_handler(scope, etype, value, traceback): + logger.exception("Top-level exception occurred", scope=scope, exc_info=(etype, value, traceback)) + + def sys_exception_handler(etype, value, traceback): + exception_handler("sys", etype, value, traceback) - def multithread_exception_handler(args): - exception_handler(args.exc_type, args.exc_value, args.exc_traceback) + def threading_exception_handler(args): + if args.exc_type == SystemExit and args.exc_value.code == 0: + # `sys.exit(0)` is considered "successful termination": + # https://docs.python.org/3/library/sys.html#sys.exit + logger.debug(f"normal thread exit", thread=args.thread, + stack="".join(traceback.format_exception(args.exc_type, args.exc_value, args.exc_traceback))) + else: + exception_handler(f"thread: {args.thread}", args.exc_type, args.exc_value, args.exc_traceback) - sys.excepthook = exception_handler - threading.excepthook = multithread_exception_handler + sys.excepthook = sys_exception_handler + threading.excepthook = threading_exception_handler -def get_structured_logger(name=__name__, filename=None, log_exceptions=True): +def get_structured_logger(name=__name__, + filename=None, + log_exceptions=True): """Create a new structlog logger. Use the logger returned from this in indicator code using the standard @@ -38,19 +57,18 @@ def get_structured_logger(name=__name__, filename=None, log_exceptions=True): is a good choice. filename: An (optional) file to write log output. """ - # Configure the underlying logging configuration - handlers = [logging.StreamHandler()] - if filename: - handlers.append(logging.FileHandler(filename)) - + # Set the underlying logging configuration if "LOG_DEBUG" in os.environ: log_level = logging.DEBUG else: log_level = logging.INFO - logging.basicConfig(format="%(message)s", level=log_level, handlers=handlers) + logging.basicConfig( + format="%(message)s", + level=log_level, + handlers=[logging.StreamHandler()]) - def add_pid(logger, method_name, event_dict): + def add_pid(_logger, _method_name, event_dict): """ Add current PID to the event dict. """ @@ -92,9 +110,94 @@ def add_pid(logger, method_name, event_dict): cache_logger_on_first_use=True, ) - logger = structlog.get_logger(name) + # Create the underlying python logger and wrap it with structlog + system_logger = logging.getLogger(name) + if filename and not system_logger.handlers: + system_logger.addHandler(logging.FileHandler(filename)) + system_logger.setLevel(log_level) + logger = structlog.wrap_logger(system_logger) if log_exceptions: handle_exceptions(logger) return logger + + + + +# the above loggers are thread-safe but not multiprocessing-safe. a `LoggerThread` will spawn a thread that listens to a mp.Queue and logs messages from it with the provided logger, so other processes can send logging messages to it via the logger-like `SubLogger` interface. the SubLogger even logs the pid of the caller. this is good to use with a set of jobs that are part of a mp.Pool, but isnt recommended for general use because of overhead from threading and multiprocessing, and because it might introduce lag to log messages. + +import multiprocessing +import contextlib + +class LoggerThread(): + # TODO: add checks to prevent use of a stopped thread? + # TODO: reduce level of a bunch of debugging logs (search "self.logger.info") + + class SubLogger(): + def __init__(self, queue): + self.queue = queue + def _log(self, level, *args, **kwargs): + kwargs_plus = {'sub_pid': multiprocessing.current_process().pid} + kwargs_plus.update(kwargs) + self.queue.put([level, args, kwargs_plus]) + # TODO: add log levels beyond `info` + def info(self, *args, **kwargs): + self._log(logging.INFO, *args, **kwargs) + + + def get_sublogger(self): + return self.sublogger + + def __init__(self, logger, q=None): + self.logger = logger + if q: + self.msg_queue = q + else: + self.msg_queue = multiprocessing.Queue() + + def logger_thread_worker(): + self.logger.info('thread started') + while True: + msg = self.msg_queue.get() + if msg == 'STOP': + self.logger.info('received stop signal') + break + level, args, kwargs = msg + # TODO: add log levels beyond `info` + if level == logging.INFO: + self.logger.info(*args, **kwargs) + else: + self.logger.error('received unknown logging level! exiting...') + break + self.logger.info('stopping thread') + + self.thread = threading.Thread(target=logger_thread_worker, name="LoggerThread__"+self.logger.name) + self.logger.info('starting thread') + self.thread.start() + + self.sublogger = LoggerThread.SubLogger(self.msg_queue) + + def stop(self): + # TODO: make this safely re-callable? + self.logger.info('sending stop signal') + self.msg_queue.put('STOP') + self.thread.join() + self.logger.info('thread stopped') + + + +@contextlib.contextmanager +def pool_and_threadedlogger(logger, *poolargs): + """ + emulates the multiprocessing.Pool() context manager, but also provides a logger that can be used by pool workers. + """ + with multiprocessing.Manager() as manager: + logger_thread = LoggerThread(logger, manager.Queue()) + try: + with multiprocessing.Pool(*poolargs) as pool: + yield pool, logger_thread.get_sublogger() + pool.close() + pool.join() + finally: + logger_thread.stop() From 1ecc8c78dc435b1a408b0d2b7c2ec931f71c7f8e Mon Sep 17 00:00:00 2001 From: george haff Date: Tue, 8 Aug 2023 18:16:45 -0400 Subject: [PATCH 2/4] updates from other PR --- src/common/logger.py | 183 +++++++++++++++++++++++++------------------ 1 file changed, 107 insertions(+), 76 deletions(-) diff --git a/src/common/logger.py b/src/common/logger.py index c97f17123..349f2c883 100644 --- a/src/common/logger.py +++ b/src/common/logger.py @@ -2,22 +2,26 @@ # the Delphi group uses two ~identical versions of this file. # try to keep them in sync with edits, for sanity. -# https://github.com/cmu-delphi/covidcast-indicators/blob/main/_delphi_utils_python/delphi_utils/logger.py +# https://github.com/cmu-delphi/covidcast-indicators/blob/main/_delphi_utils_python/delphi_utils/logger.py # pylint: disable=line-too-long # https://github.com/cmu-delphi/delphi-epidata/blob/dev/src/common/logger.py +import contextlib import logging +import multiprocessing import os -import structlog import sys import threading -import traceback +from traceback import format_exception + +import structlog def handle_exceptions(logger): """Handle exceptions using the provided logger.""" def exception_handler(scope, etype, value, traceback): - logger.exception("Top-level exception occurred", scope=scope, exc_info=(etype, value, traceback)) + logger.exception("Top-level exception occurred", + scope=scope, exc_info=(etype, value, traceback)) def sys_exception_handler(etype, value, traceback): exception_handler("sys", etype, value, traceback) @@ -26,10 +30,13 @@ def threading_exception_handler(args): if args.exc_type == SystemExit and args.exc_value.code == 0: # `sys.exit(0)` is considered "successful termination": # https://docs.python.org/3/library/sys.html#sys.exit - logger.debug(f"normal thread exit", thread=args.thread, - stack="".join(traceback.format_exception(args.exc_type, args.exc_value, args.exc_traceback))) + logger.debug("normal thread exit", thread=args.thread, + stack="".join( + format_exception( + args.exc_type, args.exc_value, args.exc_traceback))) else: - exception_handler(f"thread: {args.thread}", args.exc_type, args.exc_value, args.exc_traceback) + exception_handler(f"thread: {args.thread}", + args.exc_type, args.exc_value, args.exc_traceback) sys.excepthook = sys_exception_handler threading.excepthook = threading_exception_handler @@ -69,9 +76,7 @@ def get_structured_logger(name=__name__, handlers=[logging.StreamHandler()]) def add_pid(_logger, _method_name, event_dict): - """ - Add current PID to the event dict. - """ + """Add current PID to the event dict.""" event_dict["pid"] = os.getpid() return event_dict @@ -125,79 +130,105 @@ def add_pid(_logger, _method_name, event_dict): -# the above loggers are thread-safe but not multiprocessing-safe. a `LoggerThread` will spawn a thread that listens to a mp.Queue and logs messages from it with the provided logger, so other processes can send logging messages to it via the logger-like `SubLogger` interface. the SubLogger even logs the pid of the caller. this is good to use with a set of jobs that are part of a mp.Pool, but isnt recommended for general use because of overhead from threading and multiprocessing, and because it might introduce lag to log messages. +class LoggerThread(): + """ + the bare structlog loggers are thread-safe but not multiprocessing-safe. + a `LoggerThread` will spawn a thread that listens to a mp.Queue + and logs messages from it with the provided logger, + so other processes can send logging messages to it + via the logger-like `SubLogger` interface. + the SubLogger even logs the pid of the caller. + + this is good to use with a set of jobs that are part of a mp.Pool, + but isnt recommended for general use + because of overhead from threading and multiprocessing, + and because it might introduce lag to log messages. + """ -import multiprocessing -import contextlib + class SubLogger(): + """ + Multiprocessing-safe logger-like interface + to convey log messages to a listening LoggerThread. + """ -class LoggerThread(): - # TODO: add checks to prevent use of a stopped thread? - # TODO: reduce level of a bunch of debugging logs (search "self.logger.info") - - class SubLogger(): - def __init__(self, queue): - self.queue = queue - def _log(self, level, *args, **kwargs): - kwargs_plus = {'sub_pid': multiprocessing.current_process().pid} - kwargs_plus.update(kwargs) - self.queue.put([level, args, kwargs_plus]) - # TODO: add log levels beyond `info` - def info(self, *args, **kwargs): - self._log(logging.INFO, *args, **kwargs) - - - def get_sublogger(self): - return self.sublogger - - def __init__(self, logger, q=None): - self.logger = logger - if q: - self.msg_queue = q - else: - self.msg_queue = multiprocessing.Queue() - - def logger_thread_worker(): - self.logger.info('thread started') - while True: - msg = self.msg_queue.get() - if msg == 'STOP': - self.logger.info('received stop signal') - break - level, args, kwargs = msg - # TODO: add log levels beyond `info` - if level == logging.INFO: - self.logger.info(*args, **kwargs) - else: - self.logger.error('received unknown logging level! exiting...') - break - self.logger.info('stopping thread') + def __init__(self, queue): + self.queue = queue - self.thread = threading.Thread(target=logger_thread_worker, name="LoggerThread__"+self.logger.name) - self.logger.info('starting thread') - self.thread.start() + def _log(self, level, *args, **kwargs): + kwargs_plus = {'sub_pid': multiprocessing.current_process().pid} + kwargs_plus.update(kwargs) + self.queue.put([level, args, kwargs_plus]) - self.sublogger = LoggerThread.SubLogger(self.msg_queue) + def info(self, *args, **kwargs): + """Log an INFO level message.""" + self._log(logging.INFO, *args, **kwargs) - def stop(self): - # TODO: make this safely re-callable? - self.logger.info('sending stop signal') - self.msg_queue.put('STOP') - self.thread.join() - self.logger.info('thread stopped') + def warn(self, *args, **kwargs): + """Log a WARN level message.""" + self._log(logging.WARN, *args, **kwargs) + + + def get_sublogger(self): + """Accessor method to retrieve a SubLogger for this LoggerThread.""" + return self.sublogger + + def __init__(self, logger, q=None): + self.logger = logger + if q: + self.msg_queue = q + else: + self.msg_queue = multiprocessing.Queue() + + def logger_thread_worker(): + logger.info('thread started') + while True: + msg = self.msg_queue.get() + if msg == 'STOP': + logger.debug('received stop signal') + break + level, args, kwargs = msg + if level == logging.INFO: + logger.info(*args, **kwargs) + if level == logging.WARN: + logger.warn(*args, **kwargs) + else: + logger.error('received unknown logging level! exiting...') + break + logger.debug('stopping thread') + + self.thread = threading.Thread(target=logger_thread_worker, + name="LoggerThread__"+logger.name) + logger.debug('starting thread') + self.thread.start() + + self.sublogger = LoggerThread.SubLogger(self.msg_queue) + self.running = True + + def stop(self): + """Terminate this LoggerThread.""" + if not self.running: + self.logger.warn('thread already stopped') + return + self.logger.debug('sending stop signal') + self.msg_queue.put('STOP') + self.thread.join() + self.running = False + self.logger.info('thread stopped') @contextlib.contextmanager def pool_and_threadedlogger(logger, *poolargs): - """ - emulates the multiprocessing.Pool() context manager, but also provides a logger that can be used by pool workers. - """ - with multiprocessing.Manager() as manager: - logger_thread = LoggerThread(logger, manager.Queue()) - try: - with multiprocessing.Pool(*poolargs) as pool: - yield pool, logger_thread.get_sublogger() - pool.close() - pool.join() - finally: - logger_thread.stop() + """ + Emulates the multiprocessing.Pool() context manager, + but also provides a logger that can be used by pool workers. + """ + with multiprocessing.Manager() as manager: + logger_thread = LoggerThread(logger, manager.Queue()) + try: + with multiprocessing.Pool(*poolargs) as pool: + yield pool, logger_thread.get_sublogger() + pool.close() + pool.join() + finally: + logger_thread.stop() From bf43f78b6cd3077365d03260cdff2976d9609aee Mon Sep 17 00:00:00 2001 From: george haff Date: Fri, 11 Aug 2023 14:59:47 -0400 Subject: [PATCH 3/4] changes from other PR --- src/common/logger.py | 56 ++++++++++++++++++++++++++++++-------------- 1 file changed, 38 insertions(+), 18 deletions(-) diff --git a/src/common/logger.py b/src/common/logger.py index 349f2c883..889b95502 100644 --- a/src/common/logger.py +++ b/src/common/logger.py @@ -128,10 +128,10 @@ def add_pid(_logger, _method_name, event_dict): return logger - - class LoggerThread(): """ + A construct to use a logger from multiprocessing workers/jobs. + the bare structlog loggers are thread-safe but not multiprocessing-safe. a `LoggerThread` will spawn a thread that listens to a mp.Queue and logs messages from it with the provided logger, @@ -143,15 +143,16 @@ class LoggerThread(): but isnt recommended for general use because of overhead from threading and multiprocessing, and because it might introduce lag to log messages. + + somewhat inspired by: + docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes """ class SubLogger(): - """ - Multiprocessing-safe logger-like interface - to convey log messages to a listening LoggerThread. - """ + """MP-safe logger-like interface to convey log messages to a listening LoggerThread.""" def __init__(self, queue): + """Create SubLogger with a bound queue.""" self.queue = queue def _log(self, level, *args, **kwargs): @@ -159,20 +160,35 @@ def _log(self, level, *args, **kwargs): kwargs_plus.update(kwargs) self.queue.put([level, args, kwargs_plus]) + def debug(self, *args, **kwargs): + """Log a DEBUG level message.""" + self._log(logging.DEBUG, *args, **kwargs) + def info(self, *args, **kwargs): """Log an INFO level message.""" self._log(logging.INFO, *args, **kwargs) - def warn(self, *args, **kwargs): - """Log a WARN level message.""" - self._log(logging.WARN, *args, **kwargs) + def warning(self, *args, **kwargs): + """Log a WARNING level message.""" + self._log(logging.WARNING, *args, **kwargs) + + def error(self, *args, **kwargs): + """Log an ERROR level message.""" + self._log(logging.ERROR, *args, **kwargs) + + def critical(self, *args, **kwargs): + """Log a CRITICAL level message.""" + self._log(logging.CRITICAL, *args, **kwargs) + + def get_sublogger(self): - """Accessor method to retrieve a SubLogger for this LoggerThread.""" + """Retrieve SubLogger for this LoggerThread.""" return self.sublogger def __init__(self, logger, q=None): + """Create and start LoggerThread with supplied logger, creating a queue if not provided.""" self.logger = logger if q: self.msg_queue = q @@ -187,12 +203,12 @@ def logger_thread_worker(): logger.debug('received stop signal') break level, args, kwargs = msg - if level == logging.INFO: - logger.info(*args, **kwargs) - if level == logging.WARN: - logger.warn(*args, **kwargs) + if level in [logging.DEBUG, logging.INFO, logging.WARNING, + logging.ERROR, logging.CRITICAL]: + logger.log(level, *args, **kwargs) else: - logger.error('received unknown logging level! exiting...') + logger.error('received unknown logging level! exiting...', + level=level, args_kwargs=(args, kwargs)) break logger.debug('stopping thread') @@ -207,7 +223,7 @@ def logger_thread_worker(): def stop(self): """Terminate this LoggerThread.""" if not self.running: - self.logger.warn('thread already stopped') + self.logger.warning('thread already stopped') return self.logger.debug('sending stop signal') self.msg_queue.put('STOP') @@ -216,12 +232,16 @@ def stop(self): self.logger.info('thread stopped') - @contextlib.contextmanager def pool_and_threadedlogger(logger, *poolargs): """ + Provide (to a context) a multiprocessing Pool and a proxy to the supplied logger. + Emulates the multiprocessing.Pool() context manager, - but also provides a logger that can be used by pool workers. + but also provides (via a LoggerThread) a SubLogger proxy to logger + that can be safely used by pool workers. + Also "cleans up" the pool by waiting for workers to complete + as it exits the context. """ with multiprocessing.Manager() as manager: logger_thread = LoggerThread(logger, manager.Queue()) From e8d0bb13bef8bdb814cbdec002d1f619fa523c8c Mon Sep 17 00:00:00 2001 From: george haff Date: Mon, 14 Aug 2023 10:18:13 -0400 Subject: [PATCH 4/4] more changes from other PR --- src/common/logger.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/common/logger.py b/src/common/logger.py index 889b95502..d04ff7673 100644 --- a/src/common/logger.py +++ b/src/common/logger.py @@ -181,8 +181,6 @@ def critical(self, *args, **kwargs): self._log(logging.CRITICAL, *args, **kwargs) - - def get_sublogger(self): """Retrieve SubLogger for this LoggerThread.""" return self.sublogger @@ -240,6 +238,8 @@ def pool_and_threadedlogger(logger, *poolargs): Emulates the multiprocessing.Pool() context manager, but also provides (via a LoggerThread) a SubLogger proxy to logger that can be safely used by pool workers. + The SubLogger proxy interface supports these methods: debug, info, warning, error, + and critical. Also "cleans up" the pool by waiting for workers to complete as it exits the context. """