Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions examples/realtime/demo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import asyncio
import os
import sys
from typing import TYPE_CHECKING

import numpy as np

# Add the current directory to path so we can import ui
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from agents import function_tool
from agents.realtime import RealtimeAgent, RealtimeSession, RealtimeSessionEvent

if TYPE_CHECKING:
from .ui import AppUI
else:
# Try both import styles
try:
# Try relative import first (when used as a package)
from .ui import AppUI
except ImportError:
# Fall back to direct import (when run as a script)
from ui import AppUI


@function_tool
def get_weather(city: str) -> str:
"""Get the weather in a city."""
return f"The weather in {city} is sunny."


agent = RealtimeAgent(
name="Assistant",
instructions="You always greet the user with 'Top of the morning to you'.",
tools=[get_weather],
)


class Example:
def __init__(self) -> None:
self.session = RealtimeSession(agent)
self.ui = AppUI()
self.ui.connected = asyncio.Event()
self.ui.last_audio_item_id = None
# Set the audio callback
self.ui.set_audio_callback(self.on_audio_recorded)

async def run(self) -> None:
self.session.add_listener(self.on_event)
await self.session.connect()
self.ui.set_is_connected(True)
await self.ui.run_async()

async def on_audio_recorded(self, audio_bytes: bytes) -> None:
"""Called when audio is recorded by the UI."""
try:
# Send the audio to the session
await self.session.send_audio(audio_bytes)
except Exception as e:
self.ui.log_message(f"Error sending audio: {e}")

async def on_event(self, event: RealtimeSessionEvent) -> None:
# Display event in the UI
try:
if event.type == "agent_start":
self.ui.add_transcript(f"Agent started: {event.agent.name}")
elif event.type == "agent_end":
self.ui.add_transcript(f"Agent ended: {event.agent.name}")
elif event.type == "handoff":
self.ui.add_transcript(
f"Handoff from {event.from_agent.name} to {event.to_agent.name}"
)
elif event.type == "tool_start":
self.ui.add_transcript(f"Tool started: {event.tool.name}")
elif event.type == "tool_end":
self.ui.add_transcript(f"Tool ended: {event.tool.name}; output: {event.output}")
elif event.type == "audio_end":
self.ui.add_transcript("Audio ended")
elif event.type == "audio":
np_audio = np.frombuffer(event.audio.data, dtype=np.int16)
self.ui.play_audio(np_audio)
elif event.type == "audio_interrupted":
self.ui.add_transcript("Audio interrupted")
elif event.type == "error":
self.ui.add_transcript(f"Error: {event.error}")
elif event.type == "history_updated":
pass
elif event.type == "history_added":
pass
elif event.type == "raw_transport_event":
self.ui.log_message(f"Raw transport event: {event.data}")
else:
self.ui.log_message(f"Unknown event type: {event.type}")
except Exception as e:
# This can happen if the UI has already exited
self.ui.log_message(f"Event handling error: {str(e)}")


if __name__ == "__main__":
example = Example()
asyncio.run(example.run())
271 changes: 271 additions & 0 deletions examples/realtime/ui.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
from __future__ import annotations

import asyncio
from collections.abc import Coroutine
from typing import Any, Callable

import numpy as np
import numpy.typing as npt
import sounddevice as sd
from textual import events
from textual.app import App, ComposeResult
from textual.containers import Container, Horizontal
from textual.reactive import reactive
from textual.widgets import RichLog, Static
from typing_extensions import override

CHUNK_LENGTH_S = 0.05 # 50ms
SAMPLE_RATE = 24000
FORMAT = np.int16
CHANNELS = 1


class Header(Static):
"""A header widget."""

@override
def render(self) -> str:
return "Realtime Demo"


class AudioStatusIndicator(Static):
"""A widget that shows the current audio recording status."""

is_recording = reactive(False)

@override
def render(self) -> str:
status = (
"🔴 Conversation started."
if self.is_recording
else "⚪ Press SPACE to start the conversation (q to quit)"
)
return status


class AppUI(App[None]):
CSS = """
Screen {
background: #1a1b26; /* Dark blue-grey background */
}

Container {
border: double rgb(91, 164, 91);
}

#input-container {
height: 5; /* Explicit height for input container */
margin: 1 1;
padding: 1 2;
}

#bottom-pane {
width: 100%;
height: 82%; /* Reduced to make room for session display */
border: round rgb(205, 133, 63);
}

#status-indicator {
height: 3;
content-align: center middle;
background: #2a2b36;
border: solid rgb(91, 164, 91);
margin: 1 1;
}

#session-display {
height: 3;
content-align: center middle;
background: #2a2b36;
border: solid rgb(91, 164, 91);
margin: 1 1;
}

#transcripts {
width: 50%;
height: 100%;
border-right: solid rgb(91, 164, 91);
}

#transcripts-header {
height: 2;
background: #2a2b36;
content-align: center middle;
border-bottom: solid rgb(91, 164, 91);
}

#transcripts-content {
height: 100%;
}

#event-log {
width: 50%;
height: 100%;
}

#event-log-header {
height: 2;
background: #2a2b36;
content-align: center middle;
border-bottom: solid rgb(91, 164, 91);
}

#event-log-content {
height: 100%;
}

Static {
color: white;
}
"""

