Skip to content

Commit bc27593

Browse files
committed
fix(streaming): ensure processed_response.new_items, including HandoffCallItem, are pushed to event_queue
- Add `event_queue` parameter to `_get_single_step_result_from_response`
- Push `processed_response.new_items` to the queue in streaming mode
- Non-streaming path (`_run_single_turn`) remains unchanged
- Fixes loss of `HandoffCallItem` in streaming scenarios
- Add `test_stream_events_main_with_handoff` test
1 parent 3556d7b commit bc27593

File tree

2 files changed

+81
-3
lines changed

2 files changed

+81
-3
lines changed

src/agents/run.py

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -60,7 +60,7 @@
6060
from .models.multi_provider import MultiProvider
6161
from .result import RunResult, RunResultStreaming
6262
from .run_context import RunContextWrapper, TContext
63-
from .stream_events import AgentUpdatedStreamEvent, RawResponsesStreamEvent, RunItemStreamEvent
63+
from .stream_events import AgentUpdatedStreamEvent, RawResponsesStreamEvent, RunItemStreamEvent, StreamEvent
6464
from .tool import Tool
6565
from .tracing import Span, SpanError, agent_span, get_current_trace, trace
6666
from .tracing.span_data import AgentSpanData
@@ -1084,6 +1084,7 @@ async def _run_single_turn_streamed(
10841084
context_wrapper=context_wrapper,
10851085
run_config=run_config,
10861086
tool_use_tracker=tool_use_tracker,
1087+
event_queue=streamed_result._event_queue,
10871088
)
10881089

10891090
if emitted_tool_call_ids:
@@ -1196,6 +1197,7 @@ async def _get_single_step_result_from_response(
11961197
context_wrapper: RunContextWrapper[TContext],
11971198
run_config: RunConfig,
11981199
tool_use_tracker: AgentToolUseTracker,
1200+
event_queue: asyncio.Queue[StreamEvent | QueueCompleteSentinel] | None = None,
11991201
) -> SingleStepResult:
12001202
processed_response = RunImpl.process_model_response(
12011203
agent=agent,
@@ -1207,6 +1209,9 @@ async def _get_single_step_result_from_response(
12071209

12081210
tool_use_tracker.add_tool_use(agent, processed_response.tools_used)
12091211

1212+
if event_queue is not None and processed_response.new_items:
1213+
RunImpl.stream_step_items_to_queue(processed_response.new_items, event_queue)
1214+
12101215
return await RunImpl.execute_tools_and_side_effects(
12111216
agent=agent,
12121217
original_input=original_input,

tests/test_stream_events.py

Lines changed: 75 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -3,10 +3,10 @@
33

44
import pytest
55

6-
from agents import Agent, Runner, function_tool
6+
from agents import Agent, Runner, function_tool, HandoffCallItem, HandoffOutputItem
77

88
from .fake_model import FakeModel
9-
from .test_responses import get_function_tool_call, get_text_message
9+
from .test_responses import get_function_tool_call, get_text_message, get_handoff_tool_call
1010

1111

1212
@function_tool
@@ -52,3 +52,76 @@ async def test_stream_events_main():
5252
assert tool_call_start_time > 0, "tool_call_item was not observed"
5353
assert tool_call_end_time > 0, "tool_call_output_item was not observed"
5454
assert tool_call_start_time < tool_call_end_time, "Tool call ended before or equals it started?"
55+
56+
@pytest.mark.asyncio
57+
async def test_stream_events_main_with_handoff():
58+
@function_tool
59+
async def foo(args: str) -> str:
60+
return f"foo_result_{args}"
61+
62+
spanish_agent = Agent(
63+
name="SpanishAgent",
64+
instructions="You only speak Spanish.",
65+
model=FakeModel(),
66+
)
67+
68+
english_agent = Agent(
69+
name="EnglishAgent",
70+
instructions="You only speak English.",
71+
model=FakeModel(),
72+
)
73+
74+
model = FakeModel()
75+
model.add_multiple_turn_outputs(
76+
[
77+
[
78+
get_text_message("Hello"),
79+
get_function_tool_call("foo", '{"args": "arg1"}'),
80+
get_handoff_tool_call(spanish_agent),
81+
],
82+
[get_text_message("Done")],
83+
]
84+
)
85+
86+
triage_agent = Agent(
87+
name="TriageAgent",
88+
instructions="Handoff to the appropriate agent based on the language of the request.",
89+
handoffs=[spanish_agent, english_agent],
90+
tools=[foo],
91+
model=model,
92+
)
93+
94+
result = Runner.run_streamed(
95+
triage_agent,
96+
input="Start",
97+
)
98+
99+
tool_call_start_time = -1
100+
tool_call_end_time = -1
101+
handoff_requested_seen = False
102+
handoff_occured_seen = False
103+
agent_switched_to_spanish = False
104+
105+
async for event in result.stream_events():
106+
if event.type == "run_item_stream_event":
107+
if isinstance(event.item, HandoffCallItem):
108+
handoff_requested_seen = True
109+
elif isinstance(event.item, HandoffOutputItem):
110+
handoff_occured_seen = True
111+
elif event.item.type == "tool_call_item":
112+
tool_call_start_time = time.time_ns()
113+
elif event.item.type == "tool_call_output_item":
114+
tool_call_end_time = time.time_ns()
115+
elif event.type == "agent_updated_stream_event":
116+
if hasattr(event, 'new_agent') and event.new_agent.name == "SpanishAgent":
117+
agent_switched_to_spanish = True
118+
119+
120+
assert tool_call_start_time > 0, "tool_call_item was not observed"
121+
assert tool_call_end_time > 0, "tool_call_output_item was not observed"
122+
assert tool_call_start_time < tool_call_end_time, "Tool call ended before or equals it started?"
123+
124+
assert handoff_requested_seen, "handoff_requested event not observed"
125+
assert handoff_occured_seen, "handoff_occured event not observed"
126+
127+
assert agent_switched_to_spanish, "Agent did not switch to SpanishAgent"

0 commit comments

Comments
 (0)