diff --git a/examples/stdout_logging/test_stdout_integration.py b/examples/stdout_logging/test_stdout_integration.py
new file mode 100644
index 0000000000000..19d81ac0412cf
--- /dev/null
+++ b/examples/stdout_logging/test_stdout_integration.py
@@ -0,0 +1,129 @@
+#!/usr/bin/env python3
+"""
+MLflow Stdout/Stderr Logging Integration Test
+
+This test demonstrates the enhanced stdout logging that captures output from:
+1. Python print statements (sys.stdout and sys.stderr)
+2. C/C++ libraries via subprocess calls
+3. System commands via os.system()
+4. C library functions via ctypes
+
+The test requires an MLflow server to be running. Start it with:
+    mlflow server --host 127.0.0.1 --port 5002
+
+Expected behavior:
+- All output appears in both the terminal and the MLflow artifact 'stdout.log'
+- Both Python and C/C++ stdout AND stderr are captured
+- The final message appears only in the terminal (after the run ends)
+"""
+
+import ctypes
+import os
+import subprocess
+import sys
+import time
+
+import mlflow
+
+mlflow.set_tracking_uri("http://127.0.0.1:5002")
+
+if __name__ == "__main__":
+    mlflow.set_experiment("stdout_test")
+
+    print("Testing stdout logging integration...")
+
+    with mlflow.start_run(log_stdout=True, log_stdout_interval=3) as run:
+        print(f"MLflow Run ID: {run.info.run_id}")
+        print("This should appear in both terminal and MLflow!")
+
+        # Test Python stdout
+        print("=== Testing Python stdout ===")
+        for i in range(3):
+            print(f"Python message {i + 1}/3")
+            time.sleep(1)
+
+        # Test Python stderr
+        print("\n=== Testing Python stderr ===")
+        for i in range(2):
+            print(f"Python stderr message {i + 1}/2", file=sys.stderr)
+            time.sleep(1)
+
+        # Test C/C++ stdout via subprocess
+        print("\n=== Testing C/C++ stdout via subprocess ===")
+        try:
+            subprocess.run(["echo", "Hello from echo command!"], check=True)
+            subprocess.run(["printf", "Formatted output from printf: %d\\n", "42"], check=True)
+            if sys.platform.startswith("linux") or sys.platform.startswith("darwin"):
+                subprocess.run(["ls", "-la", "/tmp"], check=True)
+        except subprocess.CalledProcessError as e:
+            print(f"Subprocess failed: {e}")
+        except FileNotFoundError:
+            print("Some commands are not available on this system")
+
+        # Test C/C++ stderr via subprocess
+        print("\n=== Testing C/C++ stderr via subprocess ===")
+        try:
+            # This should write to stderr
+            subprocess.run(["sh", "-c", "echo 'Error message to stderr' >&2"], check=True)
+            if sys.platform.startswith("linux") or sys.platform.startswith("darwin"):
+                # ls with an invalid path writes to stderr
+                subprocess.run(["ls", "/nonexistent/path"], check=False)
+        except subprocess.CalledProcessError as e:
+            print(f"Subprocess failed: {e}")
+        except FileNotFoundError:
+            print("Some commands are not available on this system")
+
+        # Test C/C++ stdout via os.system
+        print("\n=== Testing C/C++ stdout via os.system ===")
+        os.system("echo 'Hello from os.system!'")
+        if sys.platform.startswith("win"):
+            os.system("dir")
+        else:
+            os.system("date")
+
+        # Test C library stdout via ctypes
+        print("\n=== Testing C library stdout via ctypes ===")
+        try:
+            # Get libc using the portable approach
+            if sys.platform.startswith("win"):
+                libc = ctypes.CDLL("msvcrt")
+                libc.printf(b"Hello from C printf via ctypes!\n")
+                try:
+                    libc.fflush(None)  # Flush all open streams
+                except Exception:
+                    pass  # fflush may not be available
+            else:
+                # Use ctypes.util.find_library to locate libc
+                import ctypes.util
+
+                libc_path = ctypes.util.find_library("c")
+                if libc_path:
+                    libc = ctypes.CDLL(libc_path)
+                    libc.printf(b"Hello from C printf via ctypes!\n")
+                    libc.puts(b"Hello from C puts via ctypes!")
+                    try:
+                        libc.fflush(None)  # Flush all open streams
+                    except Exception:
+                        pass  # fflush may not be available
+                else:
+                    print("Could not find libc library")
+        except (OSError, ImportError) as e:
+            print(f"Could not load C library: {e}")
+
+        # Force a flush of stdout to ensure all output is captured
+        sys.stdout.flush()
+        try:
+            os.fsync(1)  # Force a flush of file descriptor 1
+        except OSError:
+            pass  # fsync may not be supported (e.g., on pipes)
+
+        # Test more Python stdout
+        print("\n=== Back to Python stdout ===")
+        N_LOGS = 5
+        for i in range(N_LOGS):
+            print(f"Final Python message {i + 1}/{N_LOGS}")
+            time.sleep(1)
+
+        print("Test completed!")
+
+    print("This message should only appear in the terminal (the run has ended)")
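As a usage note (not part of the patch): once the test above has finished, the captured log can be pulled back with the public mlflow.artifacts API. A minimal sketch, assuming the same tracking server is still running; RUN_ID is a placeholder for the run ID the script prints.

    import mlflow

    mlflow.set_tracking_uri("http://127.0.0.1:5002")

    # Download the captured log for a finished run; RUN_ID is a placeholder
    # for the run ID printed by the test script above.
    local_path = mlflow.artifacts.download_artifacts(
        run_id="RUN_ID", artifact_path="stdout.log"
    )
    with open(local_path) as f:
        print(f.read())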
libc.printf(b"Hello from C printf via ctypes!\n") + libc.puts(b"Hello from C puts via ctypes!") + try: + libc.fflush(None) # Flush all open streams + except Exception: + pass # fflush may not be available + else: + print("Could not find libc library") + except (OSError, ImportError) as e: + print(f"Could not load C library: {e}") + + # Force flush of stdout to ensure all output is captured + sys.stdout.flush() + try: + os.fsync(1) # Force flush of file descriptor 1 + except OSError: + pass # fsync may not be available on all platforms + + # Test more Python stdout + print("\n=== Back to Python stdout ===") + N_LOGS = 5 + for i in range(N_LOGS): + print(f"Final Python message {i + 1}/{N_LOGS}") + time.sleep(1) + + print("Test completed!") + + print("This message should only appear in terminal (run has ended)") diff --git a/mlflow/tracking/fluent.py b/mlflow/tracking/fluent.py index f2f1f24418031..125fe378d0248 100644 --- a/mlflow/tracking/fluent.py +++ b/mlflow/tracking/fluent.py @@ -105,6 +105,7 @@ run_id_to_system_metrics_monitor = {} +run_id_to_stdout_logger = {} _active_run_stack = ThreadLocalVariable(default_factory=lambda: []) @@ -267,6 +268,8 @@ def start_run( tags: Optional[dict[str, Any]] = None, description: Optional[str] = None, log_system_metrics: Optional[bool] = None, + log_stdout: Optional[bool] = None, + log_stdout_interval: int = 5, ) -> ActiveRun: """ Start a new MLflow run, setting it as the active run under which metrics and parameters @@ -309,6 +312,11 @@ def start_run( to MLflow, e.g., cpu/gpu utilization. If None, we will check environment variable `MLFLOW_ENABLE_SYSTEM_METRICS_LOGGING` to determine whether to log system metrics. System metrics logging is an experimental feature in MLflow 2.8 and subject to change. + log_stdout: bool, defaults to None. If True, stdout will be captured and periodically + logged to MLflow as an artifact named 'stdout.log'. If False, stdout logging is + disabled. If None, stdout logging is disabled by default. + log_stdout_interval: int, defaults to 5. The interval in seconds at which to log + the captured stdout to MLflow. Only used when log_stdout is True. 
diff --git a/mlflow/utils/stdout_logging.py b/mlflow/utils/stdout_logging.py
new file mode 100644
index 0000000000000..6f6648997501d
--- /dev/null
+++ b/mlflow/utils/stdout_logging.py
@@ -0,0 +1,179 @@
+import os
+import select
+import threading
+import time
+from contextlib import contextmanager
+from io import StringIO
+from typing import Optional
+
+import mlflow
+
+
+class _PipeManager:
+    """Manages a pipe for capturing file descriptor output."""
+
+    def __init__(self, fd_num: int, original_fd: int, buffer: StringIO, name: str):
+        self.fd_num = fd_num
+        self.original_fd = original_fd
+        self.buffer = buffer
+        self.name = name
+        self.pipe_read: Optional[int] = None
+        self.pipe_write: Optional[int] = None
+        self.thread: Optional[threading.Thread] = None
+
+    def setup_pipe(self):
+        """Create and set up the pipe for this file descriptor."""
+        self.pipe_read, self.pipe_write = os.pipe()
+        os.dup2(self.pipe_write, self.fd_num)
+        os.close(self.pipe_write)
+        self.pipe_write = None
+
+    def start_reader_thread(self, stop_event: threading.Event):
+        """Start the pipe reader thread."""
+        if self.pipe_read is None:
+            return
+
+        def _pipe_reader():
+            """Read from the pipe and write to both the original fd and the buffer."""
+            pipe_read = self.pipe_read  # Capture for type safety
+            if pipe_read is None:
+                return
+
+            while not stop_event.is_set():
+                # Use select to check if there's data available to read
+                ready, _, _ = select.select([pipe_read], [], [], 0.1)
+                if ready:
+                    try:
+                        data = os.read(pipe_read, 4096)
+                        if data:
+                            # Decode bytes to string
+                            text = data.decode("utf-8", errors="replace")
+                            # Write to the original file descriptor
+                            os.write(self.original_fd, data)
+                            # Write to the buffer
+                            self.buffer.write(text)
+                    except OSError:
+                        break
+
+        self.thread = threading.Thread(target=_pipe_reader, name=f"mlflow-{self.name}-pipe-reader")
+        self.thread.daemon = True
+        self.thread.start()
+
+    def cleanup(self):
+        """Clean up the pipe and restore the original file descriptor."""
+        # Restore the original file descriptor
+        os.dup2(self.original_fd, self.fd_num)
+        os.close(self.original_fd)
+
+        # Wait for the reader thread to finish
+        if self.thread:
+            self.thread.join(timeout=1.0)
+
+        # Close the read end of the pipe
+        if self.pipe_read is not None:
+            os.close(self.pipe_read)
+
+
+@contextmanager
+def log_stdout_stream(run_id, interval_seconds=5, capture_stderr=True):
+    """
+    A context manager to stream stdout (and optionally stderr) to an MLflow artifact.
+
+    This context manager redirects file descriptor 1 (stdout) and optionally
+    file descriptor 2 (stderr) to capture output from both Python code and
+    underlying C/C++ libraries. A background thread periodically flushes this
+    buffer and logs its contents to an MLflow artifact file named 'stdout.log'.
+
+    Args:
+        run_id (str): The run ID to log stdout to.
+        interval_seconds (int): The interval in seconds at which to log
+            the stdout buffer to MLflow.
+        capture_stderr (bool): Whether to also capture stderr output.
+            Defaults to True.
+
+    Example:
+        import subprocess
+        import sys
+        import time
+
+        import mlflow
+
+        with mlflow.start_run() as run:
+            with log_stdout_stream(run.info.run_id, capture_stderr=True):
+                print("This is the start of my script.")
+                print("This error goes to stderr!", file=sys.stderr)
+                time.sleep(6)
+                print("This message will appear in the first log upload.")
+                subprocess.run(["echo", "This C/C++ output will also be captured"])
+                time.sleep(6)
+                print("And this will be in the second.")
+            # The context manager automatically handles the final log upload
+            # and cleanup.
+            print("Stdout is now back to normal.")
+    """
+    stdout_buffer = StringIO()
+
+    # Create the pipe managers
+    stdout_manager = _PipeManager(1, os.dup(1), stdout_buffer, "stdout")
+    stderr_manager = None
+    if capture_stderr:
+        stderr_manager = _PipeManager(2, os.dup(2), stdout_buffer, "stderr")
+
+    # Set up the pipes
+    stdout_manager.setup_pipe()
+    if stderr_manager:
+        stderr_manager.setup_pipe()
+
+    stop_event = threading.Event()
+    log_thread = None
+
+    def _log_loop():
+        while not stop_event.is_set():
+            time.sleep(interval_seconds)
+            _log_current_stdout()
+
+    def _log_current_stdout():
+        content = stdout_buffer.getvalue()
+        if content:
+            mlflow.log_text(content, "stdout.log", run_id=run_id)
+
+    try:
+        # Start the pipe reader threads
+        stdout_manager.start_reader_thread(stop_event)
+        if stderr_manager:
+            stderr_manager.start_reader_thread(stop_event)
+
+        # Start the periodic logging thread
+        log_thread = threading.Thread(target=_log_loop, name="mlflow-stdout-logging")
+        log_thread.daemon = True
+        log_thread.start()
+
+        yield
+    finally:
+        # Signal the threads to stop
+        stop_event.set()
+
+        # Flush any remaining buffered output before restoring file descriptors
+        try:
+            # Force a flush of the file descriptors so all output is captured
+            os.fsync(1)
+            if capture_stderr:
+                os.fsync(2)
+        except OSError:
+            pass  # fsync may not be supported (e.g., on pipes)
+
+        # Small delay to let the reader threads drain any buffered output
+        time.sleep(0.1)
+
+        # Clean up the pipe managers
+        stdout_manager.cleanup()
+        if stderr_manager:
+            stderr_manager.cleanup()
+
+        # Wait for the logging thread to finish
+        if log_thread:
+            log_thread.join(timeout=1.0)
+
+        # Final flush and log to capture any remaining output
+        _log_current_stdout()
+
+        # Close the buffer
+        stdout_buffer.close()
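A closing note on the design: log_stdout_stream swaps file descriptors with os.dup2 rather than reassigning sys.stdout because only an fd-level swap is visible to C libraries and child processes, which inherit fd 1. A self-contained, POSIX-only sketch of the same trick (names are illustrative, not from the patch; the output here is small enough that no reader thread is needed, unlike the real implementation):

    import os
    import subprocess

    saved = os.dup(1)                 # keep a handle on the real stdout
    read_end, write_end = os.pipe()
    os.dup2(write_end, 1)             # fd 1 now points into the pipe
    os.close(write_end)

    # The child inherits fd 1, so its output lands in the pipe; replacing
    # sys.stdout with a Python object would not capture this.
    subprocess.run(["echo", "written by a child process"])

    os.dup2(saved, 1)                 # restore the real stdout
    os.close(saved)
    captured = os.read(read_end, 4096).decode()
    os.close(read_end)
    print(f"captured: {captured!r}")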