@mehtarac
Member
Description

Pull Request: Bidirectional Streaming Implementation

Overview

This PR introduces bidirectional streaming capabilities to the Strands SDK for real-time, interactive conversations between users and AI models over persistent connections. It moves Strands from a request-response pattern to a concurrent, connection-based streaming approach.

Problem Statement

Strands currently uses a sequential request-response architecture that prevents real-time interaction:

  • Users cannot interrupt ongoing responses
  • No support for concurrent tool execution during model generation
  • Each interaction requires a complete request-response cycle
  • No native audio input/output capabilities

Solution

Bidirectional streaming introduces persistent connections with concurrent processing:

  • Real-time interruption during model generation
  • Concurrent tool execution without blocking conversation flow
  • Native audio support with format normalization across providers
  • Persistent connections lasting 8-30 minutes depending on provider

Architecture Overview

graph TB
    subgraph "Current Unidirectional Architecture"
        A1[Agent] --> B1[Model.stream]
        B1 --> C1[Sequential Events]
        C1 --> D1[Tool Execution BLOCKS]
        D1 --> E1[Response Complete]
    end
    
    subgraph "New Bidirectional Architecture"
        A2[BidirectionalAgent] --> B2[BidirectionalConnection]
        B2 --> C2[Model Events Processor]
        B2 --> D2[Tool Execution Processor]  
        B2 --> E2[Connection Coordinator]
        
        C2 --> F2[Event Queue]
        D2 --> G2[Tool Queue]
        E2 --> H2[Background Tasks Management]
        
        F2 --> I2[Agent.receive]
        G2 --> J2[Concurrent Tool Execution]
    end

Component Architecture

1. BidirectionalAgent - User Interface Layer

The BidirectionalAgent provides the user-facing interface for bidirectional streaming conversations. It follows the same patterns as Strands' existing Agent class but is built for persistent connections and real-time interaction.

Like the standard Agent, BidirectionalAgent uses compositional design, delegating to specialized components (ToolRegistry, ToolExecutor) rather than implementing functionality directly. Its constructor requires a BidirectionalModel, so static type checking catches configuration errors before runtime.

Key differences from the standard Agent:

  • Connection Management: Manages persistent connections instead of discrete request-response cycles
  • Real-time Interface: Provides concurrent methods (send_audio(), interrupt(), receive()) for live interaction
  • Concurrent Design: Built for real-time processing from initialization, maintaining familiar patterns (start_conversation() parallels invoke_async())
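Since example usage was requested in review, here is a minimal sketch of what the interface described above could look like in use. The method names (start_conversation(), send_audio(), receive()) come from this PR's description; the stand-in agent below is a toy defined locally so the sketch runs without the SDK, and its behavior (echoing input as an audioOutput event) is purely illustrative.

```python
import asyncio


class FakeBidirectionalAgent:
    """Toy stand-in mirroring the interface described above, not the real class."""

    def __init__(self):
        self._output = asyncio.Queue()

    async def start_conversation(self):
        # The real agent would open a persistent provider connection here.
        pass

    async def send_audio(self, chunk: bytes):
        # The real agent forwards audio to the model session; here we echo an event.
        await self._output.put({"audioOutput": {"bytes": chunk}})

    async def receive(self):
        # Yield events as they become available on the output queue.
        while not self._output.empty():
            yield await self._output.get()


async def main():
    agent = FakeBidirectionalAgent()
    await agent.start_conversation()
    await agent.send_audio(b"\x00\x01")
    return [event async for event in agent.receive()]


events = asyncio.run(main())
```

The key contrast with the standard Agent is that send_audio() and receive() are independent concurrent operations on one live connection, rather than halves of a single request-response call.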

2. BidirectionalConnection - Concurrent Event Loop Engine

The BidirectionalConnection transforms Strands from sequential event processing to concurrent task coordination. This replaces the existing event_loop_cycle() pattern with persistent, concurrent processing.

Current Event Loop Architecture

The existing event loop processes one conversation turn at a time in a sequential pattern (see Event Loop Cycle documentation).

Each call to event_loop_cycle() handles one complete conversation turn then terminates. Tool execution blocks the entire conversation flow until completion.

New Concurrent Architecture

BidirectionalConnection runs continuously throughout the connection (8-30 minutes) with three concurrent processors working together:

graph TB
    A[Model Events Processor] --> D[Event Queue]
    B[Tool Execution Processor] --> E[Tool Queue]
    C[Connection Coordinator] --> F[Connection State]
    
    D --> G[Agent.receive]
    E --> H[Tool Results]
    
    I[Provider Events] --> A
    J[Tool Requests] --> B
    K[User Input] --> A

The three processors work concurrently:

  1. Model Events Processor: Receives continuous events from the provider, converts them to Strands format, and routes to appropriate handlers
  2. Tool Execution Processor: Executes tools concurrently without blocking conversation flow, with cancellation support during interruptions
  3. Connection Coordinator: Supervises background tasks, manages connection lifecycle, and coordinates interruption handling
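The queue wiring between these processors can be sketched with plain asyncio primitives. The processor names follow the PR's diagrams, but the bodies below are simplified assumptions: the provider is simulated with a fixed event list, and a None sentinel stands in for connection shutdown.

```python
import asyncio


async def process_model_events(provider_events, tool_queue, output_queue):
    # Route provider events: tool requests go to the tool queue, the rest to output.
    for event in provider_events:
        if "toolUse" in event:
            await tool_queue.put(event["toolUse"])
        else:
            await output_queue.put(event)
    await tool_queue.put(None)  # sentinel: no more tool requests


async def process_tool_execution(tool_queue, output_queue):
    # Execute tool requests concurrently with event routing, until the sentinel.
    while (tool_use := await tool_queue.get()) is not None:
        result = {"toolResult": tool_use["name"] + ":done"}
        await output_queue.put(result)


async def main():
    tool_queue, output_queue = asyncio.Queue(), asyncio.Queue()
    provider_events = [{"text": "hello"}, {"toolUse": {"name": "calculator"}}]
    # Coordinator role: supervise both processors until they finish.
    await asyncio.gather(
        process_model_events(provider_events, tool_queue, output_queue),
        process_tool_execution(tool_queue, output_queue),
    )
    results = []
    while not output_queue.empty():
        results.append(output_queue.get_nowait())
    return results


results = asyncio.run(main())
```

The point of the two queues is decoupling: a slow tool never blocks the model events processor, so text and audio keep flowing to the user while the tool runs.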

Event Loop Design

sequenceDiagram
    participant User
    participant Agent as BidirectionalAgent
    participant Conn as BidirectionalConnection
    participant ModelSession as BidirectionalModelSession
    participant ModelEventsTask as _process_model_events
    participant ToolExecTask as _process_tool_execution
    participant CycleTask as bidirectional_event_loop_cycle
    participant Provider as Provider Stream

    User->>Agent: start_conversation()
    Agent->>+Conn: start_bidirectional_connection(agent)
    Conn->>+ModelSession: model.create_bidirectional_connection()
    ModelSession->>Provider: Initialize provider stream
    
    par Background Task Initialization
        Conn->>ModelEventsTask: asyncio.create_task(_process_model_events)
        Conn->>ToolExecTask: asyncio.create_task(_process_tool_execution)
        Conn->>CycleTask: asyncio.create_task(bidirectional_event_loop_cycle)
    end
    
    Conn-->>-Agent: return BidirectionalConnection
    
    User->>Agent: send_audio(audio_input)
    Agent->>ModelSession: send_audio_content(audio_input)
    ModelSession->>Provider: Send formatted provider event
    
    loop Concurrent Processing
        Provider-->>ModelSession: Raw provider events
        ModelSession->>ModelSession: Convert to standardized format
        ModelEventsTask->>ModelSession: receive_events()
        ModelSession-->>ModelEventsTask: Standardized events
        
        alt Tool Use Event
            ModelEventsTask->>ToolExecTask: tool_queue.put(tool_use)
            ToolExecTask->>ToolExecTask: Execute tool with Strands infrastructure
            ToolExecTask->>ModelSession: send_tool_result(result)
            ModelSession->>Provider: Send formatted tool result
        else Text/Audio Output
            ModelEventsTask->>Agent: agent._output_queue.put(event)
            Agent-->>User: receive() yields event
        else Interruption Detected
            ModelEventsTask->>Conn: _handle_interruption()
            Conn->>ToolExecTask: Cancel pending tool tasks
            Conn->>Agent: Clear audio output queue
        end
        
        CycleTask->>CycleTask: Supervise background tasks health
    end

Event Flow and Processing

The sequence diagram above shows the implementation flow and component interactions:

  1. Connection Setup: start_bidirectional_connection() creates a model session and launches three background tasks
  2. Task Management: Model events task calls receive_events(), tool execution task monitors tool queue, cycle task supervises health
  3. Input Processing: User input goes through Agent → ModelSession → Provider with proper formatting
  4. Event Streaming: Provider events flow through ModelSession normalization before reaching background tasks
  5. Tool Execution: Tools execute using existing Strands infrastructure with results sent back through ModelSession
  6. Output Flow: Events reach user through Agent's output queue consumed by receive() method
  7. Interruption: Detected by model events task, handled by connection with task cancellation and queue clearing

Key implementation detail: Events flow through the BidirectionalModelSession layer which normalizes provider-specific formats before reaching the background processing tasks.
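The interruption path in step 7 can be sketched as follows. The helper names (handle_interruption, clear_queue, slow_tool) are illustrative, not the PR's actual identifiers; the two actions shown (cancelling in-flight tool tasks, dropping queued audio) are the ones the sequence diagram describes.

```python
import asyncio


async def slow_tool():
    # Stands in for a long-running tool call that should be cancelled.
    await asyncio.sleep(60)
    return "never reached"


def clear_queue(queue: asyncio.Queue):
    # Drop any buffered audio the user should no longer hear.
    while not queue.empty():
        queue.get_nowait()


async def handle_interruption(pending_tasks, audio_queue):
    # Cancel in-flight tool tasks, then wait for the cancellations to settle.
    for task in pending_tasks:
        task.cancel()
    await asyncio.gather(*pending_tasks, return_exceptions=True)
    clear_queue(audio_queue)


async def main():
    audio_queue = asyncio.Queue()
    audio_queue.put_nowait(b"stale-audio")
    pending = [asyncio.create_task(slow_tool())]
    await handle_interruption(pending, audio_queue)
    return pending[0].cancelled(), audio_queue.empty()


cancelled, queue_empty = asyncio.run(main())
```

Clearing the output queue matters as much as cancellation: without it, the user would keep hearing audio generated before the interruption.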

3. Model Interface - Protocol Normalization

The new model interface creates a unified interface across different bidirectional streaming protocols. This design maintains Strands' core philosophy that users should be able to switch between model providers without changing their application code.

Separation from Existing Model Architecture

The existing Model interface handles stateless, discrete operations where each stream() call is independent. The new BidirectionalModel interfaces manage persistent connections with continuous event streams and multiple concurrent input methods (send_audio_content(), send_text_content(), send_interrupt()). This separation is necessary because bidirectional streaming providers use different protocols compared to traditional request-response models. Each provider implements their own event sequences, connection management, and data formats for real-time streaming.
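One way to express this separation is with typing.Protocol. The method names below (create_bidirectional_connection, send_audio_content, send_text_content, send_interrupt, receive_events) come from this PR's description; the Protocol formulation and the toy EchoSession are assumptions added here to show the shape is satisfiable, not the SDK's actual definitions.

```python
from typing import Any, AsyncIterator, Protocol, runtime_checkable


@runtime_checkable
class BidirectionalModelSession(Protocol):
    """Persistent connection with multiple concurrent input methods."""

    async def send_audio_content(self, audio: dict[str, Any]) -> None: ...
    async def send_text_content(self, text: str) -> None: ...
    async def send_interrupt(self) -> None: ...
    def receive_events(self) -> AsyncIterator[dict[str, Any]]: ...


@runtime_checkable
class BidirectionalModel(Protocol):
    """Factory for sessions, analogous to Model.stream() in the existing API."""

    async def create_bidirectional_connection(self, **config: Any) -> BidirectionalModelSession: ...


class EchoSession:
    """Toy conforming implementation, used only to show the protocol is satisfiable."""

    async def send_audio_content(self, audio): ...
    async def send_text_content(self, text): ...
    async def send_interrupt(self): ...

    async def receive_events(self):
        yield {"textOutput": "echo"}


conforms = isinstance(EchoSession(), BidirectionalModelSession)
```

Keeping this interface separate from Model means existing providers are untouched, while each bidirectional provider adapts its own wire protocol behind the same session methods.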

4. Bidirectional Type System

The type system extends Strands' existing StreamEvent types to support bidirectional streaming while maintaining full backward compatibility.

New event types include:

  • Audio Events: audioOutput and audioInput with standardized format (raw bytes, explicit sample rates)
  • Connection Events: BidirectionalConnectionStart and BidirectionalConnectionEnd for lifecycle management
  • Interruption Events: interruptionDetected for real-time conversation control
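The event list above can be sketched as TypedDicts (the form the PR itself uses, per the review thread below). Only the event names come from the PR; the exact field names here (audioData, sampleRate, channels, reason) are assumptions for illustration.

```python
from typing import TypedDict


class AudioOutputEvent(TypedDict):
    audioData: bytes  # raw PCM bytes, not base64-encoded strings
    sampleRate: int   # explicit rate, e.g. 24000 for Nova Sonic output
    channels: int


class InterruptionDetectedEvent(TypedDict):
    reason: str


# A standardized audio event, independent of any provider's wire format.
event: AudioOutputEvent = {"audioData": b"\x00", "sampleRate": 24000, "channels": 1}
```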

5. Nova Sonic Model Provider Implementation

Strands follows a model-agnostic philosophy, supporting multiple AI providers through a unified interface. Users can switch between Amazon Bedrock, Anthropic, OpenAI, Ollama, and others without changing their application code. This same philosophy extends to bidirectional streaming.

Nova Sonic is Amazon's bidirectional speech-to-speech streaming model, and serves as the reference implementation for this architecture. Nova Sonic requires event sequencing with hierarchical structures (sessionStart → promptStart → contentStart → input → contentEnd). The implementation handles this complexity internally while presenting a simple send_text() and send_audio() interface to users.
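The scaffolding the adapter generates around user input can be sketched as below. The event names follow the sequence the PR describes (sessionStart → promptStart → contentStart → input → contentEnd); the builder function and its payload handling are illustrative, and real Nova Sonic events carry structured bodies rather than bare names.

```python
def build_event_sequence(audio_chunks: list[bytes]) -> list[str]:
    # The provider adapter emits this hierarchy internally, so users
    # only ever call send_audio() / send_text().
    events = ["sessionStart", "promptStart", "contentStart"]
    events += ["audioInput"] * len(audio_chunks)
    events.append("contentEnd")
    return events


sequence = build_event_sequence([b"chunk-1", b"chunk-2"])
```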

Implementation Benefits

Architecture Advantages

  1. Separation of Concerns: Each component has a single responsibility
  2. Concurrent Design: Built for real-time processing
  3. Provider Agnostic: Unified interface abstracts protocol complexity
  4. Type Safe: Static typing on the model interface catches configuration errors before runtime

Maintained Compatibility

  • Existing Agent Class: Unchanged and fully functional
  • Current Model Providers: No modifications to existing model implementations
  • Tool Definitions: All existing tools work with bidirectional agents
  • Type System: BidirectionalStreamEvent inherits all existing StreamEvent fields

Experimental Status

Current State

This implementation is a working proof-of-concept that validates the architectural approach with Nova Sonic integration. The core functionality is operational and demonstrates end-to-end bidirectional streaming capabilities.

API Stability Warning

This feature is experimental and subject to breaking changes:

  • Interface methods and parameters may evolve
  • Event types and data structures will be refined
  • Provider implementations may undergo changes
  • Integration patterns will be optimized based on usage feedback

Testing and Validation

Interactive Test Script

The implementation includes a comprehensive test script at src/strands/experimental/bidirectional_streaming/tests/test_bidirectional_streaming.py that demonstrates real-time bidirectional streaming capabilities:

# Run the interactive test
python src/strands/experimental/bidirectional_streaming/tests/test_bidirectional_streaming.py

Recommended Setup: Use headphones for the best experience to prevent audio feedback between microphone and speakers.

The test script demonstrates:

  • Real-time Audio Processing: Live microphone input and speaker output with 16kHz/24kHz sample rates
  • Interruption Handling: Responsive interruption detection with immediate audio queue clearing
  • Concurrent Operations: Simultaneous audio recording, playback, event processing, and sending
  • Tool Integration: Calculator tool execution during conversation flow
  • Connection Management: Complete connection lifecycle with proper cleanup

Related Issues

#217

Documentation PR

Type of Change

New feature

Testing

How have you tested the change? Verify that the changes do not break functionality or introduce warnings in consuming repositories (agents-docs, agents-tools, agents-cli):

  • I ran hatch run prepare

Checklist

  • I have read the CONTRIBUTING document
  • I have added any necessary tests that prove my fix is effective or my feature works
  • I have updated the documentation accordingly
  • I have added an appropriate example to the documentation to outline the feature, or no new docs are needed
  • My changes generate no new warnings
  • Any dependent changes have been merged and published

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@mehtarac mehtarac requested a review from a team September 25, 2025 14:51
@cagataycali
Member

With this experimental merge the python runtime requirements are changing:

"Python>=3.12 and all versions of aws-sdk-bedrock-runtime"

@mehtarac
Member Author

mehtarac commented Sep 30, 2025

aws-sdk-bedrock-runtime

Added the Python version for this runtime dependency in the pyproject.toml file.

Comment on lines 45 to 76
NOVA_INFERENCE_CONFIG = {
    "maxTokens": 1024,
    "topP": 0.9,
    "temperature": 0.7,
}

NOVA_AUDIO_INPUT_CONFIG = {
    "mediaType": "audio/lpcm",
    "sampleRateHertz": 16000,
    "sampleSizeBits": 16,
    "channelCount": 1,
    "audioType": "SPEECH",
    "encoding": "base64",
}

NOVA_AUDIO_OUTPUT_CONFIG = {
    "mediaType": "audio/lpcm",
    "sampleRateHertz": 24000,
    "sampleSizeBits": 16,
    "channelCount": 1,
    "voiceId": "matthew",
    "encoding": "base64",
    "audioType": "SPEECH",
}

NOVA_TEXT_CONFIG = {"mediaType": "text/plain"}
NOVA_TOOL_CONFIG = {"mediaType": "application/json"}

# Timing constants
SILENCE_THRESHOLD = 2.0
EVENT_DELAY = 0.1
RESPONSE_TIMEOUT = 1.0
Member
I feel like all of this should be client configurable

Member Author
Will follow-up on this in a separate PR as part of iterating on the model provider.

"boto3-stubs[sagemaker-runtime]>=1.26.0,<2.0.0",
"openai>=1.68.0,<2.0.0", # SageMaker uses OpenAI-compatible interface
]
bidirectional-streaming = [
Member
Let's set upper and lower bounds for each dependency here

DEFAULT_SAMPLE_RATE = 16000
DEFAULT_CHANNELS = 1

class AudioOutputEvent(TypedDict):
Member
Can we use dataclasses or pydantic instead of TypedDicts?

Member Author
Yes -- can follow up on this in a separate PR for setting up the typing standards.

@Unshure
Member

Unshure commented Sep 30, 2025

Can you also include an example usage of this code in the description of the pr?

@mehtarac
Copy link
Member Author

mehtarac commented Oct 3, 2025

Synced with @Unshure regarding proceeding with the PR. Summary:

  • Since the feature is under active development and being iterated on, it's safer to open the PR in a fork so the code can be brought up to the SDK's standards as we continue to build and modify it. Once the code meets the quality bar, we will open a PR against the main branch of the sdk-python repo.

The PR in the fork is opened now: mehtarac#1
