Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/agents/items.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from __future__ import annotations

import abc
import copy
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Generic, Literal, TypeVar, Union

Expand Down Expand Up @@ -277,7 +276,7 @@ def input_to_new_input_list(
"role": "user",
}
]
return copy.deepcopy(input)
return input.copy()

@classmethod
def text_message_outputs(cls, items: list[RunItem]) -> str:
Expand Down
27 changes: 25 additions & 2 deletions src/agents/realtime/openai_realtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@

_USER_AGENT = f"Agents/Python {__version__}"

# Conservative maximum raw audio bytes per websocket message before base64
# expansion and JSON wrapping, to stay well below typical 1 MiB frame limits.
_MAX_RAW_AUDIO_CHUNK_SIZE_BYTES = 256_000

DEFAULT_MODEL_SETTINGS: RealtimeSessionModelSettings = {
"voice": "ash",
"modalities": ["text", "audio"],
Expand Down Expand Up @@ -274,8 +278,27 @@ async def _send_user_input(self, event: RealtimeModelSendUserInput) -> None:
await self._send_raw_message(OpenAIResponseCreateEvent(type="response.create"))

async def _send_audio(self, event: RealtimeModelSendAudio) -> None:
converted = _ConversionHelper.convert_audio_to_input_audio_buffer_append(event)
await self._send_raw_message(converted)
audio_bytes = event.audio or b""

# Chunk large audio payloads to avoid exceeding WebSocket frame limits
# when base64-encoding into JSON messages.
if len(audio_bytes) <= _MAX_RAW_AUDIO_CHUNK_SIZE_BYTES:
converted = _ConversionHelper.convert_audio_to_input_audio_buffer_append(event)
await self._send_raw_message(converted)
else:
start_index = 0
total_length = len(audio_bytes)
while start_index < total_length:
end_index = min(start_index + _MAX_RAW_AUDIO_CHUNK_SIZE_BYTES, total_length)
chunk = audio_bytes[start_index:end_index]
start_index = end_index

chunk_event = RealtimeModelSendAudio(audio=chunk, commit=False)
converted = _ConversionHelper.convert_audio_to_input_audio_buffer_append(
chunk_event
)
await self._send_raw_message(converted)

if event.commit:
await self._send_raw_message(
OpenAIInputAudioBufferCommitEvent(type="input_audio_buffer.commit")
Expand Down
56 changes: 54 additions & 2 deletions src/agents/realtime/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from ..agent import Agent
from ..exceptions import ModelBehaviorError, UserError
from ..handoffs import Handoff
from ..logger import logger
from ..run_context import RunContextWrapper, TContext
from ..tool import FunctionTool
from ..tool_context import ToolContext
Expand All @@ -33,7 +34,7 @@
RealtimeToolStart,
)
from .handoffs import realtime_handoff
from .items import InputAudio, InputText, RealtimeItem
from .items import AssistantAudio, InputAudio, InputText, RealtimeItem
from .model import RealtimeModel, RealtimeModelConfig, RealtimeModelListener
from .model_events import (
RealtimeModelEvent,
Expand Down Expand Up @@ -246,7 +247,58 @@ async def on_event(self, event: RealtimeModelEvent) -> None:
self._enqueue_guardrail_task(self._item_transcripts[item_id], event.response_id)
elif event.type == "item_updated":
is_new = not any(item.item_id == event.item.item_id for item in self._history)
self._history = self._get_new_history(self._history, event.item)

# Preserve previously known transcripts when updating existing items.
# This prevents transcripts from disappearing when an item is later
# retrieved without transcript fields populated.
incoming_item = event.item
existing_item = next(
(i for i in self._history if i.item_id == incoming_item.item_id), None
)

if (
existing_item is not None
and existing_item.type == "message"
and incoming_item.type == "message"
):
try:
# Merge transcripts for matching content indices
existing_content = existing_item.content
new_content = []
for idx, entry in enumerate(incoming_item.content):
# Only attempt to preserve for audio-like content
if entry.type in ("audio", "input_audio"):
# Use tuple form for Python 3.9 compatibility
assert isinstance(entry, (InputAudio, AssistantAudio))
# Determine if transcript is missing/empty on the incoming entry
entry_transcript = entry.transcript
if not entry_transcript:
preserved: str | None = None
# First prefer any transcript from the existing history item
if idx < len(existing_content):
this_content = existing_content[idx]
if isinstance(this_content, AssistantAudio) or isinstance(
this_content, InputAudio
):
preserved = this_content.transcript

# If still missing and this is an assistant item, fall back to
# accumulated transcript deltas tracked during the turn.
if not preserved and incoming_item.role == "assistant":
preserved = self._item_transcripts.get(incoming_item.item_id)

if preserved:
entry = entry.model_copy(update={"transcript": preserved})

new_content.append(entry)

if new_content:
incoming_item = incoming_item.model_copy(update={"content": new_content})
except Exception:
logger.error("Error merging transcripts", exc_info=True)
pass

self._history = self._get_new_history(self._history, incoming_item)
if is_new:
new_item = next(
item for item in self._history if item.item_id == event.item.item_id
Expand Down
17 changes: 11 additions & 6 deletions src/agents/run.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from __future__ import annotations

import asyncio
import copy
import inspect
from dataclasses import dataclass, field
from typing import Any, Callable, Generic, cast
Expand Down Expand Up @@ -387,7 +386,7 @@ async def run(
disabled=run_config.tracing_disabled,
):
current_turn = 0
original_input: str | list[TResponseInputItem] = copy.deepcopy(prepared_input)
original_input: str | list[TResponseInputItem] = _copy_str_or_list(prepared_input)
generated_items: list[RunItem] = []
model_responses: list[ModelResponse] = []

Expand Down Expand Up @@ -446,7 +445,7 @@ async def run(
starting_agent,
starting_agent.input_guardrails
+ (run_config.input_guardrails or []),
copy.deepcopy(prepared_input),
_copy_str_or_list(prepared_input),
context_wrapper,
),
self._run_single_turn(
Expand Down Expand Up @@ -594,7 +593,7 @@ def run_streamed(
)

streamed_result = RunResultStreaming(
input=copy.deepcopy(input),
input=_copy_str_or_list(input),
new_items=[],
current_agent=starting_agent,
raw_responses=[],
Expand Down Expand Up @@ -647,7 +646,7 @@ async def _maybe_filter_model_input(

try:
model_input = ModelInputData(
input=copy.deepcopy(effective_input),
input=effective_input.copy(),
instructions=effective_instructions,
)
filter_payload: CallModelData[TContext] = CallModelData(
Expand Down Expand Up @@ -786,7 +785,7 @@ async def _start_streaming(
cls._run_input_guardrails_with_queue(
starting_agent,
starting_agent.input_guardrails + (run_config.input_guardrails or []),
copy.deepcopy(ItemHelpers.input_to_new_input_list(prepared_input)),
ItemHelpers.input_to_new_input_list(prepared_input),
context_wrapper,
streamed_result,
current_span,
Expand Down Expand Up @@ -1376,3 +1375,9 @@ async def _save_result_to_session(


DEFAULT_AGENT_RUNNER = AgentRunner()


def _copy_str_or_list(input: str | list[TResponseInputItem]) -> str | list[TResponseInputItem]:
if isinstance(input, str):
return input
return input.copy()
62 changes: 62 additions & 0 deletions tests/realtime/test_openai_realtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,3 +396,65 @@ async def test_handle_audio_delta_state_management(self, model):
# Test that last audio item is tracked
last_item = model._audio_state_tracker.get_last_audio_item()
assert last_item == ("test_item", 5)


class TestAudioChunking(TestOpenAIRealtimeWebSocketModel):
"""Tests for chunking behavior when sending audio to avoid large WS frames."""

@pytest.mark.asyncio
async def test_send_audio_small_single_chunk_with_commit(self, model):
from agents.realtime.model_inputs import RealtimeModelSendAudio

# Use a small payload below the chunk threshold
small_audio = b"a" * 1024

with patch.object(model, "_send_raw_message") as mock_send_raw_message:
await model._send_audio(RealtimeModelSendAudio(audio=small_audio, commit=True))

# Should send append once and then commit once
assert mock_send_raw_message.call_count == 2
append_event = mock_send_raw_message.call_args_list[0].args[0]
commit_event = mock_send_raw_message.call_args_list[1].args[0]
assert getattr(append_event, "type", None) == "input_audio_buffer.append"
assert getattr(commit_event, "type", None) == "input_audio_buffer.commit"

@pytest.mark.asyncio
async def test_send_audio_is_chunked_and_committed(self, model):
from agents.realtime.model_inputs import RealtimeModelSendAudio
from agents.realtime.openai_realtime import _MAX_RAW_AUDIO_CHUNK_SIZE_BYTES

# Construct a payload that requires multiple chunks (2 full + 1 partial)
total_size = (_MAX_RAW_AUDIO_CHUNK_SIZE_BYTES * 2) + (_MAX_RAW_AUDIO_CHUNK_SIZE_BYTES // 2)
audio_bytes = b"b" * total_size

with patch.object(model, "_send_raw_message") as mock_send_raw_message:
await model._send_audio(RealtimeModelSendAudio(audio=audio_bytes, commit=True))

# Expect 3 append events + 1 commit event
assert mock_send_raw_message.call_count == 4

# All but last should be append events
for call in mock_send_raw_message.call_args_list[:-1]:
event = call.args[0]
assert getattr(event, "type", None) == "input_audio_buffer.append"

# Last should be commit event
last_event = mock_send_raw_message.call_args_list[-1].args[0]
assert getattr(last_event, "type", None) == "input_audio_buffer.commit"

@pytest.mark.asyncio
async def test_send_audio_chunked_without_commit(self, model):
from agents.realtime.model_inputs import RealtimeModelSendAudio
from agents.realtime.openai_realtime import _MAX_RAW_AUDIO_CHUNK_SIZE_BYTES

total_size = (_MAX_RAW_AUDIO_CHUNK_SIZE_BYTES * 2) + 10
audio_bytes = b"c" * total_size

with patch.object(model, "_send_raw_message") as mock_send_raw_message:
await model._send_audio(RealtimeModelSendAudio(audio=audio_bytes, commit=False))

# Expect only append events (no commit)
assert mock_send_raw_message.call_count == 3
for call in mock_send_raw_message.call_args_list:
event = call.args[0]
assert getattr(event, "type", None) == "input_audio_buffer.append"
63 changes: 63 additions & 0 deletions tests/realtime/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
RealtimeToolStart,
)
from agents.realtime.items import (
AssistantAudio,
AssistantMessageItem,
AssistantText,
InputAudio,
Expand Down Expand Up @@ -1625,3 +1626,65 @@ async def test_update_agent_creates_handoff_and_session_update_event(self, mock_

# Check that the current agent and session settings are updated
assert session._current_agent == second_agent


class TestTranscriptPreservation:
"""Tests ensuring assistant transcripts are preserved across updates."""

@pytest.mark.asyncio
async def test_assistant_transcript_preserved_on_item_update(self, mock_model, mock_agent):
session = RealtimeSession(mock_model, mock_agent, None)

# Initial assistant message with audio transcript present (e.g., from first turn)
initial_item = AssistantMessageItem(
item_id="assist_1",
role="assistant",
content=[AssistantAudio(audio=None, transcript="Hello there")],
)
session._history = [initial_item]

# Later, the platform retrieves/updates the same item but without transcript populated
updated_without_transcript = AssistantMessageItem(
item_id="assist_1",
role="assistant",
content=[AssistantAudio(audio=None, transcript=None)],
)

await session.on_event(RealtimeModelItemUpdatedEvent(item=updated_without_transcript))

# Transcript should be preserved from existing history
assert len(session._history) == 1
preserved_item = cast(AssistantMessageItem, session._history[0])
assert isinstance(preserved_item.content[0], AssistantAudio)
assert preserved_item.content[0].transcript == "Hello there"

@pytest.mark.asyncio
async def test_assistant_transcript_can_fallback_to_deltas(self, mock_model, mock_agent):
session = RealtimeSession(mock_model, mock_agent, None)

# Simulate transcript deltas accumulated for an assistant item during generation
await session.on_event(
RealtimeModelTranscriptDeltaEvent(
item_id="assist_2", delta="partial transcript", response_id="resp_2"
)
)

# Add initial assistant message without transcript
initial_item = AssistantMessageItem(
item_id="assist_2",
role="assistant",
content=[AssistantAudio(audio=None, transcript=None)],
)
await session.on_event(RealtimeModelItemUpdatedEvent(item=initial_item))

# Later update still lacks transcript; merge should fallback to accumulated deltas
update_again = AssistantMessageItem(
item_id="assist_2",
role="assistant",
content=[AssistantAudio(audio=None, transcript=None)],
)
await session.on_event(RealtimeModelItemUpdatedEvent(item=update_again))

preserved_item = cast(AssistantMessageItem, session._history[0])
assert isinstance(preserved_item.content[0], AssistantAudio)
assert preserved_item.content[0].transcript == "partial transcript"
35 changes: 35 additions & 0 deletions tests/test_items_helpers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

import json

from openai.types.responses.response_computer_tool_call import (
ActionScreenshot,
ResponseComputerToolCall,
Expand All @@ -20,8 +22,10 @@
from openai.types.responses.response_output_message_param import ResponseOutputMessageParam
from openai.types.responses.response_output_refusal import ResponseOutputRefusal
from openai.types.responses.response_output_text import ResponseOutputText
from openai.types.responses.response_output_text_param import ResponseOutputTextParam
from openai.types.responses.response_reasoning_item import ResponseReasoningItem, Summary
from openai.types.responses.response_reasoning_item_param import ResponseReasoningItemParam
from pydantic import TypeAdapter

from agents import (
Agent,
Expand Down Expand Up @@ -290,3 +294,34 @@ def test_to_input_items_for_reasoning() -> None:
print(converted_dict)
print(expected)
assert converted_dict == expected


def test_input_to_new_input_list_copies_the_ones_produced_by_pydantic() -> None:
# Given a list of message dictionaries, ensure the returned list is a deep copy.
original = ResponseOutputMessageParam(
id="a75654dc-7492-4d1c-bce0-89e8312fbdd7",
content=[
ResponseOutputTextParam(
type="output_text",
text="Hey, what's up?",
annotations=[],
)
],
role="assistant",
status="completed",
type="message",
)
original_json = json.dumps(original)
output_item = TypeAdapter(ResponseOutputMessageParam).validate_json(original_json)
new_list = ItemHelpers.input_to_new_input_list([output_item])
assert len(new_list) == 1
assert new_list[0]["id"] == original["id"] # type: ignore
size = 0
for i, item in enumerate(original["content"]):
size += 1 # pydantic_core._pydantic_core.ValidatorIterator does not support len()
assert item["type"] == original["content"][i]["type"] # type: ignore
assert item["text"] == original["content"][i]["text"] # type: ignore
assert size == 1
assert new_list[0]["role"] == original["role"] # type: ignore
assert new_list[0]["status"] == original["status"] # type: ignore
assert new_list[0]["type"] == original["type"]