From 06afcfe1c1a65e69e4ab2933cd8f9af65958a50f Mon Sep 17 00:00:00 2001 From: george haff Date: Tue, 8 Aug 2023 12:54:04 -0400 Subject: [PATCH 1/9] logger.py updates --- _delphi_utils_python/delphi_utils/logger.py | 140 +++++++++++++++++--- 1 file changed, 125 insertions(+), 15 deletions(-) diff --git a/_delphi_utils_python/delphi_utils/logger.py b/_delphi_utils_python/delphi_utils/logger.py index 3d317ae86..c97f17123 100644 --- a/_delphi_utils_python/delphi_utils/logger.py +++ b/_delphi_utils_python/delphi_utils/logger.py @@ -1,21 +1,38 @@ -"""Structured logger utility for creating JSON logs in Delphi pipelines.""" +"""Structured logger utility for creating JSON logs.""" + +# the Delphi group uses two ~identical versions of this file. +# try to keep them in sync with edits, for sanity. +# https://github.com/cmu-delphi/covidcast-indicators/blob/main/_delphi_utils_python/delphi_utils/logger.py +# https://github.com/cmu-delphi/delphi-epidata/blob/dev/src/common/logger.py + import logging +import os +import structlog import sys import threading -import structlog +import traceback def handle_exceptions(logger): """Handle exceptions using the provided logger.""" - def exception_handler(etype, value, traceback): - logger.exception("Top-level exception occurred", - exc_info=(etype, value, traceback)) - def multithread_exception_handler(args): - exception_handler(args.exc_type, args.exc_value, args.exc_traceback) + def exception_handler(scope, etype, value, traceback): + logger.exception("Top-level exception occurred", scope=scope, exc_info=(etype, value, traceback)) - sys.excepthook = exception_handler - threading.excepthook = multithread_exception_handler + def sys_exception_handler(etype, value, traceback): + exception_handler("sys", etype, value, traceback) + + def threading_exception_handler(args): + if args.exc_type == SystemExit and args.exc_value.code == 0: + # `sys.exit(0)` is considered "successful termination": + # https://docs.python.org/3/library/sys.html#sys.exit + logger.debug(f"normal thread exit", thread=args.thread, + stack="".join(traceback.format_exception(args.exc_type, args.exc_value, args.exc_traceback))) + else: + exception_handler(f"thread: {args.thread}", args.exc_type, args.exc_value, args.exc_traceback) + + sys.excepthook = sys_exception_handler + threading.excepthook = threading_exception_handler def get_structured_logger(name=__name__, @@ -40,12 +57,23 @@ def get_structured_logger(name=__name__, is a good choice. filename: An (optional) file to write log output. """ - # Configure the basic underlying logging configuration + # Set the underlying logging configuration + if "LOG_DEBUG" in os.environ: + log_level = logging.DEBUG + else: + log_level = logging.INFO + logging.basicConfig( format="%(message)s", - level=logging.INFO, - handlers=[logging.StreamHandler()] - ) + level=log_level, + handlers=[logging.StreamHandler()]) + + def add_pid(_logger, _method_name, event_dict): + """ + Add current PID to the event dict. + """ + event_dict["pid"] = os.getpid() + return event_dict # Configure structlog. This uses many of the standard suggestions from # the structlog documentation. @@ -57,6 +85,8 @@ def get_structured_logger(name=__name__, structlog.stdlib.add_logger_name, # Include log level in output. structlog.stdlib.add_log_level, + # Include PID in output. + add_pid, # Allow formatting into arguments e.g., logger.info("Hello, %s", # name) structlog.stdlib.PositionalArgumentsFormatter(), @@ -68,7 +98,7 @@ def get_structured_logger(name=__name__, # Decode unicode characters structlog.processors.UnicodeDecoder(), # Render as JSON - structlog.processors.JSONRenderer() + structlog.processors.JSONRenderer(), ], # Use a dict class for keeping track of data. context_class=dict, @@ -84,10 +114,90 @@ def get_structured_logger(name=__name__, system_logger = logging.getLogger(name) if filename and not system_logger.handlers: system_logger.addHandler(logging.FileHandler(filename)) - system_logger.setLevel(logging.INFO) + system_logger.setLevel(log_level) logger = structlog.wrap_logger(system_logger) if log_exceptions: handle_exceptions(logger) return logger + + + + +# the above loggers are thread-safe but not multiprocessing-safe. a `LoggerThread` will spawn a thread that listens to a mp.Queue and logs messages from it with the provided logger, so other processes can send logging messages to it via the logger-like `SubLogger` interface. the SubLogger even logs the pid of the caller. this is good to use with a set of jobs that are part of a mp.Pool, but isnt recommended for general use because of overhead from threading and multiprocessing, and because it might introduce lag to log messages. + +import multiprocessing +import contextlib + +class LoggerThread(): + # TODO: add checks to prevent use of a stopped thread? + # TODO: reduce level of a bunch of debugging logs (search "self.logger.info") + + class SubLogger(): + def __init__(self, queue): + self.queue = queue + def _log(self, level, *args, **kwargs): + kwargs_plus = {'sub_pid': multiprocessing.current_process().pid} + kwargs_plus.update(kwargs) + self.queue.put([level, args, kwargs_plus]) + # TODO: add log levels beyond `info` + def info(self, *args, **kwargs): + self._log(logging.INFO, *args, **kwargs) + + + def get_sublogger(self): + return self.sublogger + + def __init__(self, logger, q=None): + self.logger = logger + if q: + self.msg_queue = q + else: + self.msg_queue = multiprocessing.Queue() + + def logger_thread_worker(): + self.logger.info('thread started') + while True: + msg = self.msg_queue.get() + if msg == 'STOP': + self.logger.info('received stop signal') + break + level, args, kwargs = msg + # TODO: add log levels beyond `info` + if level == logging.INFO: + self.logger.info(*args, **kwargs) + else: + self.logger.error('received unknown logging level! exiting...') + break + self.logger.info('stopping thread') + + self.thread = threading.Thread(target=logger_thread_worker, name="LoggerThread__"+self.logger.name) + self.logger.info('starting thread') + self.thread.start() + + self.sublogger = LoggerThread.SubLogger(self.msg_queue) + + def stop(self): + # TODO: make this safely re-callable? + self.logger.info('sending stop signal') + self.msg_queue.put('STOP') + self.thread.join() + self.logger.info('thread stopped') + + + +@contextlib.contextmanager +def pool_and_threadedlogger(logger, *poolargs): + """ + emulates the multiprocessing.Pool() context manager, but also provides a logger that can be used by pool workers. + """ + with multiprocessing.Manager() as manager: + logger_thread = LoggerThread(logger, manager.Queue()) + try: + with multiprocessing.Pool(*poolargs) as pool: + yield pool, logger_thread.get_sublogger() + pool.close() + pool.join() + finally: + logger_thread.stop() From 48e7734d477aae24cc16d30e7df5ad06c0b3f4c4 Mon Sep 17 00:00:00 2001 From: george haff Date: Tue, 8 Aug 2023 14:58:01 -0400 Subject: [PATCH 2/9] attempts to appease linter --- _delphi_utils_python/delphi_utils/logger.py | 155 +++++++++++--------- 1 file changed, 87 insertions(+), 68 deletions(-) diff --git a/_delphi_utils_python/delphi_utils/logger.py b/_delphi_utils_python/delphi_utils/logger.py index c97f17123..1504c6fe5 100644 --- a/_delphi_utils_python/delphi_utils/logger.py +++ b/_delphi_utils_python/delphi_utils/logger.py @@ -10,14 +10,15 @@ import structlog import sys import threading -import traceback +from traceback import format_exception def handle_exceptions(logger): """Handle exceptions using the provided logger.""" def exception_handler(scope, etype, value, traceback): - logger.exception("Top-level exception occurred", scope=scope, exc_info=(etype, value, traceback)) + logger.exception("Top-level exception occurred", + scope=scope, exc_info=(etype, value, traceback)) def sys_exception_handler(etype, value, traceback): exception_handler("sys", etype, value, traceback) @@ -26,10 +27,13 @@ def threading_exception_handler(args): if args.exc_type == SystemExit and args.exc_value.code == 0: # `sys.exit(0)` is considered "successful termination": # https://docs.python.org/3/library/sys.html#sys.exit - logger.debug(f"normal thread exit", thread=args.thread, - stack="".join(traceback.format_exception(args.exc_type, args.exc_value, args.exc_traceback))) + logger.debug("normal thread exit", thread=args.thread, + stack="".join( + format_exception( + args.exc_type, args.exc_value, args.exc_traceback))) else: - exception_handler(f"thread: {args.thread}", args.exc_type, args.exc_value, args.exc_traceback) + exception_handler(f"thread: {args.thread}", + args.exc_type, args.exc_value, args.exc_traceback) sys.excepthook = sys_exception_handler threading.excepthook = threading_exception_handler @@ -125,79 +129,94 @@ def add_pid(_logger, _method_name, event_dict): -# the above loggers are thread-safe but not multiprocessing-safe. a `LoggerThread` will spawn a thread that listens to a mp.Queue and logs messages from it with the provided logger, so other processes can send logging messages to it via the logger-like `SubLogger` interface. the SubLogger even logs the pid of the caller. this is good to use with a set of jobs that are part of a mp.Pool, but isnt recommended for general use because of overhead from threading and multiprocessing, and because it might introduce lag to log messages. -import multiprocessing import contextlib +import multiprocessing class LoggerThread(): - # TODO: add checks to prevent use of a stopped thread? - # TODO: reduce level of a bunch of debugging logs (search "self.logger.info") - - class SubLogger(): - def __init__(self, queue): - self.queue = queue - def _log(self, level, *args, **kwargs): - kwargs_plus = {'sub_pid': multiprocessing.current_process().pid} - kwargs_plus.update(kwargs) - self.queue.put([level, args, kwargs_plus]) - # TODO: add log levels beyond `info` - def info(self, *args, **kwargs): - self._log(logging.INFO, *args, **kwargs) - - - def get_sublogger(self): - return self.sublogger - - def __init__(self, logger, q=None): - self.logger = logger - if q: - self.msg_queue = q - else: - self.msg_queue = multiprocessing.Queue() - - def logger_thread_worker(): - self.logger.info('thread started') - while True: - msg = self.msg_queue.get() - if msg == 'STOP': - self.logger.info('received stop signal') - break - level, args, kwargs = msg + """ + the bare structlog loggers are thread-safe but not multiprocessing-safe. + a `LoggerThread` will spawn a thread that listens to a mp.Queue + and logs messages from it with the provided logger, + so other processes can send logging messages to it + via the logger-like `SubLogger` interface. + the SubLogger even logs the pid of the caller. + + this is good to use with a set of jobs that are part of a mp.Pool, + but isnt recommended for general use + because of overhead from threading and multiprocessing, + and because it might introduce lag to log messages. + """ + + # TODO: add checks to prevent use of a stopped thread? + # TODO: reduce level of a bunch of debugging logs (search "self.logger.info") + + class SubLogger(): + def __init__(self, queue): + self.queue = queue + def _log(self, level, *args, **kwargs): + kwargs_plus = {'sub_pid': multiprocessing.current_process().pid} + kwargs_plus.update(kwargs) + self.queue.put([level, args, kwargs_plus]) # TODO: add log levels beyond `info` - if level == logging.INFO: - self.logger.info(*args, **kwargs) - else: - self.logger.error('received unknown logging level! exiting...') - break - self.logger.info('stopping thread') + def info(self, *args, **kwargs): + self._log(logging.INFO, *args, **kwargs) - self.thread = threading.Thread(target=logger_thread_worker, name="LoggerThread__"+self.logger.name) - self.logger.info('starting thread') - self.thread.start() - self.sublogger = LoggerThread.SubLogger(self.msg_queue) + def get_sublogger(self): + return self.sublogger - def stop(self): - # TODO: make this safely re-callable? - self.logger.info('sending stop signal') - self.msg_queue.put('STOP') - self.thread.join() - self.logger.info('thread stopped') + def __init__(self, logger, q=None): + self.logger = logger + if q: + self.msg_queue = q + else: + self.msg_queue = multiprocessing.Queue() + + def logger_thread_worker(): + self.logger.info('thread started') + while True: + msg = self.msg_queue.get() + if msg == 'STOP': + self.logger.info('received stop signal') + break + level, args, kwargs = msg + # TODO: add log levels beyond `info` + if level == logging.INFO: + self.logger.info(*args, **kwargs) + else: + self.logger.error('received unknown logging level! exiting...') + break + self.logger.info('stopping thread') + + self.thread = threading.Thread(target=logger_thread_worker, + name="LoggerThread__"+self.logger.name) + self.logger.info('starting thread') + self.thread.start() + + self.sublogger = LoggerThread.SubLogger(self.msg_queue) + + def stop(self): + # TODO: make this safely re-callable? + self.logger.info('sending stop signal') + self.msg_queue.put('STOP') + self.thread.join() + self.logger.info('thread stopped') @contextlib.contextmanager def pool_and_threadedlogger(logger, *poolargs): - """ - emulates the multiprocessing.Pool() context manager, but also provides a logger that can be used by pool workers. - """ - with multiprocessing.Manager() as manager: - logger_thread = LoggerThread(logger, manager.Queue()) - try: - with multiprocessing.Pool(*poolargs) as pool: - yield pool, logger_thread.get_sublogger() - pool.close() - pool.join() - finally: - logger_thread.stop() + """ + emulates the multiprocessing.Pool() context manager, + but also provides a logger that can be used by pool workers. + """ + with multiprocessing.Manager() as manager: + logger_thread = LoggerThread(logger, manager.Queue()) + try: + with multiprocessing.Pool(*poolargs) as pool: + yield pool, logger_thread.get_sublogger() + pool.close() + pool.join() + finally: + logger_thread.stop() From 5922b6294487390f1d2bc06828d1acb425a63762 Mon Sep 17 00:00:00 2001 From: george haff Date: Tue, 8 Aug 2023 16:08:25 -0400 Subject: [PATCH 3/9] more pylint concessions... --- _delphi_utils_python/delphi_utils/logger.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/_delphi_utils_python/delphi_utils/logger.py b/_delphi_utils_python/delphi_utils/logger.py index 1504c6fe5..caf91f3c5 100644 --- a/_delphi_utils_python/delphi_utils/logger.py +++ b/_delphi_utils_python/delphi_utils/logger.py @@ -5,7 +5,9 @@ # https://github.com/cmu-delphi/covidcast-indicators/blob/main/_delphi_utils_python/delphi_utils/logger.py # https://github.com/cmu-delphi/delphi-epidata/blob/dev/src/common/logger.py +import contextlib import logging +import multiprocessing import os import structlog import sys @@ -129,10 +131,6 @@ def add_pid(_logger, _method_name, event_dict): - -import contextlib -import multiprocessing - class LoggerThread(): """ the bare structlog loggers are thread-safe but not multiprocessing-safe. @@ -152,6 +150,10 @@ class LoggerThread(): # TODO: reduce level of a bunch of debugging logs (search "self.logger.info") class SubLogger(): + """ + multiprocessing-safe logger-like interface + to convey log messages to a listening LoggerThread + """ def __init__(self, queue): self.queue = queue def _log(self, level, *args, **kwargs): @@ -160,10 +162,12 @@ def _log(self, level, *args, **kwargs): self.queue.put([level, args, kwargs_plus]) # TODO: add log levels beyond `info` def info(self, *args, **kwargs): + """log an INFO level message""" self._log(logging.INFO, *args, **kwargs) def get_sublogger(self): + """accessor method to retrieve a SubLogger for this LoggerThread""" return self.sublogger def __init__(self, logger, q=None): @@ -197,6 +201,7 @@ def logger_thread_worker(): self.sublogger = LoggerThread.SubLogger(self.msg_queue) def stop(self): + """terminate this LoggerThread""" # TODO: make this safely re-callable? self.logger.info('sending stop signal') self.msg_queue.put('STOP') From aab01ff597ae7ed3588e4da93ed96593721ae326 Mon Sep 17 00:00:00 2001 From: george haff Date: Tue, 8 Aug 2023 17:20:04 -0400 Subject: [PATCH 4/9] half-assedly did TODOs, plus some other cleanup --- _delphi_utils_python/delphi_utils/logger.py | 37 ++++++++++++--------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/_delphi_utils_python/delphi_utils/logger.py b/_delphi_utils_python/delphi_utils/logger.py index caf91f3c5..fdbd3ddd7 100644 --- a/_delphi_utils_python/delphi_utils/logger.py +++ b/_delphi_utils_python/delphi_utils/logger.py @@ -2,18 +2,19 @@ # the Delphi group uses two ~identical versions of this file. # try to keep them in sync with edits, for sanity. -# https://github.com/cmu-delphi/covidcast-indicators/blob/main/_delphi_utils_python/delphi_utils/logger.py +# https://github.com/cmu-delphi/covidcast-indicators/blob/main/_delphi_utils_python/delphi_utils/logger.py # pylint: disable=line-too-long # https://github.com/cmu-delphi/delphi-epidata/blob/dev/src/common/logger.py import contextlib import logging import multiprocessing import os -import structlog import sys import threading from traceback import format_exception +import structlog + def handle_exceptions(logger): """Handle exceptions using the provided logger.""" @@ -146,9 +147,6 @@ class LoggerThread(): and because it might introduce lag to log messages. """ - # TODO: add checks to prevent use of a stopped thread? - # TODO: reduce level of a bunch of debugging logs (search "self.logger.info") - class SubLogger(): """ multiprocessing-safe logger-like interface @@ -160,10 +158,12 @@ def _log(self, level, *args, **kwargs): kwargs_plus = {'sub_pid': multiprocessing.current_process().pid} kwargs_plus.update(kwargs) self.queue.put([level, args, kwargs_plus]) - # TODO: add log levels beyond `info` def info(self, *args, **kwargs): """log an INFO level message""" self._log(logging.INFO, *args, **kwargs) + def warn(self, *args, **kwargs): + """log an WARN level message""" + self._log(logging.WARN, *args, **kwargs) def get_sublogger(self): @@ -178,34 +178,39 @@ def __init__(self, logger, q=None): self.msg_queue = multiprocessing.Queue() def logger_thread_worker(): - self.logger.info('thread started') + logger.info('thread started') while True: msg = self.msg_queue.get() if msg == 'STOP': - self.logger.info('received stop signal') + logger.debug('received stop signal') break level, args, kwargs = msg - # TODO: add log levels beyond `info` if level == logging.INFO: - self.logger.info(*args, **kwargs) + logger.info(*args, **kwargs) + if level == logging.WARN: + logger.warn(*args, **kwargs) else: - self.logger.error('received unknown logging level! exiting...') + logger.error('received unknown logging level! exiting...') break - self.logger.info('stopping thread') + logger.debug('stopping thread') self.thread = threading.Thread(target=logger_thread_worker, - name="LoggerThread__"+self.logger.name) - self.logger.info('starting thread') + name="LoggerThread__"+logger.name) + logger.debug('starting thread') self.thread.start() self.sublogger = LoggerThread.SubLogger(self.msg_queue) + self.running = True def stop(self): """terminate this LoggerThread""" - # TODO: make this safely re-callable? - self.logger.info('sending stop signal') + if not self.running: + self.logger.warn('thread already stopped') + return + self.logger.debug('sending stop signal') self.msg_queue.put('STOP') self.thread.join() + self.running = False self.logger.info('thread stopped') From 432c47a6d1342e67bd38d11fc8f8301f4c12db15 Mon Sep 17 00:00:00 2001 From: george haff Date: Tue, 8 Aug 2023 18:14:34 -0400 Subject: [PATCH 5/9] pydocstyle is on my bad list --- _delphi_utils_python/delphi_utils/logger.py | 22 +++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/_delphi_utils_python/delphi_utils/logger.py b/_delphi_utils_python/delphi_utils/logger.py index fdbd3ddd7..349f2c883 100644 --- a/_delphi_utils_python/delphi_utils/logger.py +++ b/_delphi_utils_python/delphi_utils/logger.py @@ -76,9 +76,7 @@ def get_structured_logger(name=__name__, handlers=[logging.StreamHandler()]) def add_pid(_logger, _method_name, event_dict): - """ - Add current PID to the event dict. - """ + """Add current PID to the event dict.""" event_dict["pid"] = os.getpid() return event_dict @@ -149,25 +147,29 @@ class LoggerThread(): class SubLogger(): """ - multiprocessing-safe logger-like interface - to convey log messages to a listening LoggerThread + Multiprocessing-safe logger-like interface + to convey log messages to a listening LoggerThread. """ + def __init__(self, queue): self.queue = queue + def _log(self, level, *args, **kwargs): kwargs_plus = {'sub_pid': multiprocessing.current_process().pid} kwargs_plus.update(kwargs) self.queue.put([level, args, kwargs_plus]) + def info(self, *args, **kwargs): - """log an INFO level message""" + """Log an INFO level message.""" self._log(logging.INFO, *args, **kwargs) + def warn(self, *args, **kwargs): - """log an WARN level message""" + """Log a WARN level message.""" self._log(logging.WARN, *args, **kwargs) def get_sublogger(self): - """accessor method to retrieve a SubLogger for this LoggerThread""" + """Accessor method to retrieve a SubLogger for this LoggerThread.""" return self.sublogger def __init__(self, logger, q=None): @@ -203,7 +205,7 @@ def logger_thread_worker(): self.running = True def stop(self): - """terminate this LoggerThread""" + """Terminate this LoggerThread.""" if not self.running: self.logger.warn('thread already stopped') return @@ -218,7 +220,7 @@ def stop(self): @contextlib.contextmanager def pool_and_threadedlogger(logger, *poolargs): """ - emulates the multiprocessing.Pool() context manager, + Emulates the multiprocessing.Pool() context manager, but also provides a logger that can be used by pool workers. """ with multiprocessing.Manager() as manager: From 19f863bd89c02236578697145f26a8cbd7c937e5 Mon Sep 17 00:00:00 2001 From: george haff Date: Fri, 11 Aug 2023 12:22:30 -0400 Subject: [PATCH 6/9] docstring 'fixes', additional log level methods --- _delphi_utils_python/delphi_utils/logger.py | 49 ++++++++++++++------- 1 file changed, 34 insertions(+), 15 deletions(-) diff --git a/_delphi_utils_python/delphi_utils/logger.py b/_delphi_utils_python/delphi_utils/logger.py index 349f2c883..aae95cd79 100644 --- a/_delphi_utils_python/delphi_utils/logger.py +++ b/_delphi_utils_python/delphi_utils/logger.py @@ -128,10 +128,10 @@ def add_pid(_logger, _method_name, event_dict): return logger - - class LoggerThread(): """ + A construct to use a logger from multiprocessing workers/jobs. + the bare structlog loggers are thread-safe but not multiprocessing-safe. a `LoggerThread` will spawn a thread that listens to a mp.Queue and logs messages from it with the provided logger, @@ -143,15 +143,18 @@ class LoggerThread(): but isnt recommended for general use because of overhead from threading and multiprocessing, and because it might introduce lag to log messages. + + somewhat inspired by: + docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes """ class SubLogger(): """ - Multiprocessing-safe logger-like interface - to convey log messages to a listening LoggerThread. + MP-safe logger-like interface to convey log messages to a listening LoggerThread. """ def __init__(self, queue): + """Save a handle to the queue""" self.queue = queue def _log(self, level, *args, **kwargs): @@ -159,13 +162,27 @@ def _log(self, level, *args, **kwargs): kwargs_plus.update(kwargs) self.queue.put([level, args, kwargs_plus]) + def debug(self, *args, **kwargs): + """Log a DEBUG level message.""" + self._log(logging.DEBUG, *args, **kwargs) + def info(self, *args, **kwargs): """Log an INFO level message.""" self._log(logging.INFO, *args, **kwargs) - def warn(self, *args, **kwargs): - """Log a WARN level message.""" - self._log(logging.WARN, *args, **kwargs) + def warning(self, *args, **kwargs): + """Log a WARNING level message.""" + self._log(logging.WARNING, *args, **kwargs) + + def error(self, *args, **kwargs): + """Log an ERROR level message.""" + self._log(logging.ERROR, *args, **kwargs) + + def critical(self, *args, **kwargs): + """Log a CRITICAL level message.""" + self._log(logging.CRITICAL, *args, **kwargs) + + def get_sublogger(self): @@ -173,6 +190,7 @@ def get_sublogger(self): return self.sublogger def __init__(self, logger, q=None): + """Save handles to logger and queue, creating queue if not specified.""" self.logger = logger if q: self.msg_queue = q @@ -187,12 +205,12 @@ def logger_thread_worker(): logger.debug('received stop signal') break level, args, kwargs = msg - if level == logging.INFO: - logger.info(*args, **kwargs) - if level == logging.WARN: - logger.warn(*args, **kwargs) + if level in [logging.DEBUG, logging.INFO, logging.WARNING, + logging.ERROR, logging.CRITICAL]: + logger.log(level, *args, **kwargs) else: - logger.error('received unknown logging level! exiting...') + logger.error('received unknown logging level! exiting...', + level=level, args_kwargs=(args, kwargs)) break logger.debug('stopping thread') @@ -207,7 +225,7 @@ def logger_thread_worker(): def stop(self): """Terminate this LoggerThread.""" if not self.running: - self.logger.warn('thread already stopped') + self.logger.warning('thread already stopped') return self.logger.debug('sending stop signal') self.msg_queue.put('STOP') @@ -216,12 +234,13 @@ def stop(self): self.logger.info('thread stopped') - @contextlib.contextmanager def pool_and_threadedlogger(logger, *poolargs): """ + Makes a proxy to logger available to a mp.Pool, which it also cleans up. + Emulates the multiprocessing.Pool() context manager, - but also provides a logger that can be used by pool workers. + but also provides a wrapper to logger that can be used by pool workers. """ with multiprocessing.Manager() as manager: logger_thread = LoggerThread(logger, manager.Queue()) From f97058f60b75b2a2b77b11fe88d4256f12aa04d4 Mon Sep 17 00:00:00 2001 From: george haff Date: Fri, 11 Aug 2023 14:22:54 -0400 Subject: [PATCH 7/9] more pydoc cleanup --- _delphi_utils_python/delphi_utils/logger.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/_delphi_utils_python/delphi_utils/logger.py b/_delphi_utils_python/delphi_utils/logger.py index aae95cd79..889b95502 100644 --- a/_delphi_utils_python/delphi_utils/logger.py +++ b/_delphi_utils_python/delphi_utils/logger.py @@ -149,12 +149,10 @@ class LoggerThread(): """ class SubLogger(): - """ - MP-safe logger-like interface to convey log messages to a listening LoggerThread. - """ + """MP-safe logger-like interface to convey log messages to a listening LoggerThread.""" def __init__(self, queue): - """Save a handle to the queue""" + """Create SubLogger with a bound queue.""" self.queue = queue def _log(self, level, *args, **kwargs): @@ -186,11 +184,11 @@ def critical(self, *args, **kwargs): def get_sublogger(self): - """Accessor method to retrieve a SubLogger for this LoggerThread.""" + """Retrieve SubLogger for this LoggerThread.""" return self.sublogger def __init__(self, logger, q=None): - """Save handles to logger and queue, creating queue if not specified.""" + """Create and start LoggerThread with supplied logger, creating a queue if not provided.""" self.logger = logger if q: self.msg_queue = q @@ -237,10 +235,13 @@ def stop(self): @contextlib.contextmanager def pool_and_threadedlogger(logger, *poolargs): """ - Makes a proxy to logger available to a mp.Pool, which it also cleans up. + Provide (to a context) a multiprocessing Pool and a proxy to the supplied logger. Emulates the multiprocessing.Pool() context manager, - but also provides a wrapper to logger that can be used by pool workers. + but also provides (via a LoggerThread) a SubLogger proxy to logger + that can be safely used by pool workers. + Also "cleans up" the pool by waiting for workers to complete + as it exits the context. """ with multiprocessing.Manager() as manager: logger_thread = LoggerThread(logger, manager.Queue()) From 6be101fea12d4ec2a0dfed229b0625c895edcc24 Mon Sep 17 00:00:00 2001 From: george haff Date: Fri, 11 Aug 2023 15:20:20 -0400 Subject: [PATCH 8/9] removed errant newlines --- _delphi_utils_python/delphi_utils/logger.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/_delphi_utils_python/delphi_utils/logger.py b/_delphi_utils_python/delphi_utils/logger.py index 889b95502..f90382303 100644 --- a/_delphi_utils_python/delphi_utils/logger.py +++ b/_delphi_utils_python/delphi_utils/logger.py @@ -181,8 +181,6 @@ def critical(self, *args, **kwargs): self._log(logging.CRITICAL, *args, **kwargs) - - def get_sublogger(self): """Retrieve SubLogger for this LoggerThread.""" return self.sublogger From 01171357a0b51c3de0b6208535b3b78cac2c94dd Mon Sep 17 00:00:00 2001 From: melange396 Date: Mon, 14 Aug 2023 09:31:10 -0400 Subject: [PATCH 9/9] Update _delphi_utils_python/delphi_utils/logger.py Co-authored-by: Dmitry Shemetov --- _delphi_utils_python/delphi_utils/logger.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/_delphi_utils_python/delphi_utils/logger.py b/_delphi_utils_python/delphi_utils/logger.py index f90382303..d04ff7673 100644 --- a/_delphi_utils_python/delphi_utils/logger.py +++ b/_delphi_utils_python/delphi_utils/logger.py @@ -238,6 +238,8 @@ def pool_and_threadedlogger(logger, *poolargs): Emulates the multiprocessing.Pool() context manager, but also provides (via a LoggerThread) a SubLogger proxy to logger that can be safely used by pool workers. + The SubLogger proxy interface supports these methods: debug, info, warning, error, + and critical. Also "cleans up" the pool by waiting for workers to complete as it exits the context. """