Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions src/strands/multiagent/a2a/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@

import uvicorn
from a2a.server.apps import A2AFastAPIApplication, A2AStarletteApplication
from a2a.server.events import QueueManager
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.server.tasks import InMemoryTaskStore, PushNotificationConfigStore, PushNotificationSender, TaskStore
from a2a.types import AgentCapabilities, AgentCard, AgentSkill
from fastapi import FastAPI
from starlette.applications import Starlette
Expand All @@ -36,6 +37,12 @@ def __init__(
serve_at_root: bool = False,
version: str = "0.0.1",
skills: list[AgentSkill] | None = None,
# RequestHandler
task_store: TaskStore | None = None,
queue_manager: QueueManager | None = None,
push_config_store: PushNotificationConfigStore | None = None,
push_sender: PushNotificationSender | None = None,

):
"""Initialize an A2A-compatible server from a Strands agent.

Expand All @@ -52,6 +59,14 @@ def __init__(
Defaults to False.
version: The version of the agent. Defaults to "0.0.1".
skills: The list of capabilities or functions the agent can perform.
task_store: Custom task store implementation for managing agent tasks. If None,
uses InMemoryTaskStore.
queue_manager: Custom queue manager for handling message queues. If None,
no queue management is used.
push_config_store: Custom store for push notification configurations. If None,
no push notification configuration is used.
push_sender: Custom push notification sender implementation. If None,
no push notifications are sent.
"""
self.host = host
self.port = port
Expand All @@ -77,7 +92,10 @@ def __init__(
self.capabilities = AgentCapabilities(streaming=True)
self.request_handler = DefaultRequestHandler(
agent_executor=StrandsA2AExecutor(self.strands_agent),
task_store=InMemoryTaskStore(),
task_store=task_store or InMemoryTaskStore(),
queue_manager=queue_manager,
push_config_store=push_config_store,
push_sender=push_sender,
)
self._agent_skills = skills
logger.info("Strands' integration with A2A is experimental. Be aware of frequent breaking changes.")
Expand Down
Loading