Skip to content

Commit a5138d1

Browse files
committed
fix(streaming): ensure processed_response.new_items, including HandoffCallItem, are pushed to event_queue
- Added `event_queue` parameter to `_get_single_step_result_from_response` - Push `processed_response.new_items` to queue in streaming mode - Non-streaming path (`_run_single_turn`) remains unchanged - Fixes loss of HandoffCallItem in streaming scenarios - Add test_stream_events_main_with_handoff test
1 parent 3556d7b commit a5138d1

File tree

2 files changed

+72
-4
lines changed

2 files changed

+72
-4
lines changed

src/agents/run.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@
6060
from .models.multi_provider import MultiProvider
6161
from .result import RunResult, RunResultStreaming
6262
from .run_context import RunContextWrapper, TContext
63-
from .stream_events import AgentUpdatedStreamEvent, RawResponsesStreamEvent, RunItemStreamEvent
63+
from .stream_events import AgentUpdatedStreamEvent, RawResponsesStreamEvent, RunItemStreamEvent, StreamEvent
6464
from .tool import Tool
6565
from .tracing import Span, SpanError, agent_span, get_current_trace, trace
6666
from .tracing.span_data import AgentSpanData
@@ -1084,6 +1084,7 @@ async def _run_single_turn_streamed(
10841084
context_wrapper=context_wrapper,
10851085
run_config=run_config,
10861086
tool_use_tracker=tool_use_tracker,
1087+
event_queue=streamed_result._event_queue,
10871088
)
10881089

10891090
if emitted_tool_call_ids:
@@ -1196,6 +1197,7 @@ async def _get_single_step_result_from_response(
11961197
context_wrapper: RunContextWrapper[TContext],
11971198
run_config: RunConfig,
11981199
tool_use_tracker: AgentToolUseTracker,
1200+
event_queue: asyncio.Queue[StreamEvent | QueueCompleteSentinel] | None = None,
11991201
) -> SingleStepResult:
12001202
processed_response = RunImpl.process_model_response(
12011203
agent=agent,
@@ -1207,6 +1209,9 @@ async def _get_single_step_result_from_response(
12071209

12081210
tool_use_tracker.add_tool_use(agent, processed_response.tools_used)
12091211

1212+
if event_queue is not None and processed_response.new_items:
1213+
RunImpl.stream_step_items_to_queue(processed_response.new_items, event_queue)
1214+
12101215
return await RunImpl.execute_tools_and_side_effects(
12111216
agent=agent,
12121217
original_input=original_input,

tests/test_stream_events.py

Lines changed: 66 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@
33

44
import pytest
55

6-
from agents import Agent, Runner, function_tool
6+
from agents import Agent, Runner, function_tool, HandoffCallItem, HandoffOutputItem
7+
from agents.handoffs import handoff
8+
from agents.extensions.handoff_filters import remove_all_tools
79

810
from .fake_model import FakeModel
9-
from .test_responses import get_function_tool_call, get_text_message
10-
11+
from .test_responses import get_function_tool_call, get_text_message, get_handoff_tool_call
1112

1213
@function_tool
1314
async def foo() -> str:
@@ -52,3 +53,65 @@ async def test_stream_events_main():
5253
assert tool_call_start_time > 0, "tool_call_item was not observed"
5354
assert tool_call_end_time > 0, "tool_call_output_item was not observed"
5455
assert tool_call_start_time < tool_call_end_time, "Tool call ended before or equals it started?"
56+
57+
@pytest.mark.asyncio
58+
async def test_stream_events_main_with_handoff():
59+
@function_tool
60+
async def foo(args: str) -> str:
61+
return f"foo_result_{args}"
62+
63+
english_agent = Agent(
64+
name="EnglishAgent",
65+
instructions="You only speak English.",
66+
model=FakeModel(),
67+
)
68+
69+
model = FakeModel()
70+
model.add_multiple_turn_outputs(
71+
[
72+
[
73+
get_text_message("Hello"),
74+
get_function_tool_call("foo", '{"args": "arg1"}'),
75+
get_handoff_tool_call(english_agent),
76+
],
77+
[get_text_message("Done")],
78+
]
79+
)
80+
81+
triage_agent = Agent(
82+
name="TriageAgent",
83+
instructions="Handoff to the appropriate agent based on the language of the request.",
84+
handoffs=[
85+
handoff(english_agent, input_filter=remove_all_tools),
86+
],
87+
tools=[foo],
88+
model=model,
89+
)
90+
91+
result = Runner.run_streamed(
92+
triage_agent,
93+
input="Start",
94+
)
95+
96+
tool_call_start_time = -1
97+
tool_call_end_time = -1
98+
handoff_requested_seen = False
99+
handoff_occured_seen = False
100+
agent_switched_to_english = False
101+
102+
async for event in result.stream_events():
103+
if event.type == "run_item_stream_event":
104+
if isinstance(event.item, HandoffCallItem):
105+
handoff_requested_seen = True
106+
elif isinstance(event.item, HandoffOutputItem):
107+
handoff_occured_seen = True
108+
elif event.item.type == "tool_call_item":
109+
tool_call_start_time = time.time_ns()
110+
elif event.item.type == "tool_call_output_item":
111+
tool_call_end_time = time.time_ns()
112+
elif event.type == "agent_updated_stream_event":
113+
if hasattr(event, 'new_agent') and event.new_agent.name == "EnglishAgent":
114+
agent_switched_to_english = True
115+
116+
assert handoff_requested_seen, "handoff_requested event not observed"
117+
assert agent_switched_to_english, "Agent did not switch to EnglishAgent"

0 commit comments

Comments
 (0)