Skip to content

Conversation

@awsarron
Copy link
Member

@awsarron awsarron commented Nov 13, 2025

Description

Adds an A2AAgent class that makes it simple to consume remote A2A agents and invoke them like any other Strands Agents.

Implements the Protocol in #1126.

Example usage:

from strands import Agent, tool
from strands.agent.a2a_agent import A2AAgent

# Create an A2AAgent to connect to a remote A2A server
a2a_agent = A2AAgent(endpoint="http://localhost:9000")

# Expose the A2AAgent as a tool for our Strands agent to consume
@tool(name="calculator")
def a2a_agent_tool(prompt: str) -> str:
    """Invoke an agent that performs calculations"""
    return str(a2a_agent(prompt))

# Connect the second agent to our A2A agent
agent = Agent(tools=[a2a_agent_tool])

# Invoke the second agent
agent("Show me 10 ^ 6")

print(agent.messages)

Output:

I'll calculate 10^6 for you.
Tool #1: calculator
10^6 = 1,000,000

This represents 10 multiplied by itself 6 times, which equals one million.AgentResult(stop_reason='end_turn', message={'role': 'assistant', 'content': [{'text': '10^6 = 1,000,000\n\nThis represents 10 multiplied by itself 6 times, which equals one million.'}]}, metrics=EventLoopMetrics(cycle_count=2, tool_metrics={'calculator': ToolMetrics(tool={'toolUseId': 'tooluse_FV00wvpRRmK9mYNFo8ax-Q', 'name': 'calculator', 'input': {'prompt': '10^6'}}, call_count=1, success_count=1, error_count=0, total_time=2.747351884841919)}, cycle_durations=[1.717027187347412], traces=[<strands.telemetry.metrics.Trace object at 0x1123c4ec0>, <strands.telemetry.metrics.Trace object at 0x1124e76f0>], accumulated_usage={'inputTokens': 885, 'outputTokens': 99, 'totalTokens': 984}, accumulated_metrics={'latencyMs': 3482}), state={}, interrupts=None, structured_output=None)
>>> 
>>> print(agent.messages)
[{'role': 'user', 'content': [{'text': 'Show me 10 ^ 6'}]}, {'role': 'assistant', 'content': [{'text': "I'll calculate 10^6 for you."}, {'toolUse': {'toolUseId': 'tooluse_FV00wvpRRmK9mYNFo8ax-Q', 'name': 'calculator', 'input': {'prompt': '10^6'}}}]}, {'role': 'user', 'content': [{'toolResult': {'toolUseId': 'tooluse_FV00wvpRRmK9mYNFo8ax-Q', 'status': 'success', 'content': [{'text': '10^6 = 1,000,000\n\nThis represents 10 multiplied by itself 6 times, which equals one million.\n\n'}]}}]}, {'role': 'assistant', 'content': [{'text': '10^6 = 1,000,000\n\nThis represents 10 multiplied by itself 6 times, which equals one million.'}]}]

Standalone invocation of A2AAgent objects also works:

a2a_agent("Show me 10 ^ 6")

AgentResult(stop_reason='end_turn', message={'role': 'assistant', 'content': [{'text': '10^6 = 1,000,000 (one million)\n'}]}, metrics=EventLoopMetrics(cycle_count=0, tool_metrics={}, cycle_durations=[], traces=[], accumulated_usage={'inputTokens': 0, 'outputTokens': 0, 'totalTokens': 0}, accumulated_metrics={'latencyMs': 0}), state={}, interrupts=None, structured_output=None)

Follow-ups:

Related Issues

#907

Documentation PR

TODO

Type of Change

New feature

Testing

How have you tested the change? Verify that the changes do not break functionality or introduce warnings in consuming repositories: agents-docs, agents-tools, agents-cli

  • I ran hatch run prepare

Checklist

  • I have read the CONTRIBUTING document
  • I have added any necessary tests that prove my fix is effective or my feature works
  • [TODO] I have updated the documentation accordingly
  • [TODO] I have added an appropriate example to the documentation to outline the feature, or no new docs are needed
  • My changes generate no new warnings
  • Any dependent changes have been merged and published

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@codecov
Copy link

codecov bot commented Nov 13, 2025

Codecov Report

❌ Patch coverage is 75.47170% with 39 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
src/strands/agent/a2a_agent.py 78.35% 15 Missing and 6 partials ⚠️
src/strands/multiagent/a2a/converters.py 67.27% 8 Missing and 10 partials ⚠️

📢 Thoughts on this report? Let us know!

mkmeral
mkmeral previously approved these changes Nov 14, 2025
"""
return run_async(lambda: self.invoke_async(prompt, **kwargs))

async def stream_async(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stream async needs to return AgentResult for it to work with graph/swarm https://github.com/strands-agents/sdk-python/blob/main/src/strands/multiagent/graph.py#L810

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have this documented anywhere? Or is it only applicable internally?

@cagataycali
Copy link
Member

Can we also support parent agent's streaming support with callback_handler in a2a_client tool in strands-agents/tools?

RuntimeError: If no response received from agent.
"""
async for event in await self._send_message(prompt):
return convert_response_to_agent_result(event)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assumes that first event is the complete response though, right? What if it's not?

if not self.description and self._agent_card.description:
self.description = self._agent_card.description

logger.info("agent=<%s>, endpoint=<%s> | discovered agent card", self.name, self.endpoint)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be debug I think? I'm not sure we really use info anywhere throughout

if self._a2a_client is None:
agent_card = await self._get_agent_card()

if self._a2a_client_factory is not None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thoughts on simplifying to:

factory = self._a2a_client_factory or _create_default_factory()
self._a2a_client = factory.create(agent_card)

ValueError: If prompt is None.
RuntimeError: If no response received from agent.
"""
async for event in await self._send_message(prompt):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we delegate this implementation to stream_async for the events?

if self._owns_client and self._httpx_client is not None:
try:
client = self._httpx_client
run_async(lambda: client.aclose())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you check to ensure that this works correctly? IIRC, @dbschmigelski indicated there were problems trying to do async stuff from del


if final_event is not None:
result = convert_response_to_agent_result(final_event)
yield AgentResultEvent(result)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to yield one final event that becomes the "result" of the tool: https://strandsagents.com/latest/documentation/docs/user-guide/concepts/tools/python-tools/#tool-streaming

I think as is, this results in a stringified version of AgentResultEvent - is that what we want?

a2a_agent = A2AAgent(endpoint="http://localhost:9000")

# Invoke our A2A server
result = a2a_agent("Hello there!")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's have tests for all of the invoke/stream methods to ensure that they work e2e, when the client/server(s) are configured for streaming and non-streaming mode

A2AResponse: TypeAlias = tuple[Task, TaskStatusUpdateEvent | TaskArtifactUpdateEvent | None] | Message


class A2AStreamEvent(TypedEvent):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mkmeral have we started exposing these events publicly yet?



class A2AStreamEvent(TypedEvent):
"""Event that wraps streamed A2A types."""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this emitted for every single update that we get from the remote server? Even final updates etc?

If so, can we document that

from strands.agent.a2a_agent import A2AAgent


def test_a2a_agent():
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there non-strands A2A servers that are easy to spin up? I wonder if we could/should test against those as well

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Callout that I appreciate this PR description & code sample.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants