Skip to content

Realtime: move demo audio to separate thread #1141

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion examples/realtime/demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ async def _on_event(self, event: RealtimeSessionEvent) -> None:
self.ui.add_transcript("Audio ended")
elif event.type == "audio":
np_audio = np.frombuffer(event.audio.data, dtype=np.int16)
self.ui.play_audio(np_audio)
# Play audio in a separate thread to avoid blocking the event loop
await asyncio.to_thread(self.ui.play_audio, np_audio)
elif event.type == "audio_interrupted":
self.ui.add_transcript("Audio interrupted")
elif event.type == "error":
Expand Down
89 changes: 82 additions & 7 deletions examples/realtime/no_ui_demo.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import asyncio
import queue
import sys
import threading
from typing import Any

import numpy as np
import sounddevice as sd
Expand Down Expand Up @@ -46,14 +49,77 @@ def __init__(self) -> None:
self.audio_player: sd.OutputStream | None = None
self.recording = False

# Audio output state for callback system
self.output_queue: queue.Queue[Any] = queue.Queue(maxsize=10) # Buffer more chunks
self.interrupt_event = threading.Event()
self.current_audio_chunk: np.ndarray | None = None # type: ignore
self.chunk_position = 0

def _output_callback(self, outdata, frames: int, time, status) -> None:
"""Callback for audio output - handles continuous audio stream from server."""
if status:
print(f"Output callback status: {status}")

# Check if we should clear the queue due to interrupt
if self.interrupt_event.is_set():
# Clear the queue and current chunk state
while not self.output_queue.empty():
try:
self.output_queue.get_nowait()
except queue.Empty:
break
self.current_audio_chunk = None
self.chunk_position = 0
self.interrupt_event.clear()
outdata.fill(0)
return

# Fill output buffer from queue and current chunk
outdata.fill(0) # Start with silence
samples_filled = 0

while samples_filled < len(outdata):
# If we don't have a current chunk, try to get one from queue
if self.current_audio_chunk is None:
try:
self.current_audio_chunk = self.output_queue.get_nowait()
self.chunk_position = 0
except queue.Empty:
# No more audio data available - this causes choppiness
# Uncomment next line to debug underruns:
# print(f"Audio underrun: {samples_filled}/{len(outdata)} samples filled")
break

# Copy data from current chunk to output buffer
remaining_output = len(outdata) - samples_filled
remaining_chunk = len(self.current_audio_chunk) - self.chunk_position
samples_to_copy = min(remaining_output, remaining_chunk)

if samples_to_copy > 0:
chunk_data = self.current_audio_chunk[
self.chunk_position : self.chunk_position + samples_to_copy
]
# More efficient: direct assignment for mono audio instead of reshape
outdata[samples_filled : samples_filled + samples_to_copy, 0] = chunk_data
samples_filled += samples_to_copy
self.chunk_position += samples_to_copy

# If we've used up the entire chunk, reset for next iteration
if self.chunk_position >= len(self.current_audio_chunk):
self.current_audio_chunk = None
self.chunk_position = 0

async def run(self) -> None:
print("Connecting, may take a few seconds...")

# Initialize audio player
# Initialize audio player with callback
chunk_size = int(SAMPLE_RATE * CHUNK_LENGTH_S)
self.audio_player = sd.OutputStream(
channels=CHANNELS,
samplerate=SAMPLE_RATE,
dtype=FORMAT,
callback=self._output_callback,
blocksize=chunk_size, # Match our chunk timing for better alignment
)
self.audio_player.start()

