
Conversation


@mehtarac mehtarac commented Oct 21, 2025

Bidirectional Event Loop Implementation Comparison

Overview

This document compares the original bidirectional event loop implementation (bidirectional_event_loop.py) with a new class-based implementation, highlighting the architectural changes.

Original Implementation Analysis

Architecture Overview

The original implementation combines a wrapper class (BidirectionalConnection) with standalone functions for processing. The wrapper class coordinates multiple standalone processing functions that require session parameter passing.

Core Components

BidirectionalConnection Class

  • Acts as a session wrapper that coordinates background tasks
  • Stores model session, agent reference, and control flags
  • Manages task collections: background_tasks, pending_tool_tasks
  • Contains queues: tool_queue for tool requests, audio_output_queue for audio events
  • Provides interruption handling with interrupted flag and interruption_lock

Key Function: Central coordinator object that holds all session state and manages communication between separate processing functions.

Coordinator Functions

  • start_bidirectional_connection(): Creates model session and initializes background tasks
  • stop_bidirectional_connection(): Terminates session and cleans up resources
  • bidirectional_event_loop_cycle(): Main supervision loop that monitors background tasks

Key Function: Interface layer that creates and manages the session wrapper, serving as the public API for the event loop system.

Background Processing Functions

  • _process_model_events(): Handles incoming model events and forwards them appropriately
  • _process_tool_execution(): Polls tool queue with 0.5-second timeout and executes tools
  • _handle_interruption(): Cancels tool tasks and clears audio buffers
  • _execute_tool_with_strands(): Integrates with Strands tool execution system

Key Function: Specialized processors that handle specific aspects of bidirectional streaming, each operating as an independent function that requires a session parameter.

Task Management

The original implementation runs three primary background tasks:

  • Model event processor
  • Tool execution processor with queue polling
  • Main coordination cycle for supervision

Task Architecture: Three-task system where each background process handles a specific responsibility, requiring coordination through shared session state.

Tool Execution Flow

Tool execution follows a queue-based pattern:

  1. Model events processor detects tool requests
  2. Tool requests are placed in tool_queue
  3. Tool execution processor polls queue with 0.5-second timeout
  4. Tools execute after polling delay

Performance Impact: The queue polling introduces 0-500ms delay before tool execution begins, creating noticeable latency in real-time interactions.

Constants and Configuration

The implementation defines two key constants:

  • TOOL_QUEUE_TIMEOUT = 0.5: Maximum wait time for tool queue polling
  • SUPERVISION_INTERVAL = 0.1: Sleep interval for main coordination loop

Performance Constraint: These constants directly impact system responsiveness, with tool execution delayed by up to 500ms and task failures detected within 100ms.

New Implementation Analysis

Architecture Overview

The new implementation uses a class-based architecture where all functionality is encapsulated within the BidirectionalEventLoop class. The single class encapsulation eliminates parameter passing overhead and provides cleaner state management.

Core Components

BidirectionalEventLoop Class

  • Single class that manages entire event loop lifecycle
  • Contains same core dependencies: model session and agent reference
  • Manages task tracking with background_tasks and pending_tool_tasks
  • Implements synchronization with multiple locks:
    • interruption_lock: Atomic interruption processing
    • conversation_lock: Thread-safe conversation history updates
  • Provides dedicated audio queue and metrics tracking
  • Tool execution protection: prevents interruption of active tools

Key Advantage: All related functionality contained within single class, eliminating the need for parameter passing and providing cleaner encapsulation.

Class Methods

  • start(): Initializes background tasks
  • stop(): Graceful shutdown and resource cleanup
  • schedule_tool_execution(): Immediate tool task creation
  • handle_interruption(): Atomic interruption processing
  • _process_model_events(): Event stream processing
  • _supervise_session(): Background task health monitoring
  • _execute_tool_with_strands(): Tool execution with Strands integration

Method Design: Each method has single responsibility and operates on class state without requiring external parameter passing.

Coordinator Functions

  • start_bidirectional_connection(): Creates event loop instance
  • stop_bidirectional_connection(): Delegates to event loop stop method

Interface Preservation: Same public API as original implementation.
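Since only the two coordinator functions form the public surface, the preserved API can be sketched with stand-in classes. All names below are simplified stand-ins for illustration, not the real implementation:

```python
import asyncio

class StubEventLoop:
    """Simplified stand-in for BidirectionalEventLoop."""
    def __init__(self) -> None:
        self.active = False

    async def start(self) -> None:
        self.active = True

    async def stop(self) -> None:
        self.active = False

async def start_bidirectional_connection(agent) -> StubEventLoop:
    # Same entry point as before; now it just builds and starts the class.
    event_loop = StubEventLoop()
    await event_loop.start()
    return event_loop

async def stop_bidirectional_connection(event_loop: StubEventLoop) -> None:
    # Thin wrapper that delegates to the class's own stop().
    await event_loop.stop()

async def main() -> bool:
    conn = await start_bidirectional_connection(agent=None)
    assert conn.active
    await stop_bidirectional_connection(conn)
    return conn.active

still_active = asyncio.run(main())
```

Callers continue to use the same two functions, so swapping the internals requires no changes at the call sites.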

Task Management

The new implementation runs two background tasks:

  • Model event processor (combines event processing and tool scheduling)
  • Session supervisor with deterministic task monitoring

Task Consolidation: Reduces from 3 to 2 background tasks by combining event processing with immediate tool scheduling, eliminating the separate polling task.

Tool Execution Flow

Tool execution uses immediate scheduling:

  1. Model events processor detects tool requests
  2. schedule_tool_execution() creates asyncio task immediately
  3. Tools execute with 0ms scheduling delay

Performance Improvement: Eliminates queue polling entirely, reducing tool execution latency from 0-500ms to 0ms for real-time responsiveness.

Supervision Mechanism

The new implementation replaces polling-based supervision with event-driven monitoring using asyncio.wait() with FIRST_COMPLETED return condition and 1.0-second timeout for periodic active flag checks. This provides immediate task failure detection (0ms delay) versus the original 100ms polling intervals.

Deterministic Behavior: Event-driven supervision responds immediately to task failures rather than waiting for next polling cycle, improving system reliability.
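The behavior described above can be reproduced with a self-contained toy (worker names are invented for the demo): a failing task is observed by asyncio.wait() the moment it completes, with no polling interval involved:

```python
import asyncio

async def failing_worker() -> None:
    # Stand-in for a model-events task that dies from a network failure.
    await asyncio.sleep(0.05)
    raise RuntimeError("simulated network failure")

async def healthy_worker() -> None:
    await asyncio.sleep(10)

async def supervise() -> str:
    tasks = [asyncio.create_task(failing_worker()),
             asyncio.create_task(healthy_worker())]
    # Returns as soon as ANY task finishes; the timeout only exists for
    # periodic active-flag checks, as the document describes.
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED, timeout=1.0
    )
    failure = next(iter(done)).exception()
    for task in pending:          # graceful shutdown of the survivors
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return type(failure).__name__

detected = asyncio.run(supervise())
```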

Component Comparison

Session Initialization

Original Approach

# Creates wrapper class and separate background task functions
session = BidirectionalConnection(model_session=model_session, agent=agent)
session.background_tasks = [
    asyncio.create_task(_process_model_events(session)),
    asyncio.create_task(_process_tool_execution(session)),
]
session.main_cycle_task = asyncio.create_task(bidirectional_event_loop_cycle(session))

New Approach

# Creates single event loop class with encapsulated methods
event_loop = BidirectionalEventLoop(model_session=model_session, agent=agent)

# Inside BidirectionalEventLoop.start():
self.background_tasks = [
    asyncio.create_task(self._process_model_events()),
    asyncio.create_task(self._supervise_session()),
]

Functional Equivalence: Both approaches create the necessary background tasks and initialize session state. The new approach eliminates parameter passing overhead.

Tool Execution Scheduling

Original Approach

# Queue-based polling with timeout
async def _process_tool_execution(session: BidirectionalConnection) -> None:
    while session.active:
        try:
            tool_use = await asyncio.wait_for(session.tool_queue.get(), timeout=TOOL_QUEUE_TIMEOUT)
            # Execute tool after 0-500ms delay
        except asyncio.TimeoutError:
            continue

New Approach

# Immediate task creation
def schedule_tool_execution(self, tool_use: ToolUse) -> None:
    task = asyncio.create_task(self._execute_tool_with_strands(tool_use))
    # Tool executes immediately

Functional Equivalence: Both approaches execute tools concurrently. The new approach eliminates polling delays, reducing tool execution latency from 0-500ms to 0ms.

Model Event Processing

Original Implementation

async def _process_model_events(session: BidirectionalConnection) -> None:
    async for provider_event in session.model_session.receive_events():
        # Event processing and forwarding
        if strands_event.get("toolUse"):
            await session.tool_queue.put(strands_event["toolUse"])  # Queue for later

New Implementation

async def _process_model_events(self) -> None:
    async for provider_event in self.model_session.receive_events():
        # Event processing and forwarding
        if strands_event.get("toolUse"):
            self.schedule_tool_execution(strands_event["toolUse"])  # Execute immediately

Functional Equivalence: Both approaches process the same event types and forward events to the agent. The new approach schedules tools immediately instead of queuing them.
Advantage: Immediate tool scheduling eliminates the queue bottleneck that was causing delays in tool execution.

Interruption Handling

Original Implementation

async def _handle_interruption(session: BidirectionalConnection) -> None:
    async with session.interruption_lock:
        if session.interrupted:
            return
        session.interrupted = True
        # Cancel all pending tool tasks
        for task_id, task in list(session.pending_tool_tasks.items()):
            if not task.done():
                task.cancel()
        # Clear audio queues

New Implementation

async def handle_interruption(self) -> None:
    async with self.interruption_lock:
        if self.interrupted:
            return
        # Check if tools are currently executing
        active_tool_tasks = [task for task in self.pending_tool_tasks.values() if not task.done()]
        if active_tool_tasks:
            # Don't cancel tools, but still clear audio for responsive interruption
            logger.debug("Active tools running; skipping cancellation")
        else:
            # Full interruption handling
            self.interrupted = True
        # Always clear audio queues for responsive interruption
        while not self.audio_output_queue.empty():
            self.audio_output_queue.get_nowait()

Functional Enhancement: The new implementation adds tool execution protection. When tools are actively running, audio is cleared for responsiveness but tools continue executing to completion.
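A minimal runnable sketch of this protection rule, with simplified attribute names modeled on the document's description (not the project's actual code):

```python
import asyncio

class InterruptionSketch:
    """Toy model of the interruption rule; attribute names follow the document."""
    def __init__(self) -> None:
        self.interrupted = False
        self.interruption_lock = asyncio.Lock()
        self.pending_tool_tasks: dict[str, asyncio.Task] = {}
        self.audio_output_queue: asyncio.Queue = asyncio.Queue()

    async def handle_interruption(self) -> str:
        async with self.interruption_lock:
            active = [t for t in self.pending_tool_tasks.values() if not t.done()]
            # Audio is always cleared so playback stops promptly.
            while not self.audio_output_queue.empty():
                self.audio_output_queue.get_nowait()
            if active:
                return "audio-only"   # running tools are left to finish
            self.interrupted = True
            return "full"

async def demo() -> tuple:
    loop_obj = InterruptionSketch()
    loop_obj.pending_tool_tasks["t1"] = asyncio.create_task(asyncio.sleep(1))
    await loop_obj.audio_output_queue.put(b"chunk")
    mode = await loop_obj.handle_interruption()
    task = loop_obj.pending_tool_tasks["t1"]
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return mode, loop_obj.audio_output_queue.empty()

mode, audio_cleared = asyncio.run(demo())
```

With a tool in flight, the interruption clears audio but leaves the task running, matching the "audio-only" branch above.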

Conversation History Management

Original Implementation

# Direct message appending without synchronization
session.agent.messages.append(strands_event["messageStop"]["message"])
session.agent.messages.append(tool_result_message)

New Implementation

# Thread-safe message appending with conversation lock
async with self.conversation_lock:
    self.agent.messages.append(strands_event["messageStop"]["message"])
    self.agent.messages.append(tool_result_message)

Functional Equivalence: Both implementations maintain conversation history. The new approach adds race condition protection for concurrent tool execution scenarios.
Concurrency Safety: Thread-safe operations prevent conversation history overwriting when multiple tools execute simultaneously.

Background Task Supervision

Original Implementation

# Polling-based supervision with sleep intervals
async def bidirectional_event_loop_cycle(session: BidirectionalConnection) -> None:
    while session.active:
        # Check background task health
        await asyncio.sleep(SUPERVISION_INTERVAL)  # 0.1s polling

New Implementation

# Event-driven supervision with immediate failure detection
async def _supervise_session(self) -> None:
    while self.active and tasks_to_supervise:
        done, pending = await asyncio.wait(
            tasks_to_supervise,
            return_when=asyncio.FIRST_COMPLETED,  # Immediate response
            timeout=1.0
        )

Functional Equivalence: Both approaches monitor background task health and trigger cleanup on failures. The new event-driven supervision provides deterministic failure detection instead of polling-based monitoring.

Tool Execution Integration

Original Implementation

async def _execute_tool_with_strands(session: BidirectionalConnection, tool_use: dict) -> None:
    # Strands tool validation and execution
    tool_events = session.agent.tool_executor._execute(...)
    # Process results and update conversation history
    session.agent.messages.append(tool_result_message)

New Implementation

async def _execute_tool_with_strands(self, tool_use: ToolUse) -> None:
    # Identical Strands tool validation and execution
    tool_events = self.agent.tool_executor._execute(...)
    # Process results and update conversation history with thread safety
    async with self.conversation_lock:
        self.agent.messages.append(tool_result_message)

Functional Equivalence: Both implementations use identical Strands tool execution pipelines with validation and event processing. The new approach adds thread-safe conversation history updates.
Integration Preserved: Maintains complete compatibility with existing Strands tool system while adding safety improvements.

Resource Cleanup

Original Implementation

async def stop_bidirectional_connection(session: BidirectionalConnection) -> None:
    session.active = False
    # Cancel all task types: pending tools, background tasks, main cycle
    all_tasks = session.background_tasks + list(session.pending_tool_tasks.values())
    if hasattr(session, "main_cycle_task"):
        all_tasks.append(session.main_cycle_task)
    await asyncio.gather(*all_tasks, return_exceptions=True)

New Implementation

async def stop(self) -> None:
    self.active = False
    # Cancel all task types: pending tools, background tasks
    all_tasks = list(self.pending_tool_tasks.values()) + self.background_tasks
    await asyncio.gather(*all_tasks, return_exceptions=True)

Functional Equivalence: Both implementations cancel all active tasks and wait for completion. The new approach eliminates the separate main cycle task while maintaining the same cleanup.
Resource Management: Simplified cleanup process with fewer task types to manage, reducing complexity while maintaining complete resource cleanup.
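A self-contained sketch of this shutdown pattern (task bodies are placeholders; names mirror the excerpts above but this is not the actual implementation):

```python
import asyncio

async def long_tool() -> None:
    await asyncio.sleep(60)   # placeholder for a slow tool or background loop

async def stop_all() -> list:
    pending_tool_tasks = {"t1": asyncio.create_task(long_tool())}
    background_tasks = [asyncio.create_task(long_tool())]
    await asyncio.sleep(0)    # let both tasks start running
    all_tasks = list(pending_tool_tasks.values()) + background_tasks
    for task in all_tasks:
        if not task.done():
            task.cancel()
    # return_exceptions=True turns CancelledError into a return value,
    # so shutdown itself never raises.
    return await asyncio.gather(*all_tasks, return_exceptions=True)

results = asyncio.run(stop_all())
```

Gathering with `return_exceptions=True` is what lets both implementations wait on every task without a single failure aborting cleanup.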

Coordinates background tasks for model event processing, tool execution, and audio
handling while providing a simple interface for agent interactions.
class BidirectionalEventLoop:
@mehtarac (Owner, Author) commented Oct 21, 2025:

The naming is just a suggestion and open to discussion

Collaborator:

nit: Agent loop, not event loop

# Synchronization primitives
self.interrupted = False
self.interruption_lock = asyncio.Lock()
self.conversation_lock = asyncio.Lock() # Race condition prevention
Owner Author:

This lock is used when adding to conversation history, to ensure the conversation history is not corrupted or overwritten. For example, if multiple tools complete at the same time, the lock helps ensure that all the results are written to the conversation history.

Collaborator:

Does order of writing tool results matter? I guess tools will execute as they appear, and the model will continue to stream its response.

Reply:

Technically speaking we shouldn't need a lock for the following reasons:

  1. Async tasks run one at a time, but in a cooperative manner. Because they run one at a time, there is no concern of resource conflicts when operating in memory.
  2. The messages array is first updated in memory, which can only be altered by one task at a time.
  3. The session managers operate in hooks, which run synchronously. Assuming hooks could run asynchronously, I would say it is up to them to put locks in place.
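The cooperative-scheduling point made above can be demonstrated directly. In this toy example (not the project's code), there is no await between the two appends, so each task's pair stays adjacent even without a lock:

```python
import asyncio

shared: list = []

async def append_pair(i: int) -> None:
    # No await between the two appends, so the task cannot be suspended
    # mid-update: cooperative scheduling makes the pair atomic without a lock.
    shared.append((i, "first"))
    shared.append((i, "second"))
    await asyncio.sleep(0)

async def main() -> None:
    await asyncio.gather(*(append_pair(i) for i in range(100)))

asyncio.run(main())
pairs_adjacent = all(shared[j][0] == shared[j + 1][0] for j in range(0, len(shared), 2))
```

The lock only becomes necessary if an await point is introduced between related updates, which is the scenario the conversation_lock guards against.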

"""Start background tasks for model event processing and session supervision."""
logger.debug("Starting bidirectional event loop")

self.background_tasks = [
Owner Author:

self.background_tasks is initialized as an empty list in the constructor and populated in this method on purpose. This helps with error handling and ensures that the event loop instance is fully initialized before we create tasks.

# Thread-safe counter increment
current_tool_number = self.tool_count + 1
self.tool_count = current_tool_number
print(f"\nTool #{current_tool_number}: {tool_name}")
Owner Author:

similar to PrintingCallbackHandler in sdk-python/src/strands/handlers/callback_handler.py

break

# Remove completed tasks from supervision list
tasks_to_supervise = [task for task in tasks_to_supervise if not task.done()]
Owner Author:

In this case the only task being supervised will be the _process_model_events task in a normal connection/session, so the tasks_to_supervise list remains unchanged - no tasks are removed. However, the removal logic exists as a safety net in case the model events task unexpectedly completes (e.g., a network failure) and we need a graceful shutdown.

return event_loop


async def stop_bidirectional_connection(event_loop: "BidirectionalEventLoop") -> None:
Owner Author:

just a wrapper that calls event_loop.stop(). Mainly to preserve existing api of agent.start_bidirectional_connection and agent.stop_bidirectional_connection in the agent class.


# Cancel all tasks
for task in self.pending_tool_tasks.values():
    if not task.done():
        task.cancel()
Collaborator:

Under what circumstance would a task be done but still be in pending_tool_tasks?

Owner Author:

The check here verifies that a task is not done and cancels it. So if a user triggers multiple tool calls and then closes the bidirectional session, we cancel all the pending tool tasks and clear them.

# Thread-safe counter increment
current_tool_number = self.tool_count + 1
self.tool_count = current_tool_number
print(f"\nTool #{current_tool_number}: {tool_name}")
Collaborator:

Just to understand why we need print here.

Owner Author:

I was trying to replicate and ensure that a tool was being called/used by the model, similar to how we use PrintingCallbackHandler in the existing Strands system. Another similar comment here: #10 (comment)

@mehtarac mehtarac self-assigned this Oct 21, 2025

}
while self.active and tasks_to_supervise:
    # Wait for any task completion (deterministic vs polling)
    done, pending = await asyncio.wait(
Collaborator:

Why do we need this supervisor? Can we design it with no supervision?

Owner Author:

We could potentially use asyncio.gather().
Using asyncio.gather():

  • Start all tasks with gather()
  • If any task fails, gather() completes
  • This is basically supervision built into asyncio

Although we could use asyncio.gather(), the current approach has little overhead and still provides an event-based mechanism.
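A sketch of the asyncio.gather() alternative discussed here (worker names invented for the demo): gather() surfaces the first failure immediately, and the caller then cancels the survivors:

```python
import asyncio

async def model_events() -> None:
    await asyncio.sleep(0.05)
    raise ConnectionError("stream closed")

async def tool_worker() -> None:
    await asyncio.sleep(10)

async def run_with_gather() -> str:
    tasks = [asyncio.create_task(model_events()),
             asyncio.create_task(tool_worker())]
    try:
        # gather() propagates the first failure as soon as it happens.
        await asyncio.gather(*tasks)
    except ConnectionError as exc:
        for task in tasks:        # the caller must still cancel survivors
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        return str(exc)
    return "no failure"

outcome = asyncio.run(run_with_gather())
```

The trade-off is that gather() gives failure propagation for free, while the explicit supervisor keeps the periodic active-flag check and per-task handling in one place.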

while maintaining a simple interface for agent interaction.
Features:
- Concurrent task management for model events and tool execution
Collaborator:

_process_tool_execution: I saw this function was removed but it is still mentioned in the doc.


# Tool execution tracking
# Audio and metrics
self.audio_output_queue = asyncio.Queue()
Reply:

Since these bidirectional models can stream more than just audio data, I would avoid creating an audio specific queue. We should instead try to create a data agnostic queue.

tool_name = tool_use.get("name")
tool_id = tool_use.get("toolUseId")

# Thread-safe counter increment
Reply:

Are we using threads?

tool_use_id = tool_result.get("toolUseId")
await self.model_session.send_tool_result(tool_use_id, tool_result)
logger.debug("Tool result sent: %s", tool_use_id)
elif isinstance(tool_event, ToolStreamEvent):
Collaborator:

Just curious how we deal with ToolStreamEvent

finally:
logger.debug("Session supervisor stopped")

async def _execute_tool_with_strands(self, tool_use: ToolUse) -> None:
Reply:

Nit: Can we just call this _execute_tool?



# Session lifecycle coordinator functions
async def start_bidirectional_connection(agent: "BidirectionalAgent") -> "BidirectionalEventLoop":
@pgrayy commented Oct 21, 2025:

Nit: Can we make this a @classmethod on BidirectionalEventLoop.

return event_loop


async def stop_bidirectional_connection(event_loop: "BidirectionalEventLoop") -> None:
Reply:

Nit: Do we need this method? Can users just call event_loop.stop directly?
