From 76e05207f3c6f1139f672b87ed1391385f4b86d7 Mon Sep 17 00:00:00 2001 From: Ian McKenzie Date: Wed, 9 Jul 2025 15:02:22 -0700 Subject: [PATCH 1/5] stdout logging to stdout.log --- .../stdout_logging/test_stdout_integration.py | 23 +++++ mlflow/tracking/fluent.py | 27 ++++++ mlflow/utils/stdout_logging.py | 97 +++++++++++++++++++ 3 files changed, 147 insertions(+) create mode 100644 examples/stdout_logging/test_stdout_integration.py create mode 100644 mlflow/utils/stdout_logging.py diff --git a/examples/stdout_logging/test_stdout_integration.py b/examples/stdout_logging/test_stdout_integration.py new file mode 100644 index 0000000000000..2c4badbfe21d3 --- /dev/null +++ b/examples/stdout_logging/test_stdout_integration.py @@ -0,0 +1,23 @@ +import time + +import mlflow + +mlflow.set_tracking_uri("http://127.0.0.1:5002") + +if __name__ == "__main__": + mlflow.set_experiment("stdout_test") + + print("Testing stdout logging integration...") + + with mlflow.start_run(log_stdout=True, log_stdout_interval=3) as run: + print(f"MLflow Run ID: {run.info.run_id}") + print("This should appear in both terminal and MLflow!") + + N_LOGS = 30 + for i in range(N_LOGS): + print(f"Message {i + 1}/{N_LOGS}") + time.sleep(1) + + print("Test completed!") + + print("This message should only appear in terminal (run has ended)") diff --git a/mlflow/tracking/fluent.py b/mlflow/tracking/fluent.py index f2f1f24418031..b9bf5eab2f5a8 100644 --- a/mlflow/tracking/fluent.py +++ b/mlflow/tracking/fluent.py @@ -105,6 +105,7 @@ run_id_to_system_metrics_monitor = {} +run_id_to_stdout_logger = {} _active_run_stack = ThreadLocalVariable(default_factory=lambda: []) @@ -267,6 +268,8 @@ def start_run( tags: Optional[dict[str, Any]] = None, description: Optional[str] = None, log_system_metrics: Optional[bool] = None, + log_stdout: Optional[bool] = None, + log_stdout_interval: int = 5, ) -> ActiveRun: """ Start a new MLflow run, setting it as the active run under which metrics and parameters @@ -309,6 +312,11 @@ def start_run( to MLflow, e.g., cpu/gpu utilization. If None, we will check environment variable `MLFLOW_ENABLE_SYSTEM_METRICS_LOGGING` to determine whether to log system metrics. System metrics logging is an experimental feature in MLflow 2.8 and subject to change. + log_stdout: bool, defaults to None. If True, stdout will be captured and periodically + logged to MLflow as an artifact named 'stdout.log'. If False, stdout logging is + disabled. If None, stdout logging is disabled by default. + log_stdout_interval: int, defaults to 5. The interval in seconds at which to log + the captured stdout to MLflow. Only used when log_stdout is True. Returns: :py:class:`mlflow.ActiveRun` object that acts as a context manager wrapping the @@ -502,6 +510,19 @@ def start_run( _logger.error(f"Failed to start system metrics monitoring: {e}.") active_run_stack.append(ActiveRun(active_run_obj)) + + if log_stdout: + try: + from mlflow.utils.stdout_logging import log_stdout_stream + + # Create a context manager that will be entered when the ActiveRun is used + stdout_logger = log_stdout_stream(interval_seconds=log_stdout_interval) + run_id_to_stdout_logger[active_run_obj.info.run_id] = stdout_logger + # Start the stdout logging + stdout_logger.__enter__() + except Exception as e: + _logger.error(f"Failed to start stdout logging: {e}.") + return active_run_stack[-1] @@ -548,6 +569,12 @@ def end_run(status: str = RunStatus.to_string(RunStatus.FINISHED)) -> None: if last_active_run_id in run_id_to_system_metrics_monitor: system_metrics_monitor = run_id_to_system_metrics_monitor.pop(last_active_run_id) system_metrics_monitor.finish() + if last_active_run_id in run_id_to_stdout_logger: + stdout_logger = run_id_to_stdout_logger.pop(last_active_run_id) + try: + stdout_logger.__exit__(None, None, None) + except Exception as e: + _logger.error(f"Failed to stop stdout logging: {e}.") def _safe_end_run(): diff --git a/mlflow/utils/stdout_logging.py b/mlflow/utils/stdout_logging.py new file mode 100644 index 0000000000000..b9d82be81dd00 --- /dev/null +++ b/mlflow/utils/stdout_logging.py @@ -0,0 +1,97 @@ +import sys +import threading +import time +from contextlib import contextmanager +from io import StringIO + +import mlflow + + +class TeeStringIO: + """A file-like object that writes to both original stdout and a StringIO buffer.""" + + def __init__(self, original_stdout, string_buffer): + self.original_stdout = original_stdout + self.string_buffer = string_buffer + + def write(self, data): + # Write to both original stdout and our buffer + self.original_stdout.write(data) + self.string_buffer.write(data) + return len(data) + + def flush(self): + self.original_stdout.flush() + self.string_buffer.flush() + + def __getattr__(self, name): + # Delegate other attributes to original stdout + return getattr(self.original_stdout, name) + + +@contextmanager +def log_stdout_stream(interval_seconds=5): + """ + A context manager to stream stdout to an MLflow artifact. + + This context manager redirects `sys.stdout` to an in-memory buffer. + A background thread periodically flushes this buffer and logs its + contents to an MLflow artifact file named 'stdout.log'. + + Args: + interval_seconds (int): The interval in seconds at which to log + the stdout buffer to MLflow. + + Example: + import time + import mlflow + + with mlflow.start_run(): + with log_stdout_stream(): + print("This is the start of my script.") + time.sleep(6) + print("This message will appear in the first log upload.") + time.sleep(6) + print("And this will be in the second.") + # The context manager will automatically handle final log upload + # and cleanup. + print("Stdout is now back to normal.") + """ + if not mlflow.active_run(): + raise RuntimeError("An active MLflow run is required to stream stdout.") + + original_stdout = sys.stdout + stdout_buffer = StringIO() + tee_stdout = TeeStringIO(original_stdout, stdout_buffer) + sys.stdout = tee_stdout + + stop_event = threading.Event() + log_thread = None + + def _log_loop(): + while not stop_event.is_set(): + time.sleep(interval_seconds) + _log_current_stdout() + + def _log_current_stdout(): + content = stdout_buffer.getvalue() + + if content: + mlflow.log_text(content, "stdout.log") + + try: + log_thread = threading.Thread(target=_log_loop, name="mlflow-stdout-logging") + log_thread.daemon = True + log_thread.start() + yield + finally: + if log_thread: + stop_event.set() + log_thread.join() + + # Final flush and log to capture any remaining output + _log_current_stdout() + + # Restore stdout + sys.stdout = original_stdout + stdout_buffer.close() From e3ec000db363da0e15aa2ee4dd889f5da328fd06 Mon Sep 17 00:00:00 2001 From: Ian McKenzie Date: Wed, 9 Jul 2025 15:53:31 -0700 Subject: [PATCH 2/5] fix duplicated runs --- mlflow/tracking/fluent.py | 5 ++++- mlflow/utils/stdout_logging.py | 12 +++++------- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/mlflow/tracking/fluent.py b/mlflow/tracking/fluent.py index b9bf5eab2f5a8..125fe378d0248 100644 --- a/mlflow/tracking/fluent.py +++ b/mlflow/tracking/fluent.py @@ -516,7 +516,10 @@ def start_run( from mlflow.utils.stdout_logging import log_stdout_stream # Create a context manager that will be entered when the ActiveRun is used - stdout_logger = log_stdout_stream(interval_seconds=log_stdout_interval) + stdout_logger = log_stdout_stream( + active_run_obj.info.run_id, + interval_seconds=log_stdout_interval, + ) run_id_to_stdout_logger[active_run_obj.info.run_id] = stdout_logger # Start the stdout logging stdout_logger.__enter__() diff --git a/mlflow/utils/stdout_logging.py b/mlflow/utils/stdout_logging.py index b9d82be81dd00..f06958ef5b16e 100644 --- a/mlflow/utils/stdout_logging.py +++ b/mlflow/utils/stdout_logging.py @@ -30,7 +30,7 @@ def __getattr__(self, name): @contextmanager -def log_stdout_stream(interval_seconds=5): +def log_stdout_stream(run_id, interval_seconds=5): """ A context manager to stream stdout to an MLflow artifact. @@ -39,6 +39,7 @@ def log_stdout_stream(interval_seconds=5): contents to an MLflow artifact file named 'stdout.log'. Args: + run_id (str): The run ID to log stdout to. interval_seconds (int): The interval in seconds at which to log the stdout buffer to MLflow. @@ -46,8 +47,8 @@ def log_stdout_stream(interval_seconds=5): import time import mlflow - with mlflow.start_run(): - with log_stdout_stream(): + with mlflow.start_run() as run: + with log_stdout_stream(run.info.run_id): print("This is the start of my script.") time.sleep(6) print("This message will appear in the first log upload.") @@ -57,9 +58,6 @@ def log_stdout_stream(interval_seconds=5): # and cleanup. print("Stdout is now back to normal.") """ - if not mlflow.active_run(): - raise RuntimeError("An active MLflow run is required to stream stdout.") - original_stdout = sys.stdout stdout_buffer = StringIO() tee_stdout = TeeStringIO(original_stdout, stdout_buffer) @@ -77,7 +75,7 @@ def _log_current_stdout(): content = stdout_buffer.getvalue() if content: - mlflow.log_text(content, "stdout.log") + mlflow.log_text(content, "stdout.log", run_id=run_id) try: log_thread = threading.Thread(target=_log_loop, name="mlflow-stdout-logging") From 47c1386a386a99c6023c50180ba468c9be6335a4 Mon Sep 17 00:00:00 2001 From: Ian McKenzie Date: Wed, 9 Jul 2025 17:33:36 -0700 Subject: [PATCH 3/5] use file descripter rather than python stdout --- .../stdout_logging/test_stdout_integration.py | 91 +++++++++++++++- mlflow/utils/stdout_logging.py | 102 ++++++++++++------ 2 files changed, 158 insertions(+), 35 deletions(-) diff --git a/examples/stdout_logging/test_stdout_integration.py b/examples/stdout_logging/test_stdout_integration.py index 2c4badbfe21d3..d97ecb391e8db 100644 --- a/examples/stdout_logging/test_stdout_integration.py +++ b/examples/stdout_logging/test_stdout_integration.py @@ -1,3 +1,26 @@ +#!/usr/bin/env python3 +""" +MLflow Stdout Logging Integration Test + +This test demonstrates the enhanced stdout logging that captures output from: +1. Python print statements (sys.stdout) +2. C/C++ libraries via subprocess calls +3. System commands via os.system() +4. C library functions via ctypes + +The test requires MLflow server to be running. Start it with: + mlflow server --host 127.0.0.1 --port 5002 + +Expected behavior: +- All output appears in both terminal and MLflow artifact 'stdout.log' +- Both Python and C/C++ stdout are captured +- Final message appears only in terminal (after run ends) +""" + +import ctypes +import os +import subprocess +import sys import time import mlflow @@ -13,9 +36,73 @@ print(f"MLflow Run ID: {run.info.run_id}") print("This should appear in both terminal and MLflow!") - N_LOGS = 30 + # Test Python stdout + print("=== Testing Python stdout ===") + for i in range(3): + print(f"Python message {i + 1}/3") + time.sleep(1) + + # Test C/C++ stdout via subprocess + print("\n=== Testing C/C++ stdout via subprocess ===") + try: + subprocess.run(["echo", "Hello from echo command!"], check=True) + subprocess.run(["printf", "Formatted output from printf: %d\\n", "42"], check=True) + if sys.platform.startswith("linux") or sys.platform.startswith("darwin"): + subprocess.run(["ls", "-la", "/tmp"], check=True, stdout=None) + except subprocess.CalledProcessError as e: + print(f"Subprocess failed: {e}") + except FileNotFoundError: + print("Some commands not available on this system") + + # Test C/C++ stdout via os.system + print("\n=== Testing C/C++ stdout via os.system ===") + os.system("echo 'Hello from os.system!'") + if sys.platform.startswith("win"): + os.system("dir") + else: + os.system("date") + + # Test C library stdout via ctypes + print("\n=== Testing C library stdout via ctypes ===") + try: + # Get libc using the portable approach + if sys.platform.startswith("win"): + libc = ctypes.CDLL("msvcrt") + libc.printf(b"Hello from C printf via ctypes!\n") + try: + libc.fflush(None) # Flush all open streams + except Exception: + pass # fflush may not be available + else: + # Use ctypes.util.find_library to find libc + import ctypes.util + + libc_path = ctypes.util.find_library("c") + if libc_path: + libc = ctypes.CDLL(libc_path) + libc.printf(b"Hello from C printf via ctypes!\n") + libc.puts(b"Hello from C puts via ctypes!") + try: + libc.fflush(None) # Flush all open streams + except Exception: + pass # fflush may not be available + else: + print("Could not find libc library") + except (OSError, ImportError) as e: + print(f"Could not load C library: {e}") + + # Force flush of stdout to ensure all output is captured + sys.stdout.flush() + try: + os.fsync(1) # Force flush of file descriptor 1 + except OSError: + pass # fsync may not be available on all platforms + + # Test more Python stdout + print("\n=== Back to Python stdout ===") + N_LOGS = 5 for i in range(N_LOGS): - print(f"Message {i + 1}/{N_LOGS}") + print(f"Final Python message {i + 1}/{N_LOGS}") time.sleep(1) print("Test completed!") diff --git a/mlflow/utils/stdout_logging.py b/mlflow/utils/stdout_logging.py index f06958ef5b16e..eb051ee016ea1 100644 --- a/mlflow/utils/stdout_logging.py +++ b/mlflow/utils/stdout_logging.py @@ -1,4 +1,5 @@ -import sys +import os +import select import threading import time from contextlib import contextmanager @@ -7,36 +8,15 @@ import mlflow -class TeeStringIO: - """A file-like object that writes to both original stdout and a StringIO buffer.""" - - def __init__(self, original_stdout, string_buffer): - self.original_stdout = original_stdout - self.string_buffer = string_buffer - - def write(self, data): - # Write to both original stdout and our buffer - self.original_stdout.write(data) - self.string_buffer.write(data) - return len(data) - - def flush(self): - self.original_stdout.flush() - self.string_buffer.flush() - - def __getattr__(self, name): - # Delegate other attributes to original stdout - return getattr(self.original_stdout, name) - - @contextmanager def log_stdout_stream(run_id, interval_seconds=5): """ A context manager to stream stdout to an MLflow artifact. - This context manager redirects `sys.stdout` to an in-memory buffer. - A background thread periodically flushes this buffer and logs its - contents to an MLflow artifact file named 'stdout.log'. + This context manager redirects file descriptor 1 (stdout) to capture output + from both Python code and underlying C/C++ libraries. A background thread + periodically flushes this buffer and logs its contents to an MLflow artifact + file named 'stdout.log'. Args: run_id (str): The run ID to log stdout to. @@ -46,25 +26,53 @@ def log_stdout_stream(run_id, interval_seconds=5): Example: import time import mlflow + import subprocess with mlflow.start_run() as run: with log_stdout_stream(run.info.run_id): print("This is the start of my script.") time.sleep(6) print("This message will appear in the first log upload.") + subprocess.run(["echo", "This C/C++ output will also be captured"]) time.sleep(6) print("And this will be in the second.") # The context manager will automatically handle final log upload # and cleanup. print("Stdout is now back to normal.") """ - original_stdout = sys.stdout stdout_buffer = StringIO() - tee_stdout = TeeStringIO(original_stdout, stdout_buffer) - sys.stdout = tee_stdout + + # Save original file descriptor 1 (stdout) + original_stdout_fd = os.dup(1) + + # Create a pipe for capturing file descriptor 1 output + pipe_read, pipe_write = os.pipe() + + # Redirect file descriptor 1 to the write end of the pipe + os.dup2(pipe_write, 1) + os.close(pipe_write) stop_event = threading.Event() log_thread = None + pipe_thread = None + + def _pipe_reader(): + """Read from the pipe and write to both original stdout and buffer.""" + while not stop_event.is_set(): + # Use select to check if there's data available to read + ready, _, _ = select.select([pipe_read], [], [], 0.1) + if ready: + try: + data = os.read(pipe_read, 4096) + if data: + # Decode bytes to string + text = data.decode("utf-8", errors="replace") + # Write to original stdout + os.write(original_stdout_fd, data) + # Write to buffer + stdout_buffer.write(text) + except OSError: + break def _log_loop(): while not stop_event.is_set(): @@ -78,18 +86,46 @@ def _log_current_stdout(): mlflow.log_text(content, "stdout.log", run_id=run_id) try: + # Start the pipe reader thread + pipe_thread = threading.Thread(target=_pipe_reader, name="mlflow-pipe-reader") + pipe_thread.daemon = True + pipe_thread.start() + + # Start the logging thread log_thread = threading.Thread(target=_log_loop, name="mlflow-stdout-logging") log_thread.daemon = True log_thread.start() + yield finally: + # Signal threads to stop + stop_event.set() + + # Flush any remaining buffered output before restoring file descriptor + try: + # Force flush of file descriptor 1 to ensure all C library output is captured + os.fsync(1) + except OSError: + pass # fsync may not be available on all platforms + + # Small delay to ensure all buffered output is processed + time.sleep(0.1) + + # Restore file descriptor 1 + os.dup2(original_stdout_fd, 1) + os.close(original_stdout_fd) + + # Wait for threads to finish + if pipe_thread: + pipe_thread.join(timeout=1.0) if log_thread: - stop_event.set() - log_thread.join() + log_thread.join(timeout=1.0) + + # Close the pipe + os.close(pipe_read) # Final flush and log to capture any remaining output _log_current_stdout() - # Restore stdout - sys.stdout = original_stdout + # Close the buffer stdout_buffer.close() From a3dd8ee486b1a14f9989e17cd45919e85b499096 Mon Sep 17 00:00:00 2001 From: Ian McKenzie Date: Wed, 9 Jul 2025 17:37:35 -0700 Subject: [PATCH 4/5] add stderr --- .../stdout_logging/test_stdout_integration.py | 25 +++- mlflow/utils/stdout_logging.py | 109 +++++++++++++----- 2 files changed, 102 insertions(+), 32 deletions(-) diff --git a/examples/stdout_logging/test_stdout_integration.py b/examples/stdout_logging/test_stdout_integration.py index d97ecb391e8db..19d81ac0412cf 100644 --- a/examples/stdout_logging/test_stdout_integration.py +++ b/examples/stdout_logging/test_stdout_integration.py @@ -1,9 +1,9 @@ #!/usr/bin/env python3 """ -MLflow Stdout Logging Integration Test +MLflow Stdout/Stderr Logging Integration Test This test demonstrates the enhanced stdout logging that captures output from: -1. Python print statements (sys.stdout) +1. Python print statements (sys.stdout and sys.stderr) 2. C/C++ libraries via subprocess calls 3. System commands via os.system() 4. C library functions via ctypes @@ -13,7 +13,7 @@ Expected behavior: - All output appears in both terminal and MLflow artifact 'stdout.log' -- Both Python and C/C++ stdout are captured +- Both Python and C/C++ stdout AND stderr are captured - Final message appears only in terminal (after run ends) """ @@ -42,6 +42,12 @@ print(f"Python message {i + 1}/3") time.sleep(1) + # Test Python stderr + print("\n=== Testing Python stderr ===") + for i in range(2): + print(f"Python stderr message {i + 1}/2", file=sys.stderr) + time.sleep(1) + # Test C/C++ stdout via subprocess print("\n=== Testing C/C++ stdout via subprocess ===") try: @@ -54,6 +60,19 @@ except FileNotFoundError: print("Some commands not available on this system") + # Test C/C++ stderr via subprocess + print("\n=== Testing C/C++ stderr via subprocess ===") + try: + # This should write to stderr + subprocess.run(["sh", "-c", "echo 'Error message to stderr' >&2"], check=True) + if sys.platform.startswith("linux") or sys.platform.startswith("darwin"): + # ls with invalid path writes to stderr + subprocess.run(["ls", "/nonexistent/path"], check=False) + except subprocess.CalledProcessError as e: + print(f"Subprocess failed: {e}") + except FileNotFoundError: + print("Some commands not available on this system") + # Test C/C++ stdout via os.system print("\n=== Testing C/C++ stdout via os.system ===") os.system("echo 'Hello from os.system!'") diff --git a/mlflow/utils/stdout_logging.py b/mlflow/utils/stdout_logging.py index eb051ee016ea1..63f6d88a9a318 100644 --- a/mlflow/utils/stdout_logging.py +++ b/mlflow/utils/stdout_logging.py @@ -9,19 +9,21 @@ @contextmanager -def log_stdout_stream(run_id, interval_seconds=5): +def log_stdout_stream(run_id, interval_seconds=5, capture_stderr=True): """ - A context manager to stream stdout to an MLflow artifact. + A context manager to stream stdout (and optionally stderr) to an MLflow artifact. - This context manager redirects file descriptor 1 (stdout) to capture output - from both Python code and underlying C/C++ libraries. A background thread - periodically flushes this buffer and logs its contents to an MLflow artifact - file named 'stdout.log'. + This context manager redirects file descriptor 1 (stdout) and optionally + file descriptor 2 (stderr) to capture output from both Python code and + underlying C/C++ libraries. A background thread periodically flushes this + buffer and logs its contents to an MLflow artifact file named 'stdout.log'. Args: run_id (str): The run ID to log stdout to. interval_seconds (int): The interval in seconds at which to log the stdout buffer to MLflow. + capture_stderr (bool): Whether to also capture stderr output. + Defaults to True. Example: import time @@ -29,8 +31,9 @@ def log_stdout_stream(run_id, interval_seconds=5): import subprocess with mlflow.start_run() as run: - with log_stdout_stream(run.info.run_id): + with log_stdout_stream(run.info.run_id, capture_stderr=True): print("This is the start of my script.") + print("This error goes to stderr!", file=sys.stderr) time.sleep(6) print("This message will appear in the first log upload.") subprocess.run(["echo", "This C/C++ output will also be captured"]) @@ -42,28 +45,35 @@ def log_stdout_stream(run_id, interval_seconds=5): """ stdout_buffer = StringIO() - # Save original file descriptor 1 (stdout) + # Save original file descriptors original_stdout_fd = os.dup(1) + original_stderr_fd = os.dup(2) if capture_stderr else None - # Create a pipe for capturing file descriptor 1 output - pipe_read, pipe_write = os.pipe() + # Create pipes for capturing file descriptors + stdout_pipe_read, stdout_pipe_write = os.pipe() + stderr_pipe_read, stderr_pipe_write = os.pipe() if capture_stderr else (None, None) - # Redirect file descriptor 1 to the write end of the pipe - os.dup2(pipe_write, 1) - os.close(pipe_write) + # Redirect file descriptors to pipes + os.dup2(stdout_pipe_write, 1) + os.close(stdout_pipe_write) + + if capture_stderr and stderr_pipe_write is not None: + os.dup2(stderr_pipe_write, 2) + os.close(stderr_pipe_write) stop_event = threading.Event() log_thread = None - pipe_thread = None + stdout_pipe_thread = None + stderr_pipe_thread = None - def _pipe_reader(): - """Read from the pipe and write to both original stdout and buffer.""" + def _stdout_pipe_reader(): + """Read from the stdout pipe and write to both original stdout and buffer.""" while not stop_event.is_set(): # Use select to check if there's data available to read - ready, _, _ = select.select([pipe_read], [], [], 0.1) + ready, _, _ = select.select([stdout_pipe_read], [], [], 0.1) if ready: try: - data = os.read(pipe_read, 4096) + data = os.read(stdout_pipe_read, 4096) if data: # Decode bytes to string text = data.decode("utf-8", errors="replace") @@ -74,6 +84,27 @@ def _pipe_reader(): except OSError: break + def _stderr_pipe_reader(): + """Read from the stderr pipe and write to both original stderr and buffer.""" + if not capture_stderr or stderr_pipe_read is None or original_stderr_fd is None: + return + + while not stop_event.is_set(): + # Use select to check if there's data available to read + ready, _, _ = select.select([stderr_pipe_read], [], [], 0.1) + if ready: + try: + data = os.read(stderr_pipe_read, 4096) + if data: + # Decode bytes to string + text = data.decode("utf-8", errors="replace") + # Write to original stderr + os.write(original_stderr_fd, data) + # Write to buffer (combined with stdout) + stdout_buffer.write(text) + except OSError: + break + def _log_loop(): while not stop_event.is_set(): time.sleep(interval_seconds) @@ -86,10 +117,20 @@ def _log_current_stdout(): mlflow.log_text(content, "stdout.log", run_id=run_id) try: - # Start the pipe reader thread - pipe_thread = threading.Thread(target=_pipe_reader, name="mlflow-pipe-reader") - pipe_thread.daemon = True - pipe_thread.start() + # Start the stdout pipe reader thread + stdout_pipe_thread = threading.Thread( + target=_stdout_pipe_reader, name="mlflow-stdout-pipe-reader" + ) + stdout_pipe_thread.daemon = True + stdout_pipe_thread.start() + + # Start the stderr pipe reader thread if capturing stderr + if capture_stderr: + stderr_pipe_thread = threading.Thread( + target=_stderr_pipe_reader, name="mlflow-stderr-pipe-reader" + ) + stderr_pipe_thread.daemon = True + stderr_pipe_thread.start() # Start the logging thread log_thread = threading.Thread(target=_log_loop, name="mlflow-stdout-logging") @@ -101,28 +142,38 @@ def _log_current_stdout(): # Signal threads to stop stop_event.set() - # Flush any remaining buffered output before restoring file descriptor + # Flush any remaining buffered output before restoring file descriptors try: - # Force flush of file descriptor 1 to ensure all C library output is captured + # Force flush of file descriptors to ensure all output is captured os.fsync(1) + if capture_stderr and original_stderr_fd is not None: + os.fsync(2) except OSError: pass # fsync may not be available on all platforms # Small delay to ensure all buffered output is processed time.sleep(0.1) - # Restore file descriptor 1 + # Restore file descriptors os.dup2(original_stdout_fd, 1) os.close(original_stdout_fd) + if capture_stderr and original_stderr_fd is not None: + os.dup2(original_stderr_fd, 2) + os.close(original_stderr_fd) + # Wait for threads to finish - if pipe_thread: - pipe_thread.join(timeout=1.0) + if stdout_pipe_thread: + stdout_pipe_thread.join(timeout=1.0) + if stderr_pipe_thread: + stderr_pipe_thread.join(timeout=1.0) if log_thread: log_thread.join(timeout=1.0) - # Close the pipe - os.close(pipe_read) + # Close the pipes + os.close(stdout_pipe_read) + if capture_stderr and stderr_pipe_read is not None: + os.close(stderr_pipe_read) # Final flush and log to capture any remaining output _log_current_stdout() From 4dc26380197d40e77a38a5bc7708bd070b67e89d Mon Sep 17 00:00:00 2001 From: Ian McKenzie Date: Wed, 9 Jul 2025 17:39:53 -0700 Subject: [PATCH 5/5] refactor to reduce duplication --- mlflow/utils/stdout_logging.py | 173 ++++++++++++++++----------------- 1 file changed, 85 insertions(+), 88 deletions(-) diff --git a/mlflow/utils/stdout_logging.py b/mlflow/utils/stdout_logging.py index 63f6d88a9a318..6f6648997501d 100644 --- a/mlflow/utils/stdout_logging.py +++ b/mlflow/utils/stdout_logging.py @@ -4,10 +4,76 @@ import time from contextlib import contextmanager from io import StringIO +from typing import Optional import mlflow +class _PipeManager: + """Manages a pipe for capturing file descriptor output.""" + + def __init__(self, fd_num: int, original_fd: int, buffer: StringIO, name: str): + self.fd_num = fd_num + self.original_fd = original_fd + self.buffer = buffer + self.name = name + self.pipe_read: Optional[int] = None + self.pipe_write: Optional[int] = None + self.thread: Optional[threading.Thread] = None + + def setup_pipe(self): + """Create and setup the pipe for this file descriptor.""" + self.pipe_read, self.pipe_write = os.pipe() + os.dup2(self.pipe_write, self.fd_num) + os.close(self.pipe_write) + self.pipe_write = None + + def start_reader_thread(self, stop_event: threading.Event): + """Start the pipe reader thread.""" + if self.pipe_read is None: + return + + def _pipe_reader(): + """Read from the pipe and write to both original fd and buffer.""" + pipe_read = self.pipe_read # Capture for type safety + if pipe_read is None: + return + + while not stop_event.is_set(): + # Use select to check if there's data available to read + ready, _, _ = select.select([pipe_read], [], [], 0.1) + if ready: + try: + data = os.read(pipe_read, 4096) + if data: + # Decode bytes to string + text = data.decode("utf-8", errors="replace") + # Write to original file descriptor + os.write(self.original_fd, data) + # Write to buffer + self.buffer.write(text) + except OSError: + break + + self.thread = threading.Thread(target=_pipe_reader, name=f"mlflow-{self.name}-pipe-reader") + self.thread.daemon = True + self.thread.start() + + def cleanup(self): + """Clean up the pipe and restore the original file descriptor.""" + # Restore original file descriptor + os.dup2(self.original_fd, self.fd_num) + os.close(self.original_fd) + + # Wait for thread to finish + if self.thread: + self.thread.join(timeout=1.0) + + # Close the pipe + if self.pipe_read is not None: + os.close(self.pipe_read) + + @contextmanager def log_stdout_stream(run_id, interval_seconds=5, capture_stderr=True): """ @@ -45,65 +111,19 @@ def log_stdout_stream(run_id, interval_seconds=5, capture_stderr=True): """ stdout_buffer = StringIO() - # Save original file descriptors - original_stdout_fd = os.dup(1) - original_stderr_fd = os.dup(2) if capture_stderr else None + # Create pipe managers + stdout_manager = _PipeManager(1, os.dup(1), stdout_buffer, "stdout") + stderr_manager = None + if capture_stderr: + stderr_manager = _PipeManager(2, os.dup(2), stdout_buffer, "stderr") - # Create pipes for capturing file descriptors - stdout_pipe_read, stdout_pipe_write = os.pipe() - stderr_pipe_read, stderr_pipe_write = os.pipe() if capture_stderr else (None, None) - - # Redirect file descriptors to pipes - os.dup2(stdout_pipe_write, 1) - os.close(stdout_pipe_write) - - if capture_stderr and stderr_pipe_write is not None: - os.dup2(stderr_pipe_write, 2) - os.close(stderr_pipe_write) + # Setup pipes + stdout_manager.setup_pipe() + if stderr_manager: + stderr_manager.setup_pipe() stop_event = threading.Event() log_thread = None - stdout_pipe_thread = None - stderr_pipe_thread = None - - def _stdout_pipe_reader(): - """Read from the stdout pipe and write to both original stdout and buffer.""" - while not stop_event.is_set(): - # Use select to check if there's data available to read - ready, _, _ = select.select([stdout_pipe_read], [], [], 0.1) - if ready: - try: - data = os.read(stdout_pipe_read, 4096) - if data: - # Decode bytes to string - text = data.decode("utf-8", errors="replace") - # Write to original stdout - os.write(original_stdout_fd, data) - # Write to buffer - stdout_buffer.write(text) - except OSError: - break - - def _stderr_pipe_reader(): - """Read from the stderr pipe and write to both original stderr and buffer.""" - if not capture_stderr or stderr_pipe_read is None or original_stderr_fd is None: - return - - while not stop_event.is_set(): - # Use select to check if there's data available to read - ready, _, _ = select.select([stderr_pipe_read], [], [], 0.1) - if ready: - try: - data = os.read(stderr_pipe_read, 4096) - if data: - # Decode bytes to string - text = data.decode("utf-8", errors="replace") - # Write to original stderr - os.write(original_stderr_fd, data) - # Write to buffer (combined with stdout) - stdout_buffer.write(text) - except OSError: - break def _log_loop(): while not stop_event.is_set(): @@ -112,25 +132,14 @@ def _log_loop(): def _log_current_stdout(): content = stdout_buffer.getvalue() - if content: mlflow.log_text(content, "stdout.log", run_id=run_id) try: - # Start the stdout pipe reader thread - stdout_pipe_thread = threading.Thread( - target=_stdout_pipe_reader, name="mlflow-stdout-pipe-reader" - ) - stdout_pipe_thread.daemon = True - stdout_pipe_thread.start() - - # Start the stderr pipe reader thread if capturing stderr - if capture_stderr: - stderr_pipe_thread = threading.Thread( - target=_stderr_pipe_reader, name="mlflow-stderr-pipe-reader" - ) - stderr_pipe_thread.daemon = True - stderr_pipe_thread.start() + # Start pipe reader threads + stdout_manager.start_reader_thread(stop_event) + if stderr_manager: + stderr_manager.start_reader_thread(stop_event) # Start the logging thread log_thread = threading.Thread(target=_log_loop, name="mlflow-stdout-logging") @@ -146,7 +155,7 @@ def _log_current_stdout(): try: # Force flush of file descriptors to ensure all output is captured os.fsync(1) - if capture_stderr and original_stderr_fd is not None: + if capture_stderr: os.fsync(2) except OSError: pass # fsync may not be available on all platforms @@ -154,27 +163,15 @@ def _log_current_stdout(): # Small delay to ensure all buffered output is processed time.sleep(0.1) - # Restore file descriptors - os.dup2(original_stdout_fd, 1) - os.close(original_stdout_fd) - - if capture_stderr and original_stderr_fd is not None: - os.dup2(original_stderr_fd, 2) - os.close(original_stderr_fd) + # Clean up pipe managers + stdout_manager.cleanup() + if stderr_manager: + stderr_manager.cleanup() - # Wait for threads to finish - if stdout_pipe_thread: - stdout_pipe_thread.join(timeout=1.0) - if stderr_pipe_thread: - stderr_pipe_thread.join(timeout=1.0) + # Wait for logging thread to finish if log_thread: log_thread.join(timeout=1.0) - # Close the pipes - os.close(stdout_pipe_read) - if capture_stderr and stderr_pipe_read is not None: - os.close(stderr_pipe_read) - # Final flush and log to capture any remaining output _log_current_stdout()