From 3107e6bac979c137cf575b1fbd45b57e1e3c87fd Mon Sep 17 00:00:00 2001 From: Rachit Mehta Date: Thu, 9 Oct 2025 12:53:41 -0400 Subject: [PATCH 1/3] (feat)bidirectional_streaming: add openai realtime model provider --- pyproject.toml | 12 + .../bidirectional_streaming/__init__.py | 46 +- .../models/__init__.py | 10 +- .../models/novasonic.py | 12 +- .../bidirectional_streaming/models/openai.py | 508 ++++++++++++++++++ ...al_streaming.py => test_bidi_novasonic.py} | 0 .../tests/test_bidi_openai.py | 285 ++++++++++ .../bidirectional_streaming/types/__init__.py | 4 + .../types/bidirectional_streaming.py | 50 +- 9 files changed, 916 insertions(+), 11 deletions(-) create mode 100644 src/strands/experimental/bidirectional_streaming/models/openai.py rename src/strands/experimental/bidirectional_streaming/tests/{test_bidirectional_streaming.py => test_bidi_novasonic.py} (100%) create mode 100644 src/strands/experimental/bidirectional_streaming/tests/test_bidi_openai.py diff --git a/pyproject.toml b/pyproject.toml index 3b8866f4a..2900719ef 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -53,12 +53,24 @@ sagemaker = [ "boto3-stubs[sagemaker-runtime]>=1.26.0,<2.0.0", "openai>=1.68.0,<2.0.0", # SageMaker uses OpenAI-compatible interface ] +bidirectional-streaming-nova = [ + "pyaudio>=0.2.13", + "rx>=3.2.0", + "smithy-aws-core>=0.0.1", + "pytz", + "aws_sdk_bedrock_runtime", +] +bidirectional-streaming-openai = [ + "pyaudio>=0.2.13", + "websockets>=12.0,<14.0", +] bidirectional-streaming = [ "pyaudio>=0.2.13", "rx>=3.2.0", "smithy-aws-core>=0.0.1", "pytz", "aws_sdk_bedrock_runtime", + "websockets>=12.0,<14.0", ] otel = ["opentelemetry-exporter-otlp-proto-http>=1.30.0,<2.0.0"] docs = [ diff --git a/src/strands/experimental/bidirectional_streaming/__init__.py b/src/strands/experimental/bidirectional_streaming/__init__.py index 52822711a..a6af41dff 100644 --- a/src/strands/experimental/bidirectional_streaming/__init__.py +++ 
b/src/strands/experimental/bidirectional_streaming/__init__.py @@ -1,2 +1,46 @@ -"""Bidirectional streaming package for real-time audio/text conversations.""" +""" +Bidirectional streaming package. +""" +# Main components - Primary user interface +from .agent.agent import BidirectionalAgent + +# Model providers - What users need to create models +from .models.novasonic import NovaSonicBidirectionalModel +from .models.openai import OpenAIRealtimeBidirectionalModel + +# Event types - For type hints and event handling +from .types.bidirectional_streaming import ( + AudioInputEvent, + AudioOutputEvent, + TextOutputEvent, + InterruptionDetectedEvent, + BidirectionalStreamEvent, + VoiceActivityEvent, + UsageMetricsEvent, +) + +# Advanced interfaces (for custom implementations) +from .models.bidirectional_model import BidirectionalModel, BidirectionalModelSession + +__all__ = [ + # Main interface + "BidirectionalAgent", + + # Model providers + "NovaSonicBidirectionalModel", + "OpenAIRealtimeBidirectionalModel", + + # Event types + "AudioInputEvent", + "AudioOutputEvent", + "TextOutputEvent", + "InterruptionDetectedEvent", + "BidirectionalStreamEvent", + "VoiceActivityEvent", + "UsageMetricsEvent", + + # Model interface + "BidirectionalModel", + "BidirectionalModelSession", +] \ No newline at end of file diff --git a/src/strands/experimental/bidirectional_streaming/models/__init__.py b/src/strands/experimental/bidirectional_streaming/models/__init__.py index 6cba974e0..4a11f9e4a 100644 --- a/src/strands/experimental/bidirectional_streaming/models/__init__.py +++ b/src/strands/experimental/bidirectional_streaming/models/__init__.py @@ -2,5 +2,13 @@ from .bidirectional_model import BidirectionalModel, BidirectionalModelSession from .novasonic import NovaSonicBidirectionalModel, NovaSonicSession +from .openai import OpenAIRealtimeBidirectionalModel, OpenAIRealtimeSession -__all__ = ["BidirectionalModel", "BidirectionalModelSession", "NovaSonicBidirectionalModel", 
"NovaSonicSession"] +__all__ = [ + "BidirectionalModel", + "BidirectionalModelSession", + "NovaSonicBidirectionalModel", + "NovaSonicSession", + "OpenAIRealtimeBidirectionalModel", + "OpenAIRealtimeSession" +] \ No newline at end of file diff --git a/src/strands/experimental/bidirectional_streaming/models/novasonic.py b/src/strands/experimental/bidirectional_streaming/models/novasonic.py index 7f7937ef1..bc00b7e91 100644 --- a/src/strands/experimental/bidirectional_streaming/models/novasonic.py +++ b/src/strands/experimental/bidirectional_streaming/models/novasonic.py @@ -35,6 +35,7 @@ BidirectionalConnectionStartEvent, InterruptionDetectedEvent, TextOutputEvent, + UsageMetricsEvent, ) from .bidirectional_model import BidirectionalModel, BidirectionalModelSession @@ -488,9 +489,16 @@ def _convert_nova_event(self, nova_event: dict[str, any]) -> dict[str, any] | No return {"interruptionDetected": interruption} - # Handle usage events (ignore) + # Handle usage events - convert to standardized format elif "usageEvent" in nova_event: - return None + usage_data = nova_event["usageEvent"] + usage_metrics: UsageMetricsEvent = { + "totalTokens": usage_data.get("totalTokens"), + "inputTokens": usage_data.get("totalInputTokens"), + "outputTokens": usage_data.get("totalOutputTokens"), + "audioTokens": usage_data.get("details", {}).get("total", {}).get("output", {}).get("speechTokens") + } + return {"usageMetrics": usage_metrics} # Handle content start events (track role) elif "contentStart" in nova_event: diff --git a/src/strands/experimental/bidirectional_streaming/models/openai.py b/src/strands/experimental/bidirectional_streaming/models/openai.py new file mode 100644 index 000000000..0fa859db9 --- /dev/null +++ b/src/strands/experimental/bidirectional_streaming/models/openai.py @@ -0,0 +1,508 @@ +"""OpenAI Realtime API provider for Strands bidirectional streaming. 
"""OpenAI Realtime API provider for Strands bidirectional streaming.

Provides real-time audio and text communication through OpenAI's Realtime API
with WebSocket connections, voice activity detection, and function calling.
"""