Expand Down Expand Up @@ -146,15 +212,24 @@ async def _on_event(self, event: RealtimeSessionEvent) -> None:
elif event.type == "audio_end":
print("Audio ended")
elif event.type == "audio":
# Play audio through speakers
# Enqueue audio for callback-based playback
np_audio = np.frombuffer(event.audio.data, dtype=np.int16)
if self.audio_player:
try:
self.audio_player.write(np_audio)
except Exception as e:
print(f"Audio playback error: {e}")
try:
self.output_queue.put_nowait(np_audio)
except queue.Full:
# Queue is full - only drop if we have significant backlog
# This prevents aggressive dropping that could cause choppiness
if self.output_queue.qsize() > 8: # Keep some buffer
try:
self.output_queue.get_nowait()
self.output_queue.put_nowait(np_audio)
except queue.Empty:
pass
# If queue isn't too full, just skip this chunk to avoid blocking
elif event.type == "audio_interrupted":
print("Audio interrupted")
# Signal the output callback to clear its queue and state
self.interrupt_event.set()
elif event.type == "error":
print(f"Error: {event.error}")
elif event.type == "history_updated":
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ requires-python = ">=3.9"
license = "MIT"
authors = [{ name = "OpenAI", email = "[email protected]" }]
dependencies = [
"openai>=1.96.0, <2",
"openai>=1.96.1, <2",
"pydantic>=2.10, <3",
"griffe>=1.5.6, <2",
"typing-extensions>=4.12.2, <5",
Expand Down
11 changes: 10 additions & 1 deletion src/agents/realtime/items.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ class AssistantText(BaseModel):
model_config = ConfigDict(extra="allow")


class AssistantAudio(BaseModel):
    """Audio content part of an assistant message in the realtime API.

    Serves as the ``"audio"`` arm of the discriminated union used for
    assistant message content (alongside ``AssistantText``).
    """

    # Discriminator value for the content-type union.
    type: Literal["audio"] = "audio"
    # Raw audio payload — presumably base64-encoded; TODO confirm against server schema.
    audio: str | None = None
    # Text transcript of the audio, when the server provides one.
    transcript: str | None = None

    # Allow extra data
    model_config = ConfigDict(extra="allow")


class SystemMessageItem(BaseModel):
item_id: str
previous_item_id: str | None = None
Expand Down Expand Up @@ -58,7 +67,7 @@ class AssistantMessageItem(BaseModel):
type: Literal["message"] = "message"
role: Literal["assistant"] = "assistant"
status: Literal["in_progress", "completed", "incomplete"] | None = None
content: list[AssistantText]
content: list[Annotated[AssistantText | AssistantAudio, Field(discriminator="type")]]

# Allow extra data
model_config = ConfigDict(extra="allow")
Expand Down
4 changes: 3 additions & 1 deletion src/agents/realtime/openai_realtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,9 @@ async def _handle_output_item(self, item: ConversationItem) -> None:
"item_id": item.id or "",
"type": item.type,
"role": item.role,
"content": item.content,
"content": (
[content.model_dump() for content in item.content] if item.content else []
),
"status": "in_progress",
}
)
Expand Down
14 changes: 7 additions & 7 deletions tests/realtime/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ async def test_item_updated_event_updates_existing_item(self, mock_model, mock_a
# Check that item was updated
assert len(session._history) == 1
updated_item = cast(AssistantMessageItem, session._history[0])
assert updated_item.content[0].text == "Updated"
assert updated_item.content[0].text == "Updated" # type: ignore

# Should have 2 events: raw + history updated (not added)
assert session._event_queue.qsize() == 2
Expand Down Expand Up @@ -526,7 +526,7 @@ def test_update_existing_item_by_id(self):
# Item should be updated
result_item = cast(AssistantMessageItem, new_history[0])
assert result_item.item_id == "item_1"
assert result_item.content[0].text == "Updated"
assert result_item.content[0].text == "Updated" # type: ignore

def test_update_existing_item_preserves_order(self):
"""Test that updating existing item preserves its position in history"""
Expand Down Expand Up @@ -559,13 +559,13 @@ def test_update_existing_item_preserves_order(self):

# Middle item should be updated
updated_result = cast(AssistantMessageItem, new_history[1])
assert updated_result.content[0].text == "Updated Second"
assert updated_result.content[0].text == "Updated Second" # type: ignore

# Other items should be unchanged
item1_result = cast(AssistantMessageItem, new_history[0])
item3_result = cast(AssistantMessageItem, new_history[2])
assert item1_result.content[0].text == "First"
assert item3_result.content[0].text == "Third"
assert item1_result.content[0].text == "First" # type: ignore
assert item3_result.content[0].text == "Third" # type: ignore

def test_insert_new_item_after_previous_item(self):
"""Test inserting new item after specified previous_item_id"""
Expand Down Expand Up @@ -600,7 +600,7 @@ def test_insert_new_item_after_previous_item(self):

# Content should be correct
item2_result = cast(AssistantMessageItem, new_history[1])
assert item2_result.content[0].text == "Second"
assert item2_result.content[0].text == "Second" # type: ignore

def test_insert_new_item_after_nonexistent_previous_item(self):
"""Test that item with nonexistent previous_item_id gets added to end"""
Expand Down Expand Up @@ -703,7 +703,7 @@ def test_complex_insertion_scenario(self):
assert len(history) == 4
assert [item.item_id for item in history] == ["A", "B", "D", "C"]
itemB_result = cast(AssistantMessageItem, history[1])
assert itemB_result.content[0].text == "Updated B"
assert itemB_result.content[0].text == "Updated B" # type: ignore


# Test 3: Tool call execution flow (_handle_tool_call method)
Expand Down
8 changes: 4 additions & 4 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.