129 changes: 129 additions & 0 deletions examples/stdout_logging/test_stdout_integration.py
@@ -0,0 +1,129 @@
#!/usr/bin/env python3
"""
MLflow Stdout/Stderr Logging Integration Test

This test demonstrates the enhanced stdout logging that captures output from:
1. Python print statements (sys.stdout and sys.stderr)
2. C/C++ libraries via subprocess calls
3. System commands via os.system()
4. C library functions via ctypes

This test requires an MLflow tracking server to be running. Start one with:
mlflow server --host 127.0.0.1 --port 5002

Expected behavior:
- All output appears both in the terminal and in the MLflow artifact 'stdout.log'
- Both Python and C/C++ stdout AND stderr are captured
- The final message appears only in the terminal (it is printed after the run has ended)
"""

import ctypes
import os
import subprocess
import sys
import time

import mlflow

mlflow.set_tracking_uri("http://127.0.0.1:5002")

if __name__ == "__main__":
mlflow.set_experiment("stdout_test")

print("Testing stdout logging integration...")

with mlflow.start_run(log_stdout=True, log_stdout_interval=3) as run:
print(f"MLflow Run ID: {run.info.run_id}")
print("This should appear in both terminal and MLflow!")

# Test Python stdout
print("=== Testing Python stdout ===")
for i in range(3):
print(f"Python message {i + 1}/3")
time.sleep(1)

# Test Python stderr
print("\n=== Testing Python stderr ===")
for i in range(2):
print(f"Python stderr message {i + 1}/2", file=sys.stderr)
time.sleep(1)

# Test C/C++ stdout via subprocess
print("\n=== Testing C/C++ stdout via subprocess ===")
try:
subprocess.run(["echo", "Hello from echo command!"], check=True)
subprocess.run(["printf", "Formatted output from printf: %d\\n", "42"], check=True)
if sys.platform.startswith("linux") or sys.platform.startswith("darwin"):
subprocess.run(["ls", "-la", "/tmp"], check=True, stdout=None)
except subprocess.CalledProcessError as e:
print(f"Subprocess failed: {e}")
except FileNotFoundError:
print("Some commands not available on this system")

# Test C/C++ stderr via subprocess
print("\n=== Testing C/C++ stderr via subprocess ===")
try:
# This should write to stderr
subprocess.run(["sh", "-c", "echo 'Error message to stderr' >&2"], check=True)
if sys.platform.startswith("linux") or sys.platform.startswith("darwin"):
# ls with invalid path writes to stderr
subprocess.run(["ls", "/nonexistent/path"], check=False)
except subprocess.CalledProcessError as e:
print(f"Subprocess failed: {e}")
except FileNotFoundError:
print("Some commands not available on this system")

# Test C/C++ stdout via os.system
print("\n=== Testing C/C++ stdout via os.system ===")
os.system("echo 'Hello from os.system!'")
if sys.platform.startswith("win"):
os.system("dir")
else:
os.system("date")

# Test C library stdout via ctypes
print("\n=== Testing C library stdout via ctypes ===")
try:
# Get libc using the portable approach
if sys.platform.startswith("win"):
libc = ctypes.CDLL("msvcrt")
libc.printf(b"Hello from C printf via ctypes!\n")
try:
libc.fflush(None) # Flush all open streams
except Exception:
pass # fflush may not be available
else:
# Use ctypes.util.find_library to find libc
import ctypes.util

libc_path = ctypes.util.find_library("c")
if libc_path:
libc = ctypes.CDLL(libc_path)
libc.printf(b"Hello from C printf via ctypes!\n")
libc.puts(b"Hello from C puts via ctypes!")
try:
libc.fflush(None) # Flush all open streams
except Exception:
pass # fflush may not be available
else:
print("Could not find libc library")
except (OSError, ImportError) as e:
print(f"Could not load C library: {e}")

# Force flush of stdout to ensure all output is captured
sys.stdout.flush()
try:
os.fsync(1) # Force flush of file descriptor 1
except OSError:
            pass  # fsync is not supported on pipes and some other stream types

# Test more Python stdout
print("\n=== Back to Python stdout ===")
N_LOGS = 5
for i in range(N_LOGS):
print(f"Final Python message {i + 1}/{N_LOGS}")
time.sleep(1)

print("Test completed!")

print("This message should only appear in terminal (run has ended)")
30 changes: 30 additions & 0 deletions mlflow/tracking/fluent.py
@@ -105,6 +105,7 @@


run_id_to_system_metrics_monitor = {}
run_id_to_stdout_logger = {}


_active_run_stack = ThreadLocalVariable(default_factory=lambda: [])
@@ -267,6 +268,8 @@ def start_run(
tags: Optional[dict[str, Any]] = None,
description: Optional[str] = None,
log_system_metrics: Optional[bool] = None,
log_stdout: Optional[bool] = None,
log_stdout_interval: int = 5,
) -> ActiveRun:
"""
Start a new MLflow run, setting it as the active run under which metrics and parameters
@@ -309,6 +312,11 @@ def start_run(
to MLflow, e.g., cpu/gpu utilization. If None, we will check environment variable
`MLFLOW_ENABLE_SYSTEM_METRICS_LOGGING` to determine whether to log system metrics.
System metrics logging is an experimental feature in MLflow 2.8 and subject to change.
        log_stdout: bool, defaults to None. If True, stdout (and stderr) is captured and
            periodically logged to MLflow as an artifact named 'stdout.log'. If False or
            None (the default), stdout logging is disabled.
log_stdout_interval: int, defaults to 5. The interval in seconds at which to log
the captured stdout to MLflow. Only used when log_stdout is True.

Returns:
:py:class:`mlflow.ActiveRun` object that acts as a context manager wrapping the
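
A minimal usage sketch of the two new parameters documented above (the interval value here is illustrative):

import mlflow

# Capture stdout/stderr and upload it as the 'stdout.log' artifact every 10 seconds.
with mlflow.start_run(log_stdout=True, log_stdout_interval=10) as run:
    print("captured in the terminal and in the MLflow artifact")

print("printed only to the terminal; the run has ended and capture is stopped")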
@@ -502,6 +510,22 @@ def start_run(
_logger.error(f"Failed to start system metrics monitoring: {e}.")

active_run_stack.append(ActiveRun(active_run_obj))

if log_stdout:
try:
from mlflow.utils.stdout_logging import log_stdout_stream

            # log_stdout_stream is a context manager; enter it here and exit it in end_run()
stdout_logger = log_stdout_stream(
active_run_obj.info.run_id,
interval_seconds=log_stdout_interval,
)
run_id_to_stdout_logger[active_run_obj.info.run_id] = stdout_logger
# Start the stdout logging
stdout_logger.__enter__()
except Exception as e:
_logger.error(f"Failed to start stdout logging: {e}.")

return active_run_stack[-1]


@@ -548,6 +572,12 @@ def end_run(status: str = RunStatus.to_string(RunStatus.FINISHED)) -> None:
if last_active_run_id in run_id_to_system_metrics_monitor:
system_metrics_monitor = run_id_to_system_metrics_monitor.pop(last_active_run_id)
system_metrics_monitor.finish()
if last_active_run_id in run_id_to_stdout_logger:
stdout_logger = run_id_to_stdout_logger.pop(last_active_run_id)
try:
stdout_logger.__exit__(None, None, None)
except Exception as e:
_logger.error(f"Failed to stop stdout logging: {e}.")


def _safe_end_run():
179 changes: 179 additions & 0 deletions mlflow/utils/stdout_logging.py
@@ -0,0 +1,179 @@
import os
import select
import threading
import time
from contextlib import contextmanager
from io import StringIO
from typing import Optional

import mlflow


class _PipeManager:
"""Manages a pipe for capturing file descriptor output."""

def __init__(self, fd_num: int, original_fd: int, buffer: StringIO, name: str):
self.fd_num = fd_num
self.original_fd = original_fd
self.buffer = buffer
self.name = name
self.pipe_read: Optional[int] = None
self.pipe_write: Optional[int] = None
self.thread: Optional[threading.Thread] = None

def setup_pipe(self):
"""Create and setup the pipe for this file descriptor."""
self.pipe_read, self.pipe_write = os.pipe()
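        # Point fd_num (1 for stdout, 2 for stderr) at the pipe's write end so that anything
        # written to that descriptor, by Python or by C/C++ code, flows into the pipe.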
os.dup2(self.pipe_write, self.fd_num)
os.close(self.pipe_write)
self.pipe_write = None

def start_reader_thread(self, stop_event: threading.Event):
"""Start the pipe reader thread."""
if self.pipe_read is None:
return

def _pipe_reader():
"""Read from the pipe and write to both original fd and buffer."""
pipe_read = self.pipe_read # Capture for type safety
if pipe_read is None:
return

while not stop_event.is_set():
                # Use select with a short timeout: read when data is available and re-check stop_event regularly
ready, _, _ = select.select([pipe_read], [], [], 0.1)
if ready:
try:
data = os.read(pipe_read, 4096)
if data:
# Decode bytes to string
text = data.decode("utf-8", errors="replace")
# Write to original file descriptor
os.write(self.original_fd, data)
# Write to buffer
self.buffer.write(text)
except OSError:
break

self.thread = threading.Thread(target=_pipe_reader, name=f"mlflow-{self.name}-pipe-reader")
self.thread.daemon = True
self.thread.start()

def cleanup(self):
"""Clean up the pipe and restore the original file descriptor."""
# Restore original file descriptor
os.dup2(self.original_fd, self.fd_num)
os.close(self.original_fd)

# Wait for thread to finish
if self.thread:
self.thread.join(timeout=1.0)

# Close the pipe
if self.pipe_read is not None:
os.close(self.pipe_read)


@contextmanager
def log_stdout_stream(run_id, interval_seconds=5, capture_stderr=True):
"""
A context manager to stream stdout (and optionally stderr) to an MLflow artifact.

    This context manager redirects file descriptor 1 (stdout) and optionally
    file descriptor 2 (stderr) into a pipe, so that output from both Python code
    and underlying C/C++ libraries is captured in an in-memory buffer. A background
    thread periodically logs the buffer contents to an MLflow artifact named 'stdout.log'.

Args:
run_id (str): The run ID to log stdout to.
interval_seconds (int): The interval in seconds at which to log
the stdout buffer to MLflow.
capture_stderr (bool): Whether to also capture stderr output.
Defaults to True.

Example:
        import subprocess
        import sys
        import time

        import mlflow

with mlflow.start_run() as run:
with log_stdout_stream(run.info.run_id, capture_stderr=True):
print("This is the start of my script.")
print("This error goes to stderr!", file=sys.stderr)
time.sleep(6)
print("This message will appear in the first log upload.")
subprocess.run(["echo", "This C/C++ output will also be captured"])
time.sleep(6)
print("And this will be in the second.")
# The context manager will automatically handle final log upload
# and cleanup.
print("Stdout is now back to normal.")
"""
stdout_buffer = StringIO()

# Create pipe managers
stdout_manager = _PipeManager(1, os.dup(1), stdout_buffer, "stdout")
stderr_manager = None
if capture_stderr:
stderr_manager = _PipeManager(2, os.dup(2), stdout_buffer, "stderr")

# Setup pipes
stdout_manager.setup_pipe()
if stderr_manager:
stderr_manager.setup_pipe()

stop_event = threading.Event()
log_thread = None

def _log_loop():
while not stop_event.is_set():
time.sleep(interval_seconds)
_log_current_stdout()

def _log_current_stdout():
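        # The buffer is never cleared, so each call re-logs the complete output
        # captured so far to 'stdout.log'.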
content = stdout_buffer.getvalue()
if content:
mlflow.log_text(content, "stdout.log", run_id=run_id)

try:
# Start pipe reader threads
stdout_manager.start_reader_thread(stop_event)
if stderr_manager:
stderr_manager.start_reader_thread(stop_event)

# Start the logging thread
log_thread = threading.Thread(target=_log_loop, name="mlflow-stdout-logging")
log_thread.daemon = True
log_thread.start()

yield
finally:
# Signal threads to stop
stop_event.set()

# Flush any remaining buffered output before restoring file descriptors
try:
# Force flush of file descriptors to ensure all output is captured
os.fsync(1)
if capture_stderr:
os.fsync(2)
except OSError:
            pass  # fsync is not supported on pipes and some other stream types

# Small delay to ensure all buffered output is processed
time.sleep(0.1)

# Clean up pipe managers
stdout_manager.cleanup()
if stderr_manager:
stderr_manager.cleanup()

# Wait for logging thread to finish
if log_thread:
log_thread.join(timeout=1.0)

# Final flush and log to capture any remaining output
_log_current_stdout()

# Close the buffer
stdout_buffer.close()