diff --git a/src/strands/multiagent/a2a/server.py b/src/strands/multiagent/a2a/server.py index fa7b6b887..35ea5b2e3 100644 --- a/src/strands/multiagent/a2a/server.py +++ b/src/strands/multiagent/a2a/server.py @@ -10,8 +10,9 @@ import uvicorn from a2a.server.apps import A2AFastAPIApplication, A2AStarletteApplication +from a2a.server.events import QueueManager from a2a.server.request_handlers import DefaultRequestHandler -from a2a.server.tasks import InMemoryTaskStore +from a2a.server.tasks import InMemoryTaskStore, PushNotificationConfigStore, PushNotificationSender, TaskStore from a2a.types import AgentCapabilities, AgentCard, AgentSkill from fastapi import FastAPI from starlette.applications import Starlette @@ -36,6 +37,12 @@ def __init__( serve_at_root: bool = False, version: str = "0.0.1", skills: list[AgentSkill] | None = None, + # RequestHandler + task_store: TaskStore | None = None, + queue_manager: QueueManager | None = None, + push_config_store: PushNotificationConfigStore | None = None, + push_sender: PushNotificationSender | None = None, + ): """Initialize an A2A-compatible server from a Strands agent. @@ -52,6 +59,14 @@ def __init__( Defaults to False. version: The version of the agent. Defaults to "0.0.1". skills: The list of capabilities or functions the agent can perform. + task_store: Custom task store implementation for managing agent tasks. If None, + uses InMemoryTaskStore. + queue_manager: Custom queue manager for handling message queues. If None, + no queue management is used. + push_config_store: Custom store for push notification configurations. If None, + no push notification configuration is used. + push_sender: Custom push notification sender implementation. If None, + no push notifications are sent. """ self.host = host self.port = port @@ -77,7 +92,10 @@ def __init__( self.capabilities = AgentCapabilities(streaming=True) self.request_handler = DefaultRequestHandler( agent_executor=StrandsA2AExecutor(self.strands_agent), - task_store=InMemoryTaskStore(), + task_store=task_store or InMemoryTaskStore(), + queue_manager=queue_manager, + push_config_store=push_config_store, + push_sender=push_sender, ) self._agent_skills = skills logger.info("Strands' integration with A2A is experimental. Be aware of frequent breaking changes.")