import asyncio
import base64
import copy
import inspect
import json
import logging
import os
import uuid
from typing import Any, AsyncIterable
from urllib.parse import urlencode

import websockets
from websockets.client import WebSocketClientProtocol
from websockets.exceptions import ConnectionClosed

from ....types.content import Messages
from ....types.tools import ToolSpec, ToolUse
from ..types.bidirectional_streaming import (
    AudioInputEvent,
    AudioOutputEvent,
    BidirectionalConnectionEndEvent,
    BidirectionalConnectionStartEvent,
    BidirectionalStreamEvent,
    InterruptionDetectedEvent,
    TextOutputEvent,
    VoiceActivityEvent,
)
from .bidirectional_model import BidirectionalModel, BidirectionalModelSession

logger = logging.getLogger(__name__)

# OpenAI Realtime API configuration
OPENAI_REALTIME_URL = "wss://api.openai.com/v1/realtime"
DEFAULT_MODEL = "gpt-realtime"

AUDIO_FORMAT = {"type": "audio/pcm", "rate": 24000}

DEFAULT_SESSION_CONFIG = {
    "type": "realtime",
    "instructions": "You are a helpful assistant. Please speak in English and keep your responses clear and concise.",
    "output_modalities": ["audio"],
    "audio": {
        "input": {
            "format": AUDIO_FORMAT,
            "turn_detection": {
                "type": "server_vad",
                "threshold": 0.5,
                "prefix_padding_ms": 300,
                "silence_duration_ms": 500,
            },
        },
        "output": {"format": AUDIO_FORMAT, "voice": "alloy"},
    },
}

# Session keys a caller may override through ``config["session"]``.
_SUPPORTED_SESSION_PARAMS = frozenset(
    {
        "type", "output_modalities", "instructions", "voice", "audio",
        "tools", "tool_choice", "input_audio_format", "output_audio_format",
        "input_audio_transcription", "turn_detection",
    }
)


def _connect_headers_kwarg() -> str:
    """Return the keyword ``websockets.connect`` uses for extra HTTP headers.

    The legacy client (the top-level ``connect`` before websockets 14) takes
    ``extra_headers``; the newer asyncio client takes ``additional_headers``.
    Passing the wrong one raises ``TypeError``, so the installed signature is
    inspected instead of assuming a version.
    """
    try:
        params = inspect.signature(websockets.connect).parameters
    except (TypeError, ValueError):
        # Signature not introspectable: keep the keyword this module always used.
        return "additional_headers"
    return "extra_headers" if "extra_headers" in params else "additional_headers"