should_send_audio: asyncio.Event
connected: asyncio.Event
last_audio_item_id: str | None
audio_callback: Callable[[bytes], Coroutine[Any, Any, None]] | None

def __init__(self) -> None:
super().__init__()
self.audio_player = sd.OutputStream(
samplerate=SAMPLE_RATE,
channels=CHANNELS,
dtype=FORMAT,
)
self.should_send_audio = asyncio.Event()
self.connected = asyncio.Event()
self.audio_callback = None

@override
def compose(self) -> ComposeResult:
"""Create child widgets for the app."""
with Container():
yield Header(id="session-display")
yield AudioStatusIndicator(id="status-indicator")
with Container(id="bottom-pane"):
with Horizontal():
with Container(id="transcripts"):
yield Static("Conversation transcript", id="transcripts-header")
yield RichLog(
id="transcripts-content", wrap=True, highlight=True, markup=True
)
with Container(id="event-log"):
yield Static("Raw event log", id="event-log-header")
yield RichLog(
id="event-log-content", wrap=True, highlight=True, markup=True
)

def set_is_connected(self, is_connected: bool) -> None:
self.connected.set() if is_connected else self.connected.clear()

def set_audio_callback(self, callback: Callable[[bytes], Coroutine[Any, Any, None]]) -> None:
"""Set a callback function to be called when audio is recorded."""
self.audio_callback = callback

# High-level methods for UI operations
def set_header_text(self, text: str) -> None:
"""Update the header text."""
header = self.query_one("#session-display", Header)
header.update(text)

def set_recording_status(self, is_recording: bool) -> None:
"""Set the recording status indicator."""
status_indicator = self.query_one(AudioStatusIndicator)
status_indicator.is_recording = is_recording

def log_message(self, message: str) -> None:
"""Add a message to the event log."""
try:
log_pane = self.query_one("#event-log-content", RichLog)
log_pane.write(message)
except Exception:
# Handle the case where the widget might not be available
pass

def add_transcript(self, message: str) -> None:
"""Add a transcript message to the transcripts panel."""
try:
transcript_pane = self.query_one("#transcripts-content", RichLog)
transcript_pane.write(message)
except Exception:
# Handle the case where the widget might not be available
pass

def play_audio(self, audio_data: npt.NDArray[np.int16]) -> None:
"""Play audio data through the audio player."""
try:
self.audio_player.write(audio_data)
except Exception as e:
self.log_message(f"Audio play error: {e}")

async def on_mount(self) -> None:
"""Set up audio player and start the audio capture worker."""
self.audio_player.start()
self.run_worker(self.capture_audio())

async def capture_audio(self) -> None:
"""Capture audio from the microphone and send to the session."""
# Wait for connection to be established
await self.connected.wait()

# Set up audio input stream
stream = sd.InputStream(
channels=CHANNELS,
samplerate=SAMPLE_RATE,
dtype=FORMAT,
)

try:
# Wait for user to press spacebar to start
await self.should_send_audio.wait()

stream.start()
self.set_recording_status(True)
self.log_message("Recording started - speak to the agent")

# Buffer size in samples
read_size = int(SAMPLE_RATE * CHUNK_LENGTH_S)

while True:
# Check if there's enough data to read
if stream.read_available < read_size:
await asyncio.sleep(0.01) # Small sleep to avoid CPU hogging
continue

# Read audio data
data, _ = stream.read(read_size)

# Convert numpy array to bytes
audio_bytes = data.tobytes()

# Call audio callback if set
if self.audio_callback:
try:
await self.audio_callback(audio_bytes)
except Exception as e:
self.log_message(f"Audio callback error: {e}")

# Yield control back to event loop
await asyncio.sleep(0)

except Exception as e:
self.log_message(f"Audio capture error: {e}")
finally:
if stream.active:
stream.stop()
stream.close()

async def on_key(self, event: events.Key) -> None:
"""Handle key press events."""
# add the keypress to the log
self.log_message(f"Key pressed: {event.key}")

if event.key == "q":
self.audio_player.stop()
self.audio_player.close()
self.exit()
return

if event.key == "space": # Spacebar
if not self.should_send_audio.is_set():
self.should_send_audio.set()
self.set_recording_status(True)
4 changes: 3 additions & 1 deletion src/agents/realtime/openai_realtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,9 @@ async def _handle_ws_event(self, event: dict[str, Any]):
).validate_python(event)
except Exception as e:
logger.error(f"Invalid event: {event} - {e}")
await self._emit_event(RealtimeTransportErrorEvent(error=f"Invalid event: {event}"))
await self._emit_event(
RealtimeTransportErrorEvent(error=f"Invalid event: {event} - {e}")
)
return

if parsed.type == "response.audio.delta":
Expand Down
Loading
Loading