class OpenAIRealtimeSession(BidirectionalModelSession):
    """OpenAI Realtime API session for real-time audio/text streaming.

    Manages WebSocket connection to OpenAI's Realtime API with automatic VAD,
    function calling, and event conversion to Strands format.
    """

    def __init__(self, websocket: WebSocketClientProtocol, config: dict[str, Any]) -> None:
        """Initialize OpenAI Realtime session.

        Args:
            websocket: Already-connected WebSocket to the Realtime endpoint.
            config: Provider configuration; ``config["session"]`` overrides
                session defaults and ``config["model"]`` is reported in the
                connection-start metadata.
        """
        self.websocket = websocket
        self.config = config
        self.session_id = str(uuid.uuid4())
        # ``_active`` gates sending/receiving; ``_closed`` makes close() idempotent.
        # They are separate because the reader task clears ``_active`` on its own
        # when the socket drops, and close() must still release the socket then.
        self._active = True
        self._closed = False

        self._event_queue: asyncio.Queue = asyncio.Queue()
        self._response_task: asyncio.Task | None = None
        # call_id -> {"call_id", "name", "arguments"} accumulated from streamed deltas.
        self._function_call_buffer: dict[str, dict[str, str]] = {}

        logger.debug("OpenAI Realtime session initialized: %s", self.session_id)

    def _require_active(self) -> bool:
        """Return True while the session may still send events."""
        return self._active

    def _create_text_event(self, text: str, role: str) -> dict[str, Any]:
        """Create standardized text output event."""
        text_output: TextOutputEvent = {"text": text, "role": role}
        return {"textOutput": text_output}

    def _create_voice_activity_event(self, activity_type: str) -> dict[str, Any]:
        """Create standardized voice activity event."""
        voice_activity: VoiceActivityEvent = {"activityType": activity_type}
        return {"voiceActivity": voice_activity}

    async def _create_conversation_item(self, item_data: dict) -> None:
        """Create conversation item and trigger response."""
        await self._send_event({"type": "conversation.item.create", "item": item_data})
        await self._send_event({"type": "response.create"})

    async def initialize(
        self,
        system_prompt: str | None = None,
        tools: list[ToolSpec] | None = None,
        messages: Messages | None = None,
    ) -> None:
        """Send the session configuration and history, then start the reader task.

        Args:
            system_prompt: Instructions overriding the default session instructions.
            tools: Strands tool specs to expose as Realtime functions.
            messages: Prior conversation to replay into the session.

        Raises:
            Exception: Any send failure is logged and re-raised.
        """
        try:
            session_config = self._build_session_config(system_prompt, tools)
            await self._send_event({"type": "session.update", "session": session_config})

            if messages:
                await self._add_conversation_history(messages)

            self._response_task = asyncio.create_task(self._process_responses())
            logger.info("OpenAI Realtime session initialized successfully")

        except Exception as e:
            logger.error("Error during OpenAI Realtime initialization: %s", e)
            raise

    def _build_session_config(self, system_prompt: str | None, tools: list[ToolSpec] | None) -> dict:
        """Build session configuration for OpenAI Realtime API.

        Precedence, lowest to highest: module defaults, ``system_prompt`` /
        ``tools`` arguments, then supported keys from ``config["session"]``.
        """
        # Deep copy so callers mutating the result never touch the module default.
        config = copy.deepcopy(DEFAULT_SESSION_CONFIG)

        if system_prompt:
            config["instructions"] = system_prompt

        if tools:
            config["tools"] = self._convert_tools_to_openai_format(tools)

        for key, value in self.config.get("session", {}).items():
            if key in _SUPPORTED_SESSION_PARAMS:
                config[key] = value
            else:
                logger.warning("Ignoring unsupported session parameter: %s", key)

        return config

    def _convert_tools_to_openai_format(self, tools: list[ToolSpec]) -> list[dict]:
        """Convert Strands tool specifications to Realtime function tools.

        The Realtime API takes a flat tool object (``type``, ``name``,
        ``description``, ``parameters``), not the Chat Completions shape that
        nests those fields under a ``"function"`` key.
        """
        openai_tools = []

        for tool in tools:
            input_schema = tool["inputSchema"]
            if "json" in input_schema:
                raw_schema = input_schema["json"]
                schema = json.loads(raw_schema) if isinstance(raw_schema, str) else raw_schema
            else:
                schema = input_schema

            openai_tools.append(
                {
                    "type": "function",
                    "name": tool["name"],
                    "description": tool["description"],
                    "parameters": schema,
                }
            )

        return openai_tools

    async def _add_conversation_history(self, messages: Messages) -> None:
        """Replay the text parts of prior messages into the session.

        Accepts plain-string content as well as content-block lists. A block is
        used when it carries a string ``"text"`` value, which covers both
        ``{"text": ...}`` and ``{"type": "text", "text": ...}`` shapes.
        Non-text blocks are not replayed, and messages with no text are skipped.
        """
        for message in messages:
            role = message["role"]
            # Assistant turns are replayed as output text; other roles as input text.
            part_type = "output_text" if role == "assistant" else "input_text"

            content = message.get("content", "")
            if isinstance(content, str):
                texts = [content]
            elif isinstance(content, list):
                texts = [
                    block["text"]
                    for block in content
                    if isinstance(block, dict) and isinstance(block.get("text"), str)
                ]
            else:
                texts = []

            parts = [{"type": part_type, "text": text} for text in texts if text]
            if not parts:
                logger.debug("Skipping %s history message without text content", role)
                continue

            await self._send_event(
                {
                    "type": "conversation.item.create",
                    "item": {"type": "message", "role": role, "content": parts},
                }
            )

    async def _process_responses(self) -> None:
        """Read WebSocket messages and queue the decoded events.

        Runs as a background task; always clears ``_active`` on exit so
        ``receive_events`` terminates.
        """
        logger.debug("OpenAI Realtime response processor started")

        try:
            async for message in self.websocket:
                if not self._active:
                    break

                try:
                    event = json.loads(message)
                except json.JSONDecodeError as e:
                    logger.warning("Failed to parse OpenAI event: %s", e)
                    continue
                await self._event_queue.put(event)

        except ConnectionClosed:
            logger.debug("OpenAI Realtime WebSocket connection closed")
        except Exception as e:
            logger.error("Error in OpenAI Realtime response processing: %s", e)
        finally:
            self._active = False
            logger.debug("OpenAI Realtime response processor stopped")

    async def receive_events(self) -> AsyncIterable[BidirectionalStreamEvent]:
        """Yield Strands events converted from queued OpenAI events.

        Yields a ``BidirectionalConnectionStart`` first and a
        ``BidirectionalConnectionEnd`` last. The end event is yielded after
        the ``try`` block rather than in a ``finally``: yielding during
        ``GeneratorExit`` or task cancellation raises ``RuntimeError`` or
        swallows the cancellation.
        """
        connection_start: BidirectionalConnectionStartEvent = {
            "connectionId": self.session_id,
            "metadata": {"provider": "openai_realtime", "model": self.config.get("model", DEFAULT_MODEL)},
        }
        yield {"BidirectionalConnectionStart": connection_start}

        reason = "connection_complete"
        try:
            while self._active:
                try:
                    # Short timeout so the loop re-checks ``_active`` regularly.
                    openai_event = await asyncio.wait_for(self._event_queue.get(), timeout=1.0)
                except asyncio.TimeoutError:
                    continue

                try:
                    provider_event = self._convert_openai_event(openai_event)
                except Exception as e:
                    # A single malformed event must not end the whole stream.
                    logger.warning("Skipping malformed OpenAI event %s: %s", openai_event.get("type"), e)
                    continue

                if provider_event:
                    yield provider_event

        except Exception as e:
            logger.error("Error receiving OpenAI Realtime event: %s", e)
            reason = "error"

        connection_end: BidirectionalConnectionEndEvent = {
            "connectionId": self.session_id,
            "reason": reason,
            "metadata": {"provider": "openai_realtime"},
        }
        yield {"BidirectionalConnectionEnd": connection_end}

    def _convert_openai_event(self, openai_event: dict[str, Any]) -> dict[str, Any] | None:
        """Convert one OpenAI event to a Strands event.

        Returns:
            The Strands event dict, or None when the event is only logged,
            buffered (function-call deltas), or unhandled.
        """
        event_type = openai_event.get("type")

        # Audio output
        if event_type == "response.output_audio.delta":
            audio_output: AudioOutputEvent = {
                "audioData": base64.b64decode(openai_event["delta"]),
                "format": "pcm",
                "sampleRate": 24000,
                "channels": 1,
                "encoding": None,
            }
            return {"audioOutput": audio_output}

        # Assistant text and audio transcript
        elif event_type in ("response.output_text.delta", "response.output_audio_transcript.delta"):
            return self._create_text_event(openai_event["delta"], "assistant")

        # User transcription
        elif event_type == "conversation.item.input_audio_transcription.delta":
            transcript_delta = openai_event.get("delta", "")
            return self._create_text_event(transcript_delta, "user") if transcript_delta.strip() else None

        elif event_type == "conversation.item.input_audio_transcription.completed":
            transcript = openai_event.get("transcript", "")
            return self._create_text_event(transcript, "user") if transcript.strip() else None

        elif event_type == "conversation.item.input_audio_transcription.segment":
            text = openai_event.get("segment", {}).get("text", "")
            return self._create_text_event(text, "user") if text.strip() else None

        elif event_type == "conversation.item.input_audio_transcription.failed":
            error_info = openai_event.get("error", {})
            logger.warning("OpenAI transcription failed: %s", error_info.get("message", "Unknown error"))
            return None

        # Function call processing: accumulate argument deltas per call_id.
        elif event_type == "response.function_call_arguments.delta":
            call_id = openai_event.get("call_id")
            delta = openai_event.get("delta", "")
            if call_id:
                if call_id not in self._function_call_buffer:
                    self._function_call_buffer[call_id] = {"call_id": call_id, "name": "", "arguments": delta}
                else:
                    self._function_call_buffer[call_id]["arguments"] += delta
            return None

        elif event_type == "response.function_call_arguments.done":
            call_id = openai_event.get("call_id")
            if not call_id or call_id not in self._function_call_buffer:
                return None

            # pop() drops the buffer entry whether or not parsing succeeds.
            function_call = self._function_call_buffer.pop(call_id)
            # Prefer the final argument string on the done event when present;
            # otherwise use what the deltas accumulated.
            final_arguments = openai_event.get("arguments")
            arguments = final_arguments if isinstance(final_arguments, str) else function_call["arguments"]
            try:
                tool_use: ToolUse = {
                    "toolUseId": call_id,
                    "name": function_call["name"] or openai_event.get("name", ""),
                    "input": json.loads(arguments) if arguments else {},
                }
            except (json.JSONDecodeError, KeyError) as e:
                logger.warning("Error parsing function arguments for %s: %s", call_id, e)
                return None
            return {"toolUse": tool_use}

        # Voice activity detection
        elif event_type == "input_audio_buffer.speech_started":
            return self._create_voice_activity_event("speech_started")
        elif event_type == "input_audio_buffer.speech_stopped":
            return self._create_voice_activity_event("speech_stopped")
        elif event_type == "input_audio_buffer.timeout_triggered":
            return self._create_voice_activity_event("timeout")

        # Lifecycle events (log only)
        elif event_type == "conversation.item.retrieve":
            logger.debug("OpenAI conversation item retrieved: %s", openai_event.get("item", {}).get("id"))
            return None

        elif event_type == "conversation.item.added":
            logger.debug("OpenAI conversation item added: %s", openai_event.get("item", {}).get("id"))
            return None

        elif event_type == "conversation.item.done":
            item = openai_event.get("item", {})
            logger.debug("OpenAI conversation item done: %s", item.get("id"))

            # A finished assistant message becomes a messageStop event carrying
            # its text parts (audio parts contribute their transcript).
            if item.get("type") == "message" and item.get("role") == "assistant":
                message_content = []
                for content_part in item.get("content", []):
                    part_type = content_part.get("type")
                    if part_type == "output_text":
                        message_content.append({"type": "text", "text": content_part.get("text", "")})
                    elif part_type == "output_audio":
                        transcript = content_part.get("transcript", "")
                        if transcript:
                            message_content.append({"type": "text", "text": transcript})

                if message_content:
                    message = {"role": "assistant", "content": message_content}
                    return {"messageStop": {"message": message}}
            return None

        elif event_type in (
            "response.output_item.added",
            "response.output_item.done",
            "response.content_part.added",
            "response.content_part.done",
        ):
            item_data = openai_event.get("item") or openai_event.get("part")
            logger.debug("OpenAI %s: %s", event_type, item_data.get("id") if item_data else "unknown")

            # The function name arrives on output_item.added; record it so the
            # later arguments.done event can emit a complete toolUse.
            if event_type == "response.output_item.added":
                item = openai_event.get("item", {})
                if item.get("type") == "function_call":
                    call_id = item.get("call_id")
                    function_name = item.get("name")
                    if call_id and function_name:
                        if call_id not in self._function_call_buffer:
                            self._function_call_buffer[call_id] = {
                                "call_id": call_id,
                                "name": function_name,
                                "arguments": "",
                            }
                        else:
                            self._function_call_buffer[call_id]["name"] = function_name
            return None

        elif event_type in (
            "input_audio_buffer.committed",
            "input_audio_buffer.cleared",
            "session.created",
            "session.updated",
        ):
            logger.debug("OpenAI %s event", event_type)
            return None

        elif event_type == "error":
            logger.error("OpenAI Realtime error: %s", openai_event.get("error", {}))
            return None

        else:
            logger.debug("Unhandled OpenAI event type: %s", event_type)
            return None

    async def send_audio_content(self, audio_input: AudioInputEvent) -> None:
        """Append base64-encoded audio to OpenAI's input buffer; no-op when inactive."""
        if not self._require_active():
            return

        audio_base64 = base64.b64encode(audio_input["audioData"]).decode("utf-8")
        await self._send_event({"type": "input_audio_buffer.append", "audio": audio_base64})

    async def send_text_content(self, text: str, **kwargs) -> None:
        """Send a user text message and request a response; no-op when inactive."""
        if not self._require_active():
            return

        item_data = {
            "type": "message",
            "role": "user",
            "content": [{"type": "input_text", "text": text}],
        }
        await self._create_conversation_item(item_data)

    async def send_interrupt(self) -> None:
        """Cancel the in-progress response; no-op when inactive."""
        if not self._require_active():
            return

        await self._send_event({"type": "response.cancel"})

    async def send_tool_result(self, tool_use_id: str, result: dict[str, Any]) -> None:
        """Send a function-call output and request a follow-up response.

        Args:
            tool_use_id: The ``call_id`` of the function call being answered.
            result: Tool result; serialized with ``json.dumps`` unless already a string.
        """
        if not self._require_active():
            return

        logger.debug("OpenAI tool result send: %s", tool_use_id)
        result_text = result if isinstance(result, str) else json.dumps(result)

        item_data = {
            "type": "function_call_output",
            "call_id": tool_use_id,
            "output": result_text,
        }
        await self._create_conversation_item(item_data)

    async def close(self) -> None:
        """Stop the reader task and close the WebSocket (idempotent).

        Keyed on ``_closed`` rather than ``_active``: the reader task clears
        ``_active`` itself when it stops, and the socket must still be closed.
        """
        if self._closed:
            return
        self._closed = True

        logger.debug("OpenAI Realtime cleanup - starting connection close")
        self._active = False

        if self._response_task and not self._response_task.done():
            self._response_task.cancel()
            try:
                await self._response_task
            except asyncio.CancelledError:
                pass

        try:
            await self.websocket.close()
        except Exception as e:
            logger.warning("Error closing OpenAI Realtime WebSocket: %s", e)

        logger.debug("OpenAI Realtime connection closed")

    async def _send_event(self, event: dict[str, Any]) -> None:
        """Serialize an event to JSON and send it; failures are logged and re-raised."""
        try:
            await self.websocket.send(json.dumps(event))
            logger.debug("Sent OpenAI event: %s", event.get("type"))
        except Exception as e:
            logger.error("Error sending OpenAI event: %s", e)
            raise


class OpenAIRealtimeBidirectionalModel(BidirectionalModel):
    """OpenAI Realtime API provider for Strands bidirectional streaming.

    Provides real-time audio/text communication through OpenAI's Realtime API
    with WebSocket connections, voice activity detection, and function calling.
    """

    def __init__(
        self,
        model: str = DEFAULT_MODEL,
        api_key: str | None = None,
        **config: Any,
    ) -> None:
        """Initialize OpenAI Realtime bidirectional model.

        Args:
            model: Realtime model name sent as the ``model`` query parameter.
            api_key: API key; falls back to the ``OPENAI_API_KEY`` environment variable.
            **config: Optional ``organization``, ``project`` and ``session`` settings.

        Raises:
            ValueError: If no API key is given or found in the environment.
        """
        self.model = model
        self.api_key = api_key or os.getenv("OPENAI_API_KEY")
        self.config = config

        if not self.api_key:
            raise ValueError(
                "OpenAI API key is required. Set OPENAI_API_KEY environment variable or pass api_key parameter."
            )

        logger.debug("OpenAI Realtime bidirectional model initialized: %s", model)

    async def create_bidirectional_connection(
        self,
        system_prompt: str | None = None,
        tools: list[ToolSpec] | None = None,
        messages: Messages | None = None,
        **kwargs,
    ) -> BidirectionalModelSession:
        """Open a WebSocket to the Realtime API and return an initialized session.

        Raises:
            Exception: Connection or initialization failures are logged and
                re-raised; the socket is closed if initialization fails.
        """
        logger.info("Creating OpenAI Realtime connection...")

        try:
            url = f"{OPENAI_REALTIME_URL}?{urlencode({'model': self.model})}"

            headers = [("Authorization", f"Bearer {self.api_key}")]
            if "organization" in self.config:
                headers.append(("OpenAI-Organization", self.config["organization"]))
            if "project" in self.config:
                headers.append(("OpenAI-Project", self.config["project"]))

            websocket = await websockets.connect(url, **{_connect_headers_kwarg(): headers})
            logger.info("WebSocket connected successfully")

            # ``model`` is a named parameter, so it is never in ``self.config``;
            # add it so the session reports the model actually in use.
            session = OpenAIRealtimeSession(websocket, {**self.config, "model": self.model})
            try:
                await session.initialize(system_prompt, tools, messages)
            except Exception:
                # Do not leak the open socket when setup fails part-way.
                await session.close()
                raise

            logger.info("OpenAI Realtime connection established")
            return session

        except Exception as e:
            logger.error("OpenAI connection error: %s", e)
            raise
#!/usr/bin/env python3
"""Test OpenAI Realtime API speech-to-speech interaction.

Manual, interactive script: needs a microphone, speakers and OPENAI_API_KEY.
"""

import asyncio
import os
import sys
import time
from pathlib import Path

# Add the repository's ``src`` directory to the Python path. This file lives at
# src/strands/experimental/bidirectional_streaming/tests/, so ``src`` is four
# levels above the containing directory.
sys.path.insert(0, str(Path(__file__).resolve().parents[4]))

import pyaudio

from strands.experimental.bidirectional_streaming.agent.agent import BidirectionalAgent
from strands.experimental.bidirectional_streaming.models.openai import OpenAIRealtimeBidirectionalModel

SAMPLE_RATE = 24000  # OpenAI Realtime uses 24kHz PCM for input and output
FRAMES_PER_BUFFER = 1024
PLAYBACK_CHUNK_BYTES = 1024  # small chunks so playback can stop quickly on interruption


def _close_stream(stream, label):
    """Close a PyAudio stream if it was opened, reporting (not hiding) failures."""
    if stream is None:
        return
    try:
        stream.close()
    except Exception as e:
        print(f"Error closing {label}: {e}")


async def play(context):
    """Play queued assistant audio, discarding it when an interruption is flagged.

    NOTE: ``speaker.write`` blocks the event loop for the duration of each
    chunk; acceptable for this manual test, not for production use.
    """
    audio = pyaudio.PyAudio()
    speaker = None  # stays None if audio.open() fails, so cleanup can tell

    try:
        speaker = audio.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=SAMPLE_RATE,
            output=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
        )

        while context["active"]:
            if context.get("interrupted", False):
                # Drop everything still waiting to be played.
                while not context["audio_out"].empty():
                    try:
                        context["audio_out"].get_nowait()
                    except asyncio.QueueEmpty:
                        break

                context["interrupted"] = False
                await asyncio.sleep(0.05)
                continue

            try:
                audio_data = await asyncio.wait_for(context["audio_out"].get(), timeout=0.1)
            except asyncio.TimeoutError:
                continue

            if not audio_data or not context["active"]:
                continue

            for start in range(0, len(audio_data), PLAYBACK_CHUNK_BYTES):
                if context.get("interrupted", False) or not context["active"]:
                    break

                speaker.write(audio_data[start:start + PLAYBACK_CHUNK_BYTES])
                await asyncio.sleep(0.001)  # Brief pause for responsiveness

    except asyncio.CancelledError:
        pass
    except Exception as e:
        print(f"Audio playback error: {e}")
    finally:
        _close_stream(speaker, "speaker")
        audio.terminate()


async def record(context):
    """Read microphone audio and queue it for sending.

    NOTE: ``microphone.read`` blocks the event loop while it waits for frames.
    """
    audio = pyaudio.PyAudio()
    microphone = None  # stays None if audio.open() fails, so cleanup can tell

    try:
        microphone = audio.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=SAMPLE_RATE,
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
        )

        while context["active"]:
            audio_bytes = microphone.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
            await context["audio_in"].put(audio_bytes)
            await asyncio.sleep(0.01)

    except asyncio.CancelledError:
        pass
    except Exception as e:
        print(f"Microphone recording error: {e}")
    finally:
        _close_stream(microphone, "microphone")
        audio.terminate()


async def receive(agent, context):
    """Dispatch agent events: queue audio, print transcripts, flag interruptions."""
    try:
        async for event in agent.receive():
            if not context["active"]:
                break

            # Handle audio output
            if "audioOutput" in event:
                if not context.get("interrupted", False):
                    await context["audio_out"].put(event["audioOutput"]["audioData"])

            # Handle text output (transcripts)
            elif "textOutput" in event:
                text_output = event["textOutput"]
                role = text_output.get("role", "assistant")
                text = text_output.get("text", "").strip()

                if text:
                    if role == "user":
                        print(f"User: {text}")
                    elif role == "assistant":
                        print(f"Assistant: {text}")

            # Handle interruption detection
            elif "interruptionDetected" in event:
                context["interrupted"] = True

            # The OpenAI provider reports user speech as a voiceActivity event
            # rather than interruptionDetected; treat speech start as barge-in
            # so queued playback is dropped.
            elif "voiceActivity" in event:
                if event["voiceActivity"].get("activityType") == "speech_started":
                    context["interrupted"] = True

            # Handle connection events
            elif "BidirectionalConnectionStart" in event:
                pass  # Silent connection start
            elif "BidirectionalConnectionEnd" in event:
                break

    except asyncio.CancelledError:
        pass
    except Exception as e:
        print(f"Receive handler error: {e}")
    finally:
        # Once the event stream is over for any reason, stop the other loops
        # too; otherwise they spin forever on a dead connection.
        context["active"] = False


async def send(agent, context):
    """Send audio from microphone to agent."""
    try:
        while context["active"]:
            try:
                audio_bytes = await asyncio.wait_for(context["audio_in"].get(), timeout=0.1)
            except asyncio.TimeoutError:
                continue

            # Create audio event in expected format
            audio_event = {
                "audioData": audio_bytes,
                "format": "pcm",
                "sampleRate": SAMPLE_RATE,
                "channels": 1,
            }
            await agent.send(audio_event)

    except asyncio.CancelledError:
        pass
    except Exception as e:
        print(f"Send handler error: {e}")


async def main():
    """Run an interactive voice chat against the OpenAI Realtime API.

    Returns:
        False when a precondition (API key, audio system) is missing,
        True after a session has run and been cleaned up.
    """
    print("Starting OpenAI Realtime API test...")

    # Check API key
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        print("OPENAI_API_KEY environment variable not set")
        return False

    # Check audio system
    try:
        audio = pyaudio.PyAudio()
        audio.terminate()
    except Exception as e:
        print(f"Audio system error: {e}")
        return False

    # Create OpenAI model
    model = OpenAIRealtimeBidirectionalModel(
        model="gpt-4o-realtime-preview",
        api_key=api_key,
        session={
            "output_modalities": ["audio"],
            "audio": {
                "input": {
                    "format": {"type": "audio/pcm", "rate": 24000},
                    "turn_detection": {
                        "type": "server_vad",
                        "threshold": 0.5,
                        "silence_duration_ms": 700,
                    },
                },
                "output": {
                    "format": {"type": "audio/pcm", "rate": 24000},
                    "voice": "alloy",
                },
            },
        },
    )

    # Create agent
    agent = BidirectionalAgent(
        model=model,
        system_prompt="You are a helpful voice assistant. Keep your responses brief and natural. Say hello when you first connect.",
    )

    # Start the session
    await agent.start()

    # Create shared context
    context = {
        "active": True,
        "audio_in": asyncio.Queue(),
        "audio_out": asyncio.Queue(),
        "interrupted": False,
        "start_time": time.time(),
    }

    print("Speak into your microphone. Press Ctrl+C to stop.")

    try:
        # Run all tasks concurrently
        results = await asyncio.gather(
            play(context),
            record(context),
            receive(agent, context),
            send(agent, context),
            return_exceptions=True,
        )
        # return_exceptions=True turns failures into return values; report
        # them instead of discarding them.
        for result in results:
            if isinstance(result, Exception):
                print(f"Task error: {result}")

    except KeyboardInterrupt:
        print("\nInterrupted by user")
    except asyncio.CancelledError:
        print("\nTest cancelled")
    except Exception as e:
        print(f"\nError during voice chat: {e}")
    finally:
        print("Cleaning up...")
        context["active"] = False

        try:
            await agent.end()
        except Exception as e:
            print(f"Cleanup error: {e}")

    return True


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nTest interrupted by user")
    except Exception as e:
        print(f"Test error: {e}")
        import traceback
        traceback.print_exc()
connection metadata. """ - reason: Literal["user_request", "timeout", "error"] + reason: Literal["user_request", "timeout", "error", "connection_complete"] connectionId: Optional[str] metadata: Optional[Dict[str, Any]] +class VoiceActivityEvent(TypedDict): + """Voice activity detection event for speech monitoring. + + Provides standardized voice activity detection events across providers + to enable speech-aware applications and better conversation flow. + + Attributes: + activityType: Type of voice activity detected. + """ + + activityType: Literal["speech_started", "speech_stopped", "timeout"] + + +class UsageMetricsEvent(TypedDict): + """Token usage and performance tracking. + + Provides standardized usage metrics across providers for cost monitoring + and performance optimization. + + Attributes: + totalTokens: Total tokens used in the interaction. + inputTokens: Tokens used for input processing. + outputTokens: Tokens used for output generation. + audioTokens: Tokens used specifically for audio processing. + """ + + totalTokens: Optional[int] + inputTokens: Optional[int] + outputTokens: Optional[int] + audioTokens: Optional[int] + + class BidirectionalStreamEvent(StreamEvent, total=False): """Bidirectional stream event extending existing StreamEvent. @@ -134,11 +166,15 @@ class BidirectionalStreamEvent(StreamEvent, total=False): interruptionDetected: User interruption detection. BidirectionalConnectionStart: connection start event. BidirectionalConnectionEnd: connection end event. + voiceActivity: Voice activity detection events. + usageMetrics: Token usage and performance metrics. 
""" - audioOutput: AudioOutputEvent - audioInput: AudioInputEvent - textOutput: TextOutputEvent - interruptionDetected: InterruptionDetectedEvent - BidirectionalConnectionStart: BidirectionalConnectionStartEvent - BidirectionalConnectionEnd: BidirectionalConnectionEndEvent + audioOutput: Optional[AudioOutputEvent] + audioInput: Optional[AudioInputEvent] + textOutput: Optional[TextOutputEvent] + interruptionDetected: Optional[InterruptionDetectedEvent] + BidirectionalConnectionStart: Optional[BidirectionalConnectionStartEvent] + BidirectionalConnectionEnd: Optional[BidirectionalConnectionEndEvent] + voiceActivity: Optional[VoiceActivityEvent] + usageMetrics: Optional[UsageMetricsEvent] From da8b86ca77e4f324b6cfc2a1d7ce756ec8a6d310 Mon Sep 17 00:00:00 2001 From: Rachit Mehta Date: Fri, 10 Oct 2025 10:29:08 -0400 Subject: [PATCH 2/3] fix function calling --- .../bidirectional_streaming/__init__.py | 15 +++++++-------- .../bidirectional_streaming/agent/agent.py | 1 - .../event_loop/bidirectional_event_loop.py | 1 - .../bidirectional_streaming/models/novasonic.py | 1 - .../bidirectional_streaming/models/openai.py | 14 ++++++-------- .../tests/test_bidi_openai.py | 2 ++ 6 files changed, 15 insertions(+), 19 deletions(-) diff --git a/src/strands/experimental/bidirectional_streaming/__init__.py b/src/strands/experimental/bidirectional_streaming/__init__.py index a6af41dff..aeb335dea 100644 --- a/src/strands/experimental/bidirectional_streaming/__init__.py +++ b/src/strands/experimental/bidirectional_streaming/__init__.py @@ -1,10 +1,12 @@ -""" -Bidirectional streaming package. +"""Bidirectional streaming package. 
""" # Main components - Primary user interface from .agent.agent import BidirectionalAgent +# Advanced interfaces (for custom implementations) +from .models.bidirectional_model import BidirectionalModel, BidirectionalModelSession + # Model providers - What users need to create models from .models.novasonic import NovaSonicBidirectionalModel from .models.openai import OpenAIRealtimeBidirectionalModel @@ -13,16 +15,13 @@ from .types.bidirectional_streaming import ( AudioInputEvent, AudioOutputEvent, - TextOutputEvent, - InterruptionDetectedEvent, BidirectionalStreamEvent, - VoiceActivityEvent, + InterruptionDetectedEvent, + TextOutputEvent, UsageMetricsEvent, + VoiceActivityEvent, ) -# Advanced interfaces (for custom implementations) -from .models.bidirectional_model import BidirectionalModel, BidirectionalModelSession - __all__ = [ # Main interface "BidirectionalAgent", diff --git a/src/strands/experimental/bidirectional_streaming/agent/agent.py b/src/strands/experimental/bidirectional_streaming/agent/agent.py index 68d371a51..0cd90063d 100644 --- a/src/strands/experimental/bidirectional_streaming/agent/agent.py +++ b/src/strands/experimental/bidirectional_streaming/agent/agent.py @@ -23,7 +23,6 @@ from ..models.bidirectional_model import BidirectionalModel from ..types.bidirectional_streaming import AudioInputEvent, BidirectionalStreamEvent - logger = logging.getLogger(__name__) diff --git a/src/strands/experimental/bidirectional_streaming/event_loop/bidirectional_event_loop.py b/src/strands/experimental/bidirectional_streaming/event_loop/bidirectional_event_loop.py index 16be08aaf..340cd9267 100644 --- a/src/strands/experimental/bidirectional_streaming/event_loop/bidirectional_event_loop.py +++ b/src/strands/experimental/bidirectional_streaming/event_loop/bidirectional_event_loop.py @@ -22,7 +22,6 @@ from ....types.tools import ToolResult, ToolUse from ..models.bidirectional_model import BidirectionalModelSession - logger = logging.getLogger(__name__) # Session 
constants diff --git a/src/strands/experimental/bidirectional_streaming/models/novasonic.py b/src/strands/experimental/bidirectional_streaming/models/novasonic.py index bc00b7e91..4e4952fa9 100644 --- a/src/strands/experimental/bidirectional_streaming/models/novasonic.py +++ b/src/strands/experimental/bidirectional_streaming/models/novasonic.py @@ -37,7 +37,6 @@ TextOutputEvent, UsageMetricsEvent, ) - from .bidirectional_model import BidirectionalModel, BidirectionalModelSession logger = logging.getLogger(__name__) diff --git a/src/strands/experimental/bidirectional_streaming/models/openai.py b/src/strands/experimental/bidirectional_streaming/models/openai.py index 0fa859db9..76bf9f50d 100644 --- a/src/strands/experimental/bidirectional_streaming/models/openai.py +++ b/src/strands/experimental/bidirectional_streaming/models/openai.py @@ -12,8 +12,8 @@ from typing import AsyncIterable import websockets -from websockets.exceptions import ConnectionClosed from websockets.client import WebSocketClientProtocol +from websockets.exceptions import ConnectionClosed from ....types.content import Messages from ....types.tools import ToolSpec, ToolUse @@ -23,7 +23,6 @@ BidirectionalConnectionEndEvent, BidirectionalConnectionStartEvent, BidirectionalStreamEvent, - InterruptionDetectedEvent, TextOutputEvent, VoiceActivityEvent, ) @@ -142,7 +141,7 @@ def _build_session_config(self, system_prompt: str | None, tools: list[ToolSpec] return config def _convert_tools_to_openai_format(self, tools: list[ToolSpec]) -> list[dict]: - """Convert Strands tool specifications to OpenAI function format.""" + """Convert Strands tool specifications to OpenAI Realtime API format.""" openai_tools = [] for tool in tools: @@ -152,13 +151,12 @@ def _convert_tools_to_openai_format(self, tools: list[ToolSpec]) -> list[dict]: else: schema = input_schema + # OpenAI Realtime API expects flat structure, not nested under "function" openai_tool = { "type": "function", - "function": { - "name": tool["name"], - 
"description": tool["description"], - "parameters": schema - } + "name": tool["name"], + "description": tool["description"], + "parameters": schema } openai_tools.append(openai_tool) diff --git a/src/strands/experimental/bidirectional_streaming/tests/test_bidi_openai.py b/src/strands/experimental/bidirectional_streaming/tests/test_bidi_openai.py index 098ec4a39..660040f3e 100644 --- a/src/strands/experimental/bidirectional_streaming/tests/test_bidi_openai.py +++ b/src/strands/experimental/bidirectional_streaming/tests/test_bidi_openai.py @@ -11,6 +11,7 @@ sys.path.insert(0, str(Path(__file__).parent / "src")) import pyaudio +from strands_tools import calculator from strands.experimental.bidirectional_streaming.agent.agent import BidirectionalAgent from strands.experimental.bidirectional_streaming.models.openai import OpenAIRealtimeBidirectionalModel @@ -229,6 +230,7 @@ async def main(): # Create agent agent = BidirectionalAgent( model=model, + tools=[calculator], system_prompt="You are a helpful voice assistant. Keep your responses brief and natural. Say hello when you first connect." 
) From 4679e0c803d0e7b6fad7d32ef0866309fd8b55e4 Mon Sep 17 00:00:00 2001 From: Rachit Mehta Date: Mon, 20 Oct 2025 10:03:11 -0400 Subject: [PATCH 3/3] (feat)bidirectional_streaming: add openai realtime model provider #3 --- .../models/novasonic.py | 14 ++-- .../bidirectional_streaming/models/openai.py | 64 +++++++++---------- 2 files changed, 37 insertions(+), 41 deletions(-) diff --git a/src/strands/experimental/bidirectional_streaming/models/novasonic.py b/src/strands/experimental/bidirectional_streaming/models/novasonic.py index 4e4952fa9..db21fb967 100644 --- a/src/strands/experimental/bidirectional_streaming/models/novasonic.py +++ b/src/strands/experimental/bidirectional_streaming/models/novasonic.py @@ -23,7 +23,7 @@ from aws_sdk_bedrock_runtime.client import BedrockRuntimeClient, InvokeModelWithBidirectionalStreamOperationInput from aws_sdk_bedrock_runtime.config import Config, HTTPAuthSchemeResolver, SigV4AuthScheme -from aws_sdk_bedrock_runtime.models import BidirectionalInputPayloadPart, InvokeModelWithBidirectionalStreamInputChunk +from aws_sdk_bedrock_runtime.models import BidirectionalInputPayloadPart, InvokeModelWithBidirectionalStreamInputChunk, InvokeModelWithBidirectionalStreamOperationOutput from smithy_aws_core.identity.environment import EnvironmentCredentialsResolver from ....types.content import Messages @@ -80,11 +80,11 @@ class NovaSonicSession(BidirectionalModelSession): interface. """ - def __init__(self, stream: any, config: dict[str, any]) -> None: + def __init__(self, stream: InvokeModelWithBidirectionalStreamOperationOutput, config: dict[str, any]) -> None: """Initialize Nova Sonic connection. Args: - stream: Nova Sonic bidirectional stream. + stream: Nova Sonic bidirectional stream operation output from AWS SDK. config: Model configuration. 
""" self.stream = stream @@ -492,10 +492,10 @@ def _convert_nova_event(self, nova_event: dict[str, any]) -> dict[str, any] | No elif "usageEvent" in nova_event: usage_data = nova_event["usageEvent"] usage_metrics: UsageMetricsEvent = { - "totalTokens": usage_data.get("totalTokens"), - "inputTokens": usage_data.get("totalInputTokens"), - "outputTokens": usage_data.get("totalOutputTokens"), - "audioTokens": usage_data.get("details", {}).get("total", {}).get("output", {}).get("speechTokens") + "totalTokens": usage_data.get("totalTokens", 0), + "inputTokens": usage_data.get("totalInputTokens", 0), + "outputTokens": usage_data.get("totalOutputTokens", 0), + "audioTokens": usage_data.get("details", {}).get("total", {}).get("output", {}).get("speechTokens", 0) } return {"usageMetrics": usage_metrics} diff --git a/src/strands/experimental/bidirectional_streaming/models/openai.py b/src/strands/experimental/bidirectional_streaming/models/openai.py index 76bf9f50d..7d009b1c7 100644 --- a/src/strands/experimental/bidirectional_streaming/models/openai.py +++ b/src/strands/experimental/bidirectional_streaming/models/openai.py @@ -89,10 +89,7 @@ def _create_voice_activity_event(self, activity_type: str) -> dict[str, any]: voice_activity: VoiceActivityEvent = {"activityType": activity_type} return {"voiceActivity": voice_activity} - async def _create_conversation_item(self, item_data: dict) -> None: - """Create conversation item and trigger response.""" - await self._send_event({"type": "conversation.item.create", "item": item_data}) - await self._send_event({"type": "response.create"}) + async def initialize( self, @@ -248,21 +245,16 @@ def _convert_openai_event(self, openai_event: dict[str, any]) -> dict[str, any] } return {"audioOutput": audio_output} - # Text output using helper method - elif event_type == "response.output_text.delta": + # Assistant text output events - combine multiple similar events + elif event_type in ["response.output_text.delta", 
"response.output_audio_transcript.delta"]: return self._create_text_event(openai_event["delta"], "assistant") - elif event_type == "response.output_audio_transcript.delta": - return self._create_text_event(openai_event["delta"], "assistant") - - # User transcription - elif event_type == "conversation.item.input_audio_transcription.delta": - transcript_delta = openai_event.get("delta", "") - return self._create_text_event(transcript_delta, "user") if transcript_delta.strip() else None - - elif event_type == "conversation.item.input_audio_transcription.completed": - transcript = openai_event.get("transcript", "") - return self._create_text_event(transcript, "user") if transcript.strip() else None + # User transcription events - combine multiple similar events + elif event_type in ["conversation.item.input_audio_transcription.delta", + "conversation.item.input_audio_transcription.completed"]: + text_key = "delta" if "delta" in event_type else "transcript" + text = openai_event.get(text_key, "") + return self._create_text_event(text, "user") if text.strip() else None elif event_type == "conversation.item.input_audio_transcription.segment": segment_data = openai_event.get("segment", {}) @@ -302,22 +294,22 @@ def _convert_openai_event(self, openai_event: dict[str, any]) -> dict[str, any] del self._function_call_buffer[call_id] return None - # Voice activity detection using helper method - elif event_type == "input_audio_buffer.speech_started": - return self._create_voice_activity_event("speech_started") - elif event_type == "input_audio_buffer.speech_stopped": - return self._create_voice_activity_event("speech_stopped") - elif event_type == "input_audio_buffer.timeout_triggered": - return self._create_voice_activity_event("timeout") + # Voice activity detection events - combine similar events using mapping + elif event_type in ["input_audio_buffer.speech_started", "input_audio_buffer.speech_stopped", + "input_audio_buffer.timeout_triggered"]: + # Map event types to 
activity types + activity_map = { + "input_audio_buffer.speech_started": "speech_started", + "input_audio_buffer.speech_stopped": "speech_stopped", + "input_audio_buffer.timeout_triggered": "timeout" + } + return self._create_voice_activity_event(activity_map[event_type]) - # Lifecycle events (log only) - elif event_type == "conversation.item.retrieve": + # Lifecycle events (log only) - combine multiple similar events + elif event_type in ["conversation.item.retrieve", "conversation.item.added"]: item = openai_event.get("item", {}) - logger.debug("OpenAI conversation item retrieved: %s", item.get("id")) - return None - - elif event_type == "conversation.item.added": - logger.debug("OpenAI conversation item added: %s", openai_event.get("item", {}).get("id")) + action = "retrieved" if "retrieve" in event_type else "added" + logger.debug("OpenAI conversation item %s: %s", action, item.get("id")) return None elif event_type == "conversation.item.done": @@ -341,6 +333,7 @@ def _convert_openai_event(self, openai_event: dict[str, any]) -> dict[str, any] return {"messageStop": {"message": message}} return None + # Response output events - combine similar events elif event_type in ["response.output_item.added", "response.output_item.done", "response.content_part.added", "response.content_part.done"]: item_data = openai_event.get("item") or openai_event.get("part") @@ -359,6 +352,7 @@ def _convert_openai_event(self, openai_event: dict[str, any]) -> dict[str, any] self._function_call_buffer[call_id]["name"] = function_name return None + # Session/buffer events - combine simple log-only events elif event_type in ["input_audio_buffer.committed", "input_audio_buffer.cleared", "session.created", "session.updated"]: logger.debug("OpenAI %s event", event_type) @@ -380,7 +374,7 @@ async def send_audio_content(self, audio_input: AudioInputEvent) -> None: audio_base64 = base64.b64encode(audio_input["audioData"]).decode("utf-8") await self._send_event({"type": 
"input_audio_buffer.append", "audio": audio_base64}) - async def send_text_content(self, text: str, **kwargs) -> None: + async def send_text_content(self, text: str) -> None: """Send text content to OpenAI for processing.""" if not self._require_active(): return @@ -390,7 +384,8 @@ async def send_text_content(self, text: str, **kwargs) -> None: "role": "user", "content": [{"type": "input_text", "text": text}] } - await self._create_conversation_item(item_data) + await self._send_event({"type": "conversation.item.create", "item": item_data}) + await self._send_event({"type": "response.create"}) async def send_interrupt(self) -> None: """Send interruption signal to OpenAI.""" @@ -412,7 +407,8 @@ async def send_tool_result(self, tool_use_id: str, result: dict[str, any]) -> No "call_id": tool_use_id, "output": result_text } - await self._create_conversation_item(item_data) + await self._send_event({"type": "conversation.item.create", "item": item_data}) + await self._send_event({"type": "response.create"}) async def close(self) -> None: """Close session and cleanup resources."""