From dfcf3acf8ddf82a88c686469e50ed2f8b8f523ec Mon Sep 17 00:00:00 2001 From: Lana Zhang Date: Tue, 9 Dec 2025 10:50:42 -0500 Subject: [PATCH 01/23] agentcore runtime bidi streaming add strands sample --- .../06-bi-directional-streaming/README.md | 142 +++- .../strands/client/client.py | 327 +++++++++ .../strands/client/requirements.txt | 2 + .../strands/client/strands-client.html | 691 ++++++++++++++++++ .../strands/websocket/Dockerfile | 35 + .../strands/websocket/requirements.txt | 9 + .../strands/websocket/server.py | 243 ++++++ 7 files changed, 1446 insertions(+), 3 deletions(-) create mode 100755 01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/strands/client/client.py create mode 100644 01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/strands/client/requirements.txt create mode 100644 01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/strands/client/strands-client.html create mode 100644 01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/strands/websocket/Dockerfile create mode 100644 01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/strands/websocket/requirements.txt create mode 100644 01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/strands/websocket/server.py diff --git a/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/README.md b/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/README.md index 71965ee8d..1651eadd0 100644 --- a/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/README.md +++ b/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/README.md @@ -4,6 +4,8 @@ This repository contains sample implementations demonstrating bidirectional WebS - **Sonic** - Native Amazon Nova Sonic Python WebSocket implementation deployed directly to AgentCore. Provides full control over the Nova Sonic protocol with direct event handling. Includes a web client for testing real-time audio conversations with voice selection and interruption support. +- **Strands** - High-level framework implementation using the Strands BidiAgent for simplified real-time audio conversations. Built on top of Nova Sonic with automatic session management, tool integration, and a streamlined API. Perfect for rapid prototyping and production applications that benefit from framework abstractions. + - **Echo** - Simple echo server for testing WebSocket connectivity and authentication without AI features. All samples use a unified setup and cleanup process through the root `setup.sh` and `cleanup.sh` scripts. @@ -39,6 +41,17 @@ export IAM_ROLE_NAME=WebSocketSonicAgentRole export ECR_REPO_NAME=agentcore_sonic_images export AGENT_NAME=websocket_sonic_agent +# AWS Authentication (choose one method): + +# Method 1: Using AWS Profile (recommended) +# Set AWS_PROFILE environment variable OR ensure your default profile has proper access +export AWS_PROFILE=your_profile_name + +# Method 2: Using AWS credentials directly +# export AWS_ACCESS_KEY_ID=your_access_key +# export AWS_SECRET_ACCESS_KEY=your_secret_key +# export AWS_SESSION_TOKEN=your_session_token # Optional, for temporary credentials + # Run setup ./setup.sh sonic ``` @@ -55,6 +68,14 @@ export AGENT_NAME=websocket_sonic_agent # Export environment variables (from setup output) export AWS_REGION="us-east-1" +# AWS Authentication (choose one method): +# Set AWS_PROFILE environment variable OR ensure your default profile has proper access +export AWS_PROFILE=your_profile_name +# OR +# export AWS_ACCESS_KEY_ID=your_access_key +# export AWS_SECRET_ACCESS_KEY=your_secret_key +# export AWS_SESSION_TOKEN=your_session_token # Optional + # Start the web client python sonic/client/client.py --runtime-arn "" ``` @@ -74,7 +95,6 @@ The web client will: - **Web-based UI** - No installation required, works in any modern browser - **Session management** - Automatic session handling and audio buffering - **Event logging** - See all WebSocket events in real-time with filtering capability - ### Sample Tool: getDateTool The Sonic implementation includes a working example of tool integration. The `getDateTool` demonstrates how to: @@ -93,6 +113,93 @@ The Sonic implementation includes a working example of tool integration. The `ge --- +## Strands Sample - Framework-Based Implementation + +This sample demonstrates using the **Strands BidiAgent framework** for real-time audio conversations with Amazon Nova Sonic. Strands provides a high-level abstraction that simplifies bidirectional streaming, automatic session management, and tool integration. + +**Architecture:** + +The Strands implementation uses the BidiAgent framework to handle the complexity of WebSocket communication, audio streaming, and tool orchestration automatically. + +**Best for:** Rapid prototyping and production applications that benefit from framework abstractions while maintaining full Nova Sonic capabilities. + +### Setup + +```bash +# Required +export ACCOUNT_ID=your_aws_account_id + +# Optional - customize these or use defaults +export AWS_REGION=us-east-1 +export IAM_ROLE_NAME=WebSocketStrandsAgentRole +export ECR_REPO_NAME=agentcore_strands_images +export AGENT_NAME=websocket_strands_agent + +# AWS Authentication (choose one method): + +# Method 1: Using AWS Profile (recommended) +# Set AWS_PROFILE environment variable OR ensure your default profile has proper access +export AWS_PROFILE=your_profile_name + +# Method 2: Using AWS credentials directly +# export AWS_ACCESS_KEY_ID=your_access_key +# export AWS_SECRET_ACCESS_KEY=your_secret_key +# export AWS_SESSION_TOKEN=your_session_token # Optional, for temporary credentials + +# Run setup +./setup.sh strands +``` + +### Run the Client + +**Option 1: Using the start script (recommended)** +```bash +./start_client.sh strands +``` + +**Option 2: Manual start** +```bash +# Export environment variables (from setup output) +export AWS_REGION="us-east-1" + +# AWS Authentication (choose one method): +# Set AWS_PROFILE environment variable OR ensure your default profile has proper access +export AWS_PROFILE=your_profile_name +# OR +# export AWS_ACCESS_KEY_ID=your_access_key +# export AWS_SECRET_ACCESS_KEY=your_secret_key +# export AWS_SESSION_TOKEN=your_session_token # Optional + +# Start the web client +python strands/client/client.py --runtime-arn "" +``` + +The web client will: +1. Open automatically in your browser +2. Request microphone access +3. Enable real-time audio conversation with the AI + +### Sample Tool: Calculator + +The Strands implementation includes a calculator tool that demonstrates framework-based tool integration. The tool can perform basic arithmetic operations. + +**Try it:** Ask questions like "What is 25 times 4?" or "Calculate 100 divided by 5" and the assistant will use the calculator tool. + +### Key Differences from Sonic Sample + +- **Abstraction level:** Strands provides higher-level APIs vs. Sonic's direct protocol control +- **Code complexity:** Strands requires less boilerplate for session management +- **Tool integration:** Framework handles tool orchestration automatically +- **Flexibility:** Sonic offers more fine-grained control over events and responses + +### Cleanup + +```bash +./cleanup.sh strands +``` + +--- + ## Echo Sample - WebSocket Testing A simple echo server for testing WebSocket connectivity and authentication. @@ -109,6 +216,17 @@ export IAM_ROLE_NAME=WebSocketEchoAgentRole export DOCKER_REPO_NAME=agentcore_echo_images export AGENT_NAME=websocket_echo_agent +# AWS Authentication (choose one method): + +# Method 1: Using AWS Profile (recommended) +# Set AWS_PROFILE environment variable OR ensure your default profile has proper access +export AWS_PROFILE=your_profile_name + +# Method 2: Using AWS credentials directly +# export AWS_ACCESS_KEY_ID=your_access_key +# export AWS_SECRET_ACCESS_KEY=your_secret_key +# export AWS_SESSION_TOKEN=your_session_token # Optional, for temporary credentials + # Run setup ./setup.sh echo ``` @@ -125,13 +243,20 @@ export AGENT_NAME=websocket_echo_agent # Export environment variables (from setup output) export AWS_REGION="us-east-1" +# AWS Authentication (choose one method): +# Set AWS_PROFILE environment variable OR ensure your default profile has proper access +export AWS_PROFILE=your_profile_name +# OR +# export AWS_ACCESS_KEY_ID=your_access_key +# export AWS_SECRET_ACCESS_KEY=your_secret_key +# export AWS_SESSION_TOKEN=your_session_token # Optional + # Test with SigV4 headers authentication python echo/client/client.py --runtime-arn "" --auth-type headers # Test with SigV4 query parameters python echo/client/client.py --runtime-arn "" --auth-type query ``` - ### Features - **Simple echo** - Sends a message and verifies the echo response @@ -183,7 +308,7 @@ After deployment, you'll have an ECR repository, IAM role, running agent runtime ├── agent_role.json # IAM role policy template ├── trust_policy.json # IAM trust policy │ -├── sonic/ # Sonic sample (real-time audio conversations) +├── sonic/ # Sonic sample (native implementation) │ ├── client/ # Web-based client │ │ ├── sonic-client.html # HTML UI with voice selection │ │ ├── client.py # Web server @@ -196,6 +321,17 @@ After deployment, you'll have an ECR repository, IAM role, running agent runtime │ │ └── requirements.txt # Server dependencies │ └── setup_config.json # Generated by setup.sh │ +├── strands/ # Strands sample (framework-based) +│ ├── client/ # Web-based client +│ │ ├── strands-client.html # HTML UI +│ │ ├── client.py # Web server +│ │ └── requirements.txt # Client dependencies +│ ├── websocket/ # Server implementation +│ │ ├── server.py # Strands BidiAgent server +│ │ ├── Dockerfile # Container definition +│ │ └── requirements.txt # Server dependencies +│ └── setup_config.json # Generated by setup.sh +│ └── echo/ # Echo sample (testing) ├── client/ # CLI client │ └── client.py # Echo test client diff --git a/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/strands/client/client.py b/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/strands/client/client.py new file mode 100755 index 000000000..c0d6a4dfc --- /dev/null +++ b/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/strands/client/client.py @@ -0,0 +1,327 @@ +#!/usr/bin/env python3 +import argparse +import os +import sys +import webbrowser +import json +import random +import string +from http.server import HTTPServer, BaseHTTPRequestHandler +from urllib.parse import urlparse + +# Import from root-level websocket_helpers +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../..")) +from websocket_helpers import create_presigned_url + + +class StrandsClientHandler(BaseHTTPRequestHandler): + """HTTP request handler that serves the Strands client""" + + # Class variables to store connection details + websocket_url = None + session_id = None + is_presigned = False + + # Store config for regenerating URLs + runtime_arn = None + region = None + service = None + expires = None + qualifier = None + + def log_message(self, format, *args): + """Override to provide cleaner logging""" + sys.stderr.write(f"[{self.log_date_time_string()}] {format % args}\n") + + def do_GET(self): + """Handle GET requests""" + parsed_path = urlparse(self.path) + + if parsed_path.path == "/" or parsed_path.path == "/index.html": + self.serve_client_page() + elif parsed_path.path == "/api/connection": + self.serve_connection_info() + else: + self.send_error(404, "File not found") + + def do_POST(self): + """Handle POST requests""" + parsed_path = urlparse(self.path) + + if parsed_path.path == "/api/regenerate": + self.regenerate_url() + else: + self.send_error(404, "Endpoint not found") + + def serve_client_page(self): + """Serve the HTML client with pre-configured connection""" + try: + # Read the HTML template + html_path = os.path.join(os.path.dirname(__file__), "strands-client.html") + with open(html_path, "r") as f: + html_content = f.read() + + # Inject the WebSocket URL if provided + if self.websocket_url: + html_content = html_content.replace( + 'id="presignedUrl" placeholder="wss://endpoint/runtimes/arn/ws?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=...&X-Amz-Signature=..."', + f'id="presignedUrl" placeholder="wss://endpoint/runtimes/arn/ws?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=...&X-Amz-Signature=..." value="{self.websocket_url}"', + ) + + self.send_response(200) + self.send_header("Content-type", "text/html") + self.send_header("Content-Length", len(html_content.encode())) + self.end_headers() + self.wfile.write(html_content.encode()) + + except FileNotFoundError: + self.send_error(404, "strands-client.html not found") + except Exception as e: + self.send_error(500, f"Internal server error: {str(e)}") + + def serve_connection_info(self): + """Serve the connection information as JSON""" + response = { + "websocket_url": self.websocket_url or "", + "session_id": self.session_id, + "is_presigned": self.is_presigned, + "can_regenerate": self.runtime_arn is not None, + "status": "ok" if self.websocket_url else "no_connection", + } + + response_json = json.dumps(response, indent=2) + + self.send_response(200) + self.send_header("Content-type", "application/json") + self.send_header("Content-Length", len(response_json.encode())) + self.end_headers() + self.wfile.write(response_json.encode()) + + def regenerate_url(self): + """Regenerate the presigned URL""" + try: + if not self.runtime_arn: + error_response = { + "status": "error", + "message": "Cannot regenerate URL - not using presigned URL mode", + } + response_json = json.dumps(error_response) + self.send_response(400) + self.send_header("Content-type", "application/json") + self.send_header("Content-Length", len(response_json.encode())) + self.end_headers() + self.wfile.write(response_json.encode()) + return + + # Generate new presigned URL + base_url = f"wss://bedrock-agentcore.{self.region}.amazonaws.com/runtimes/{self.runtime_arn}/ws?qualifier={self.qualifier}" + + new_url = create_presigned_url( + base_url, region=self.region, service=self.service, expires=self.expires + ) + + # Update the class variable + StrandsClientHandler.websocket_url = new_url + + response = { + "status": "ok", + "websocket_url": new_url, + "expires_in": self.expires, + "message": "URL regenerated successfully", + } + + response_json = json.dumps(response, indent=2) + + self.send_response(200) + self.send_header("Content-type", "application/json") + self.send_header("Content-Length", len(response_json.encode())) + self.end_headers() + self.wfile.write(response_json.encode()) + + print(f"✅ Regenerated presigned URL (expires in {self.expires} seconds)") + + except Exception as e: + error_response = {"status": "error", "message": str(e)} + response_json = json.dumps(error_response) + self.send_response(500) + self.send_header("Content-type", "application/json") + self.send_header("Content-Length", len(response_json.encode())) + self.end_headers() + self.wfile.write(response_json.encode()) + + +def main(): + parser = argparse.ArgumentParser( + description="Start web service for Strands WebSocket client", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Local WebSocket server (no authentication) + python client.py --ws-url ws://localhost:8080/ws + + # AWS Bedrock with presigned URL + python client.py --runtime-arn arn:aws:bedrock-agentcore:us-east-1:123456789012:runtime/RUNTIMEID + + # Specify custom port + python client.py --runtime-arn arn:aws:bedrock-agentcore:us-east-1:123456789012:runtime/RUNTIMEID --port 8080 + + # Custom region + python client.py --runtime-arn arn:aws:bedrock-agentcore:us-east-1:123456789012:runtime/RUNTIMEID \\ + --region us-east-1 +""", + ) + + parser.add_argument( + "--runtime-arn", + help="Runtime ARN for AWS Bedrock connection (e.g., arn:aws:bedrock-agentcore:region:account:runtime/id)", + ) + + parser.add_argument( + "--ws-url", + help="WebSocket server URL for local connections (e.g., ws://localhost:8080/ws)", + ) + + parser.add_argument( + "--region", + default=os.getenv("AWS_REGION", "us-east-1"), + help="AWS region (default: us-east-1, from AWS_REGION env var)", + ) + + parser.add_argument( + "--service", + default="bedrock-agentcore", + help="AWS service name (default: bedrock-agentcore)", + ) + + parser.add_argument( + "--expires", + type=int, + default=3600, + help="URL expiration time in seconds for presigned URLs (default: 3600 = 1 hour)", + ) + + parser.add_argument( + "--qualifier", default="DEFAULT", help="Runtime qualifier (default: DEFAULT)" + ) + + parser.add_argument( + "--port", type=int, default=8000, help="Web server port (default: 8000)" + ) + + parser.add_argument( + "--no-browser", action="store_true", help="Do not automatically open browser" + ) + + args = parser.parse_args() + + # Validate arguments + if not args.runtime_arn and not args.ws_url: + parser.error("Either --runtime-arn or --ws-url must be specified") + + if args.runtime_arn and args.ws_url: + parser.error("Cannot specify both --runtime-arn and --ws-url") + + # Extract region from runtime ARN if provided + if args.runtime_arn: + arn_parts = args.runtime_arn.split(":") + if len(arn_parts) >= 4: + arn_region = arn_parts[3] + if arn_region and arn_region != args.region: + args.region = arn_region + + print("=" * 70) + print("🎙️ Strands Client Web Service") + print("=" * 70) + + websocket_url = None + session_id = "".join(random.choices(string.ascii_letters + string.digits, k=50)) + is_presigned = False + + try: + # Generate presigned URL for AWS Bedrock + if args.runtime_arn: + base_url = f"wss://bedrock-agentcore.{args.region}.amazonaws.com/runtimes/{args.runtime_arn}/ws?qualifier={args.qualifier}&voice_id=matthew" + default_voice_id = "matthew" + + print(f"📡 Base URL: {base_url}") + print(f"🔑 Runtime ARN: {args.runtime_arn}") + print(f"🌍 Region: {args.region}") + print(f"🎙️ Voice ID: {default_voice_id} (default)") + print(f"🆔 Session ID: {session_id}") + print( + f"⏰ URL expires in: {args.expires} seconds ({args.expires / 60:.1f} minutes)" + ) + print() + print("🔐 Generating pre-signed URL...") + + websocket_url = create_presigned_url( + base_url, region=args.region, service=args.service, expires=args.expires + ) + is_presigned = True + print("✅ Pre-signed URL generated successfully!") + + # Use provided WebSocket URL for local connections + else: + websocket_url = args.ws_url + print(f"🔗 WebSocket URL: {websocket_url}") + print("💡 Using local WebSocket connection (no authentication)") + + print(f"🌐 Web Server Port: {args.port}") + print() + + # Set connection details in the handler class + StrandsClientHandler.websocket_url = websocket_url + StrandsClientHandler.session_id = session_id + StrandsClientHandler.is_presigned = is_presigned + + # Store config for regenerating URLs + if args.runtime_arn: + StrandsClientHandler.runtime_arn = args.runtime_arn + StrandsClientHandler.region = args.region + StrandsClientHandler.service = args.service + StrandsClientHandler.expires = args.expires + StrandsClientHandler.qualifier = args.qualifier + + # Start web server + server_address = ("", args.port) + httpd = HTTPServer(server_address, StrandsClientHandler) + + server_url = f"http://localhost:{args.port}" + + print("=" * 70) + print("🌐 Web Server Started") + print("=" * 70) + print(f"📍 Server URL: {server_url}") + print(f"🔗 Client Page: {server_url}/") + print(f"📊 API Endpoint: {server_url}/api/connection") + print() + if is_presigned: + print("💡 The pre-signed WebSocket URL is pre-populated in the client") + else: + print("💡 The WebSocket URL is pre-populated in the client") + print("💡 Press Ctrl+C to stop the server") + print("=" * 70) + print() + + # Open browser automatically + if not args.no_browser: + print("🌐 Opening browser...") + webbrowser.open(server_url) + print() + + # Start serving + httpd.serve_forever() + + except KeyboardInterrupt: + print("\n\n👋 Shutting down server...") + return 0 + except Exception as e: + print(f"\n❌ Error: {e}", file=sys.stderr) + import traceback + + traceback.print_exc(file=sys.stderr) + return 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/strands/client/requirements.txt b/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/strands/client/requirements.txt new file mode 100644 index 000000000..31b6f152c --- /dev/null +++ b/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/strands/client/requirements.txt @@ -0,0 +1,2 @@ +boto3==1.42.3 +botocore==1.42.3 \ No newline at end of file diff --git a/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/strands/client/strands-client.html b/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/strands/client/strands-client.html new file mode 100644 index 000000000..c172aceb8 --- /dev/null +++ b/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/strands/client/strands-client.html @@ -0,0 +1,691 @@ + + + + + + BidiAgent WebSocket + + + +
+
+
+

🎙️ Strands BidiAgent WebSocket Client - with Bedrock AgentCore

+
+
Disconnected
+
+ +
+
+
+ + + + Paste the complete pre-signed URL with SigV4 authentication parameters in the query string + +
+
+ +
+ +
+ +
+
+
+
+
+
+
+
+
+
+ + + \ No newline at end of file diff --git a/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/strands/websocket/Dockerfile b/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/strands/websocket/Dockerfile new file mode 100644 index 000000000..0870e76ae --- /dev/null +++ b/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/strands/websocket/Dockerfile @@ -0,0 +1,35 @@ +FROM --platform=linux/arm64 public.ecr.aws/docker/library/python:3.12-slim + +WORKDIR /app + +RUN pip install --upgrade pip + +# Install system dependencies for PyAudio and other packages +RUN apt-get update && apt-get install -y \ + portaudio19-dev \ + gcc \ + g++ \ + make \ + libasound2-dev \ + git \ + && rm -rf /var/lib/apt/lists/* + +# Set a fallback version for setuptools-scm (in case git metadata is missing) +ENV SETUPTOOLS_SCM_PRETEND_VERSION=0.1.7 + +# Copy requirements and install remaining dependencies +COPY requirements.txt ./ +RUN pip install -r requirements.txt + +# Copy agent file +COPY server.py ./ + +# Set environment variables +ENV HOST="0.0.0.0" +ENV PORT=8080 + +# Expose port +EXPOSE 8080 + +# Run application directly +CMD ["python", "server.py"] diff --git a/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/strands/websocket/requirements.txt b/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/strands/websocket/requirements.txt new file mode 100644 index 000000000..f68802f5d --- /dev/null +++ b/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/strands/websocket/requirements.txt @@ -0,0 +1,9 @@ +uvicorn[standard]==0.30.6 +fastapi==0.123.9 +websockets==13.1 +strands-agents==1.19.0 +strands-agents-tools==0.2.17 +requests==2.32.5 +aws_sdk_bedrock_runtime==0.2.0 +pyaudio==0.2.14 +prompt_toolkit==3.0.52 \ No newline at end of file diff --git a/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/strands/websocket/server.py b/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/strands/websocket/server.py new file mode 100644 index 000000000..c439ce9a7 --- /dev/null +++ b/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/strands/websocket/server.py @@ -0,0 +1,243 @@ +import logging +import uvicorn +import os +import asyncio +import requests +from datetime import datetime +from fastapi import FastAPI, WebSocket, WebSocketDisconnect +from fastapi.responses import JSONResponse +from fastapi.middleware.cors import CORSMiddleware + +from strands.experimental.bidi.agent import BidiAgent +from strands.experimental.bidi.models.nova_sonic import BidiNovaSonicModel +from strands_tools import calculator + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +_credential_refresh_task = None + + +def get_imdsv2_token(): + """Get IMDSv2 token for secure metadata access.""" + try: + response = requests.put( + "http://169.254.169.254/latest/api/token", + headers={"X-aws-ec2-metadata-token-ttl-seconds": "21600"}, + timeout=2, + ) + if response.status_code == 200: + return response.text + except Exception: + pass + return None + + +def get_credentials_from_imds(): + """Retrieve IAM role credentials from EC2 IMDS (tries IMDSv2 first, falls back to IMDSv1).""" + result = { + "success": False, + "credentials": None, + "role_name": None, + "method_used": None, + "error": None, + } + + try: + token = get_imdsv2_token() + headers = {"X-aws-ec2-metadata-token": token} if token else {} + result["method_used"] = "IMDSv2" if token else "IMDSv1" + + role_response = requests.get( + "http://169.254.169.254/latest/meta-data/iam/security-credentials/", + headers=headers, + timeout=2, + ) + + if role_response.status_code != 200: + result["error"] = ( + f"Failed to retrieve IAM role: HTTP {role_response.status_code}" + ) + return result + + role_name = role_response.text.strip() + result["role_name"] = role_name + + creds_response = requests.get( + f"http://169.254.169.254/latest/meta-data/iam/security-credentials/{role_name}", + headers=headers, + timeout=2, + ) + + if creds_response.status_code != 200: + result["error"] = ( + f"Failed to retrieve credentials: HTTP {creds_response.status_code}" + ) + return result + + credentials = creds_response.json() + result["success"] = True + result["credentials"] = { + "AccessKeyId": credentials.get("AccessKeyId"), + "SecretAccessKey": credentials.get("SecretAccessKey"), + "Token": credentials.get("Token"), + "Expiration": credentials.get("Expiration"), + } + + except Exception as e: + result["error"] = str(e) + + return result + + +async def refresh_credentials_from_imds(): + """Background task to refresh credentials from IMDS.""" + logger.info("Starting credential refresh task") + + while True: + try: + imds_result = get_credentials_from_imds() + + if imds_result["success"]: + creds = imds_result["credentials"] + + os.environ["AWS_ACCESS_KEY_ID"] = creds["AccessKeyId"] + os.environ["AWS_SECRET_ACCESS_KEY"] = creds["SecretAccessKey"] + os.environ["AWS_SESSION_TOKEN"] = creds["Token"] + + logger.info(f"✅ Credentials refreshed ({imds_result['method_used']})") + + try: + expiration = datetime.fromisoformat( + creds["Expiration"].replace("Z", "+00:00") + ) + now = datetime.now(expiration.tzinfo) + time_until_expiration = (expiration - now).total_seconds() + refresh_interval = min(max(time_until_expiration - 300, 60), 3600) + logger.info(f" Next refresh in {refresh_interval:.0f}s") + except Exception: + refresh_interval = 3600 + + await asyncio.sleep(refresh_interval) + else: + logger.error(f"Failed to refresh credentials: {imds_result['error']}") + await asyncio.sleep(300) + + except asyncio.CancelledError: + logger.info("Credential refresh task cancelled") + break + except Exception as e: + logger.error(f"Error in credential refresh: {e}") + await asyncio.sleep(300) + + +app = FastAPI(title="Strands BidiAgent WebSocket Server") + +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + + +@app.on_event("startup") +async def startup_event(): + global _credential_refresh_task + + logger.info("🚀 Starting server...") + logger.info(f"📍 Region: {os.getenv('AWS_DEFAULT_REGION', 'us-east-1')}") + + if os.getenv("AWS_ACCESS_KEY_ID") and os.getenv("AWS_SECRET_ACCESS_KEY"): + logger.info("✅ Using credentials from environment (local mode)") + else: + logger.info("🔄 Fetching credentials from EC2 IMDS...") + imds_result = get_credentials_from_imds() + + if imds_result["success"]: + creds = imds_result["credentials"] + os.environ["AWS_ACCESS_KEY_ID"] = creds["AccessKeyId"] + os.environ["AWS_SECRET_ACCESS_KEY"] = creds["SecretAccessKey"] + os.environ["AWS_SESSION_TOKEN"] = creds["Token"] + + logger.info(f"✅ Credentials loaded ({imds_result['method_used']})") + + _credential_refresh_task = asyncio.create_task( + refresh_credentials_from_imds() + ) + logger.info("🔄 Credential refresh task started") + else: + logger.error(f"❌ Failed to fetch credentials: {imds_result['error']}") + + +@app.on_event("shutdown") +async def shutdown_event(): + global _credential_refresh_task + + logger.info("🛑 Shutting down...") + + if _credential_refresh_task and not _credential_refresh_task.done(): + _credential_refresh_task.cancel() + try: + await _credential_refresh_task + except asyncio.CancelledError: + pass + + +@app.get("/ping") +async def ping(): + return JSONResponse({"status": "ok"}) + + +@app.get("/health") +async def health_check(): + return JSONResponse({"status": "healthy"}) + + +@app.websocket("/ws") +async def websocket_endpoint(websocket: WebSocket): + await websocket.accept() + + voice_id = websocket.query_params.get("voice_id", "matthew") + logger.info(f"Connection from {websocket.client}, voice: {voice_id}") + + try: + model = BidiNovaSonicModel( + region="us-east-1", + model_id="amazon.nova-sonic-v1:0", + provider_config={ + "audio": { + "input_sample_rate": 16000, + "output_sample_rate": 16000, + "voice": voice_id, + } + }, + tools=[calculator], + ) + + agent = BidiAgent( + model=model, + tools=[calculator], + system_prompt="You are a helpful assistant with access to a calculator tool.", + ) + + await agent.run(inputs=[websocket.receive_json], outputs=[websocket.send_json]) + + except WebSocketDisconnect: + logger.info("Client disconnected") + except Exception as e: + logger.error(f"Error: {e}") + try: + await websocket.send_json({"type": "error", "message": str(e)}) + except Exception: + pass + finally: + logger.info("Connection closed") + + +if __name__ == "__main__": + host = os.getenv("HOST", "0.0.0.0") + port = int(os.getenv("PORT", "8080")) + + uvicorn.run(app, host=host, port=port) From 571269492e9556fb0e51654a68c35e8cd61af28e Mon Sep 17 00:00:00 2001 From: Lana Zhang Date: Tue, 9 Dec 2025 22:48:09 -0500 Subject: [PATCH 02/23] agentcore runtime bidistream sample update for Nova Sonic 2 --- .../06-bi-directional-streaming/README.md | 4 +- .../sonic/client/sonic-client.html | 90 +++++++++++++++++++ .../sonic/websocket/server.py | 2 +- 3 files changed, 93 insertions(+), 3 deletions(-) diff --git a/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/README.md b/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/README.md index 1651eadd0..f64af34db 100644 --- a/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/README.md +++ b/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/README.md @@ -19,9 +19,9 @@ All samples use a unified setup and cleanup process through the root `setup.sh` --- -## Sonic Sample - Native Nova Sonic Implementation +## Sonic Sample - Native Nova Sonic 2 Implementation -This sample deploys a **native Amazon Nova Sonic Python WebSocket server** directly to AgentCore. It provides full control over the Nova Sonic protocol with direct event handling, giving you complete visibility into session management, audio streaming, and response generation. +This sample deploys a **native Amazon Nova Sonic 2 Python WebSocket server** directly to AgentCore. It provides full control over the Nova Sonic protocol with direct event handling, giving you complete visibility into session management, audio streaming, and response generation. **Architecture:** diff --git a/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/sonic/client/sonic-client.html b/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/sonic/client/sonic-client.html index 4ab375c71..f03f4c5bd 100644 --- a/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/sonic/client/sonic-client.html +++ b/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/sonic/client/sonic-client.html @@ -426,6 +426,13 @@ background: #667eea; border-radius: 2px; } + + .text-input-section { + padding: 10px; + border-top: 1px solid #e0e0e0; + background: #f8f9fa; + flex-shrink: 0; + } @@ -483,6 +490,12 @@

🎙️ Nova Sonic S2S WebSocket Client - with Bedrock AgentCore

💬 Conversation

+
+
+ + +
+
@@ -567,6 +580,8 @@

📡 Events

function updateStatus(connected, recording = false) { const status = document.getElementById('status'); const toggleBtn = document.getElementById('toggleBtn'); + const textInput = document.getElementById('textInput'); + const sendBtn = document.getElementById('sendBtn'); if (connected) { status.textContent = recording ? '🔴 Recording' : '🟢 Connected'; @@ -574,12 +589,16 @@

📡 Events

toggleBtn.textContent = '🛑 End Conversation'; toggleBtn.className = 'connected'; toggleBtn.disabled = false; + textInput.disabled = false; + sendBtn.disabled = false; } else { status.textContent = '⚫ Disconnected'; status.className = 'disconnected'; toggleBtn.textContent = '🚀 Start Conversation'; toggleBtn.className = ''; toggleBtn.disabled = false; + textInput.disabled = true; + sendBtn.disabled = true; } } @@ -1140,6 +1159,77 @@

📡 Events

addMessage('⏹️ Session stopped', 'system'); } + function sendTextMessage() { + const textInput = document.getElementById('textInput'); + const message = textInput.value.trim(); + + if (!message || !ws || ws.readyState !== WebSocket.OPEN || !sessionStarted) { + return; + } + + // Generate a new content name for this text message + const newTextContentName = 'text_' + generateId(); + + // Send contentStart event + const contentStart = { + event: { + contentStart: { + promptName: promptName, + contentName: newTextContentName, + role: "USER", + type: "TEXT", + interactive: true, + textInputConfiguration: { + mediaType: "text/plain" + } + } + } + }; + ws.send(JSON.stringify(contentStart)); + addEvent('sent', 'contentStart (TEXT)', contentStart); + + // Send textInput event + const textInputEvent = { + event: { + textInput: { + promptName: promptName, + contentName: newTextContentName, + content: message + } + } + }; + ws.send(JSON.stringify(textInputEvent)); + addEvent('sent', 'textInput', textInputEvent); + + // Send contentEnd event + const contentEnd = { + event: { + contentEnd: { + promptName: promptName, + contentName: newTextContentName + } + } + }; + ws.send(JSON.stringify(contentEnd)); + addEvent('sent', 'contentEnd', contentEnd); + + // Add message to conversation + addMessage(`💬 You: ${message}`, 'user'); + + // Clear input + textInput.value = ''; + } + + // Add Enter key handler for text input + document.addEventListener('DOMContentLoaded', function() { + const textInput = document.getElementById('textInput'); + textInput.addEventListener('keypress', function(e) { + if (e.key === 'Enter') { + sendTextMessage(); + } + }); + }); + diff --git a/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/sonic/websocket/server.py b/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/sonic/websocket/server.py index aa9df560d..0f446ffcf 100644 --- a/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/sonic/websocket/server.py +++ b/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/sonic/websocket/server.py @@ -313,7 +313,7 @@ async def websocket_endpoint(websocket: WebSocket): # Create a new stream manager for this connection stream_manager = S2sSessionManager( - model_id='amazon.nova-sonic-v1:0', + model_id='amazon.nova-2-sonic-v1:0', region=aws_region ) From 740fae42ea10aa36d8c88d253ff62d456ac25088 Mon Sep 17 00:00:00 2001 From: Lana Zhang Date: Tue, 9 Dec 2025 23:05:38 -0500 Subject: [PATCH 03/23] agentcore bidi streaming sonic 2 update cleanup python file --- .../sonic/websocket/server.py | 371 ++++++++++-------- 1 file changed, 218 insertions(+), 153 deletions(-) diff --git a/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/sonic/websocket/server.py b/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/sonic/websocket/server.py index 0f446ffcf..072a4be2c 100644 --- a/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/sonic/websocket/server.py +++ b/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/sonic/websocket/server.py @@ -24,7 +24,7 @@ def get_imdsv2_token(): """ Get IMDSv2 token for secure metadata access. - + Returns: str: The IMDSv2 token, or None if IMDSv2 is not available """ @@ -32,7 +32,7 @@ def get_imdsv2_token(): response = requests.put( "http://169.254.169.254/latest/api/token", headers={"X-aws-ec2-metadata-token-ttl-seconds": "21600"}, - timeout=2 + timeout=2, ) if response.status_code == 200: return response.text @@ -44,10 +44,10 @@ def get_imdsv2_token(): def get_credentials_from_imds(): """ Manually retrieve IAM role credentials from EC2 Instance Metadata Service. - + This utility method fetches credentials directly from IMDS without using boto3. It tries both IMDSv1 and IMDSv2 methods. - + Returns: dict: A dictionary containing the credentials or error information """ @@ -56,48 +56,52 @@ def get_credentials_from_imds(): "credentials": None, "role_name": None, "method_used": None, - "error": None + "error": None, } - + try: # Try IMDSv2 first token = get_imdsv2_token() headers = {} - + if token: headers["X-aws-ec2-metadata-token"] = token result["method_used"] = "IMDSv2" else: result["method_used"] = "IMDSv1" - + # Get the IAM role name role_response = requests.get( "http://169.254.169.254/latest/meta-data/iam/security-credentials/", headers=headers, - timeout=2 + timeout=2, ) - + if role_response.status_code != 200: - result["error"] = f"Failed to retrieve IAM role name: HTTP {role_response.status_code}" + result["error"] = ( + f"Failed to retrieve IAM role name: HTTP {role_response.status_code}" + ) return result - + role_name = role_response.text.strip() result["role_name"] = role_name - + # Get the credentials for the role creds_response = requests.get( f"http://169.254.169.254/latest/meta-data/iam/security-credentials/{role_name}", headers=headers, - timeout=2 + timeout=2, ) - + if creds_response.status_code != 200: - result["error"] = f"Failed to retrieve credentials for role {role_name}: HTTP {creds_response.status_code}" + result["error"] = ( + f"Failed to retrieve credentials for role {role_name}: HTTP {creds_response.status_code}" + ) return result - + # Parse the credentials credentials = creds_response.json() - + result["success"] = True result["credentials"] = { "AccessKeyId": credentials.get("AccessKeyId"), @@ -106,14 +110,14 @@ def get_credentials_from_imds(): "Expiration": credentials.get("Expiration"), "Code": credentials.get("Code"), "Type": credentials.get("Type"), - "LastUpdated": credentials.get("LastUpdated") + "LastUpdated": credentials.get("LastUpdated"), } - + except RequestException as e: result["error"] = f"Request exception: {str(e)}" except Exception as e: result["error"] = f"Unexpected error: {str(e)}" - + return result @@ -123,43 +127,49 @@ async def refresh_credentials_from_imds(): This ensures the EnvironmentCredentialsResolver always has fresh credentials. """ logger.info("Starting credential refresh background task") - + while True: try: # Fetch credentials from IMDS imds_result = get_credentials_from_imds() - + if imds_result["success"]: creds = imds_result["credentials"] - + # Update environment variables os.environ["AWS_ACCESS_KEY_ID"] = creds["AccessKeyId"] os.environ["AWS_SECRET_ACCESS_KEY"] = creds["SecretAccessKey"] os.environ["AWS_SESSION_TOKEN"] = creds["Token"] - + logger.info("✅ Credentials refreshed from IMD.") - + # Parse expiration time and calculate refresh interval # Refresh 5 minutes before expiration try: - expiration = datetime.fromisoformat(creds['Expiration'].replace('Z', '+00:00')) + expiration = datetime.fromisoformat( + creds["Expiration"].replace("Z", "+00:00") + ) now = datetime.now(expiration.tzinfo) time_until_expiration = (expiration - now).total_seconds() - + # Refresh 5 minutes (300 seconds) before expiration, or in 1 hour if expiration is far away refresh_interval = min(max(time_until_expiration - 300, 60), 3600) logger.info(f" Next refresh in {refresh_interval:.0f} seconds") except Exception as e: - logger.warning(f"Could not parse expiration time, using default 1 hour refresh: {e}") + logger.warning( + f"Could not parse expiration time, using default 1 hour refresh: {e}" + ) refresh_interval = 3600 - + # Wait until next refresh await asyncio.sleep(refresh_interval) else: - logger.error(f"Failed to refresh credentials from IMDS: {imds_result['error']}") + logger.error( + f"Failed to refresh credentials from IMDS: {imds_result['error']}" + ) # Retry in 5 minutes on failure await asyncio.sleep(300) - + except asyncio.CancelledError: logger.info("Credential refresh task cancelled") break @@ -181,13 +191,14 @@ async def refresh_credentials_from_imds(): allow_headers=["*"], ) + @app.on_event("startup") async def startup_event(): global credential_refresh_task - + logger.info("🚀 Application starting up...") logger.info(f"📍 AWS Region: {os.getenv('AWS_DEFAULT_REGION', 'us-east-1')}") - + # Check if credentials are already in environment (local mode) if os.getenv("AWS_ACCESS_KEY_ID") and os.getenv("AWS_SECRET_ACCESS_KEY"): logger.info("✅ Using credentials from environment variables (local mode)") @@ -195,32 +206,39 @@ async def startup_event(): else: # Try to fetch credentials from IMDS and start refresh task logger.info("🔄 Attempting to fetch credentials from EC2 IMDS...") - + imds_result = get_credentials_from_imds() - + if imds_result["success"]: creds = imds_result["credentials"] - + # Set initial credentials in environment os.environ["AWS_ACCESS_KEY_ID"] = creds["AccessKeyId"] os.environ["AWS_SECRET_ACCESS_KEY"] = creds["SecretAccessKey"] os.environ["AWS_SESSION_TOKEN"] = creds["Token"] - + logger.info("✅ Initial credentials loaded from IMDS.") - + # Start background task to refresh credentials - credential_refresh_task = asyncio.create_task(refresh_credentials_from_imds()) + credential_refresh_task = asyncio.create_task( + refresh_credentials_from_imds() + ) logger.info("🔄 Credential refresh background task started") else: - logger.error(f"❌ Failed to fetch credentials from IMDS: {imds_result['error']}") - logger.error(" Application may not function correctly without credentials") + logger.error( + f"❌ Failed to fetch credentials from IMDS: {imds_result['error']}" + ) + logger.error( + " Application may not function correctly without credentials" + ) + @app.on_event("shutdown") async def shutdown_event(): global credential_refresh_task - + logger.info("🛑 Application shutting down...") - + # Cancel credential refresh task if running if credential_refresh_task and not credential_refresh_task.done(): logger.info("Stopping credential refresh task...") @@ -231,17 +249,20 @@ async def shutdown_event(): pass logger.info("Credential refresh task stopped") + @app.get("/health") @app.get("/") async def health_check(): logger.info("Health check request received") return JSONResponse({"status": "healthy"}) + @app.get("/ping") async def ping(): logger.debug("Ping endpoint called") return JSONResponse({"status": "ok"}) + @app.get("/credentials/info") async def credential_info(): """Get information about credential configuration (for debugging)""" @@ -254,52 +275,55 @@ async def credential_info(): credential_source = "EC2 IMDS (IMDSv2 preferred, falls back to IMDSv1)" mode = "ec2" note = "Credentials are automatically refreshed from IMDS by background task" - - return JSONResponse({ - "status": "ok", - "mode": mode, - "credential_source": credential_source, - "region": os.getenv("AWS_DEFAULT_REGION", "us-east-1"), - "note": note - }) + + return JSONResponse( + { + "status": "ok", + "mode": mode, + "credential_source": credential_source, + "region": os.getenv("AWS_DEFAULT_REGION", "us-east-1"), + "note": note, + } + ) + @app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket): logger.info(f"WebSocket connection attempt from: {websocket.client}") logger.debug(f"Headers: {websocket.headers}") - + # Accept the WebSocket connection await websocket.accept() logger.info("WebSocket connection accepted") - + aws_region = os.getenv("AWS_DEFAULT_REGION", "us-east-1") stream_manager = None forward_task = None - + try: # Main message processing loop while True: try: message = await websocket.receive_text() logger.debug("Received message from client") - + try: data = json.loads(message) - + # Handle wrapped body format - if 'body' in data: + if "body" in data: data = json.loads(data["body"]) - - if 'event' not in data: + + if "event" not in data: logger.warning("Received message without event field") continue - - event_type = list(data['event'].keys())[0] - + + event_type = list(data["event"].keys())[0] + # Handle session start - create new stream manager - if event_type == 'sessionStart': + if event_type == "sessionStart": logger.info("Starting new session") - + # Clean up existing session if any if stream_manager: logger.info("Cleaning up existing session") @@ -313,30 +337,31 @@ async def websocket_endpoint(websocket: WebSocket): # Create a new stream manager for this connection stream_manager = S2sSessionManager( - model_id='amazon.nova-2-sonic-v1:0', - region=aws_region + model_id="amazon.nova-2-sonic-v1:0", region=aws_region ) - + # Initialize the Bedrock stream await stream_manager.initialize_stream() logger.info("Stream initialized successfully") - + # Start a task to forward responses from Bedrock to the WebSocket forward_task = asyncio.create_task( forward_responses(websocket, stream_manager) ) - + # Now send the sessionStart event to Bedrock await stream_manager.send_raw_event(data) - logger.info(f"SessionStart event sent to Bedrock {json.dumps(data)}") - + logger.info( + f"SessionStart event sent to Bedrock {json.dumps(data)}" + ) + # Continue to next iteration to process next event continue # Handle session end - clean up resources - elif event_type == 'sessionEnd': + elif event_type == "sessionEnd": logger.info("Ending session") - + if stream_manager: await stream_manager.close() stream_manager = None @@ -347,65 +372,88 @@ async def websocket_endpoint(websocket: WebSocket): except asyncio.CancelledError: pass forward_task = None - + # Continue to next iteration continue # Process events if we have an active stream manager if stream_manager and stream_manager.is_active: # Store prompt name and content names if provided - if event_type == 'promptStart': - stream_manager.prompt_name = data['event']['promptStart']['promptName'] - elif event_type == 'contentStart' and data['event']['contentStart'].get('type') == 'AUDIO': - stream_manager.audio_content_name = data['event']['contentStart']['contentName'] - + if event_type == "promptStart": + stream_manager.prompt_name = data["event"]["promptStart"][ + "promptName" + ] + elif ( + event_type == "contentStart" + and data["event"]["contentStart"].get("type") == "AUDIO" + ): + stream_manager.audio_content_name = data["event"][ + "contentStart" + ]["contentName"] + # Handle audio input separately (queue-based processing) - if event_type == 'audioInput': - prompt_name = data['event']['audioInput']['promptName'] - content_name = data['event']['audioInput']['contentName'] - audio_base64 = data['event']['audioInput']['content'] - + if event_type == "audioInput": + prompt_name = data["event"]["audioInput"]["promptName"] + content_name = data["event"]["audioInput"]["contentName"] + audio_base64 = data["event"]["audioInput"]["content"] + # Add to the audio queue for async processing - stream_manager.add_audio_chunk(prompt_name, content_name, audio_base64) + stream_manager.add_audio_chunk( + prompt_name, content_name, audio_base64 + ) else: # Send other events directly to Bedrock await stream_manager.send_raw_event(data) - elif event_type not in ['sessionStart', 'sessionEnd']: - logger.warning(f"Received event {event_type} but no active stream manager") - + elif event_type not in ["sessionStart", "sessionEnd"]: + logger.warning( + f"Received event {event_type} but no active stream manager" + ) + except json.JSONDecodeError as e: logger.error(f"Invalid JSON received from WebSocket: {e}") try: - await websocket.send_json({"type": "error", "message": "Invalid JSON format"}) + await websocket.send_json( + {"type": "error", "message": "Invalid JSON format"} + ) except Exception: pass except Exception as exp: - logger.error(f"Error processing WebSocket message: {exp}", exc_info=True) + logger.error( + f"Error processing WebSocket message: {exp}", exc_info=True + ) try: - await websocket.send_json({"type": "error", "message": str(exp)}) + await websocket.send_json( + {"type": "error", "message": str(exp)} + ) except Exception: pass - + except WebSocketDisconnect as e: logger.info(f"WebSocket disconnected: {websocket.client}") - logger.info(f"Disconnect details: code={getattr(e, 'code', 'N/A')}, reason={getattr(e, 'reason', 'N/A')}") + logger.info( + f"Disconnect details: code={getattr(e, 'code', 'N/A')}, reason={getattr(e, 'reason', 'N/A')}" + ) if stream_manager and stream_manager.is_active: - logger.info("Bedrock stream was still active when WebSocket disconnected") + logger.info( + "Bedrock stream was still active when WebSocket disconnected" + ) break except Exception as e: logger.error(f"WebSocket error: {e}", exc_info=True) break - + except Exception as e: logger.error(f"WebSocket handler error: {e}", exc_info=True) try: - await websocket.send_json({"type": "error", "message": "WebSocket handler error"}) + await websocket.send_json( + {"type": "error", "message": "WebSocket handler error"} + ) except Exception: pass finally: # Clean up resources logger.info("Cleaning up WebSocket connection resources") - + if stream_manager: await stream_manager.close() if forward_task and not forward_task.done(): @@ -414,12 +462,12 @@ async def websocket_endpoint(websocket: WebSocket): await forward_task except asyncio.CancelledError: pass - + try: await websocket.close() except Exception as e: logger.error(f"Error closing websocket: {e}") - + logger.info("Connection closed") @@ -430,71 +478,77 @@ def split_large_event(response, max_size=16000): Returns a list of events to send. """ event = json.dumps(response) - event_size = len(event.encode('utf-8')) - + event_size = len(event.encode("utf-8")) + # If event is small enough, return as-is if event_size <= max_size: return [response] - + # Get event type and data - if 'event' not in response: + if "event" not in response: return [response] - - event_type = list(response['event'].keys())[0] - event_data = response['event'][event_type] - + + event_type = list(response["event"].keys())[0] + event_data = response["event"][event_type] + # Only split events that have a 'content' field (audioOutput, textOutput, etc.) - if 'content' not in event_data: - logger.warning(f"Event {event_type} is large ({event_size} bytes) but has no content field to split") + if "content" not in event_data: + logger.warning( + f"Event {event_type} is large ({event_size} bytes) but has no content field to split" + ) return [response] - - content = event_data['content'] - + + content = event_data["content"] + # Calculate how much content we can fit per chunk # Create a template event to measure overhead template_event = response.copy() - template_event['event'] = {event_type: event_data.copy()} - template_event['event'][event_type]['content'] = '' - overhead = len(json.dumps(template_event).encode('utf-8')) - + template_event["event"] = {event_type: event_data.copy()} + template_event["event"][event_type]["content"] = "" + overhead = len(json.dumps(template_event).encode("utf-8")) + # Calculate max content size per chunk (leave some margin) max_content_size = max_size - overhead - 100 - + # For audio events, align to sample boundaries # Base64 encoding: 4 chars = 3 bytes of binary data # PCM 16-bit: 2 bytes per sample # Must align to multiples of 4 chars for valid base64 (no padding issues) - if event_type == 'audioOutput': + if event_type == "audioOutput": # Align to 4-char boundaries for complete base64 groups # This ensures each chunk is valid base64 without padding issues alignment = 4 max_content_size = (max_content_size // alignment) * alignment - logger.debug(f"Audio splitting: aligned chunk size to {max_content_size} chars (base64 boundary)") - + logger.debug( + f"Audio splitting: aligned chunk size to {max_content_size} chars (base64 boundary)" + ) + # Split content into chunks chunks = [] for i in range(0, len(content), max_content_size): - chunk_content = content[i:i + max_content_size] - + chunk_content = content[i : i + max_content_size] + # For base64 content, ensure proper padding if needed - if event_type == 'audioOutput': + if event_type == "audioOutput": # Each chunk should be a multiple of 4 chars (already aligned above) # But verify and add padding if somehow needed remainder = len(chunk_content) % 4 if remainder != 0: # This shouldn't happen due to alignment, but just in case padding_needed = 4 - remainder - chunk_content += '=' * padding_needed + chunk_content += "=" * padding_needed logger.warning(f"Added {padding_needed} padding chars to audio chunk") - + # Create new event with chunked content chunk_event = response.copy() - chunk_event['event'] = {event_type: event_data.copy()} - chunk_event['event'][event_type]['content'] = chunk_content - + chunk_event["event"] = {event_type: event_data.copy()} + chunk_event["event"][event_type]["content"] = chunk_content + chunks.append(chunk_event) - - logger.info(f"Split {event_type} event ({event_size} bytes) into {len(chunks)} chunks") + + logger.info( + f"Split {event_type} event ({event_size} bytes) into {len(chunks)} chunks" + ) return chunks @@ -504,40 +558,50 @@ async def forward_responses(websocket: WebSocket, stream_manager): while True: # Get next response from the output queue response = await stream_manager.output_queue.get() - + # Send to WebSocket try: # Check if event needs to be split event = json.dumps(response) - event_size = len(event.encode('utf-8')) - + event_size = len(event.encode("utf-8")) + # Get event type for logging - event_type = list(response.get('event', {}).keys())[0] if 'event' in response else 'unknown' - + event_type = ( + list(response.get("event", {}).keys())[0] + if "event" in response + else "unknown" + ) + # Split large events if event_size > 10000: - logger.warning(f"!!!! Large {event_type} event detected (size: {event_size} bytes) - splitting...") + logger.warning( + f"!!!! Large {event_type} event detected (size: {event_size} bytes) - splitting..." + ) events_to_send = split_large_event(response, max_size=10000) else: events_to_send = [response] - + # Send all chunks for idx, event_chunk in enumerate(events_to_send): chunk_json = json.dumps(event_chunk) - chunk_size = len(chunk_json.encode('utf-8')) - + chunk_size = len(chunk_json.encode("utf-8")) + await websocket.send_text(chunk_json) - + if len(events_to_send) > 1: - logger.info(f"Forwarded {event_type} chunk {idx+1}/{len(events_to_send)} to client (size: {chunk_size} bytes)") + logger.info( + f"Forwarded {event_type} chunk {idx+1}/{len(events_to_send)} to client (size: {chunk_size} bytes)" + ) else: - logger.info(f"Forwarded {event_type} to client (size: {chunk_size} bytes)") - + logger.info( + f"Forwarded {event_type} to client (size: {chunk_size} bytes)" + ) + except Exception as e: logger.error(f"Error sending response to client: {e}", exc_info=True) # Check if it's a connection error that should break the loop error_str = str(e).lower() - if 'closed' in error_str or 'disconnect' in error_str: + if "closed" in error_str or "disconnect" in error_str: logger.info("WebSocket connection closed, stopping forward task") break # For other errors, log but continue trying @@ -552,20 +616,20 @@ async def forward_responses(websocket: WebSocket, stream_manager): if __name__ == "__main__": import argparse - - parser = argparse.ArgumentParser(description='Nova Sonic S2S WebSocket Server') - parser.add_argument('--debug', action='store_true', help='Enable debug mode') + + parser = argparse.ArgumentParser(description="Nova Sonic S2S WebSocket Server") + parser.add_argument("--debug", action="store_true", help="Enable debug mode") args = parser.parse_args() - + if args.debug: DEBUG = True logging.getLogger().setLevel(logging.DEBUG) - + host = os.getenv("HOST", "0.0.0.0") port = int(os.getenv("PORT", "8081")) - + logger.info(f"Starting Nova Sonic S2S WebSocket Server on {host}:{port}") - + try: uvicorn.run(app, host=host, port=port) except KeyboardInterrupt: @@ -574,4 +638,5 @@ async def forward_responses(websocket: WebSocket, stream_manager): logger.error(f"Server error: {e}") if args.debug: import traceback - traceback.print_exc() \ No newline at end of file + + traceback.print_exc() From e5ce071eb6898521dc9dbbb6fddda659f20dcae8 Mon Sep 17 00:00:00 2001 From: Lana Zhang Date: Tue, 9 Dec 2025 23:12:22 -0500 Subject: [PATCH 04/23] update IMDS comments --- .../06-bi-directional-streaming/sonic/websocket/server.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/sonic/websocket/server.py b/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/sonic/websocket/server.py index 072a4be2c..471d0900b 100644 --- a/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/sonic/websocket/server.py +++ b/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/sonic/websocket/server.py @@ -43,7 +43,7 @@ def get_imdsv2_token(): def get_credentials_from_imds(): """ - Manually retrieve IAM role credentials from EC2 Instance Metadata Service. + Manually retrieve IAM role credentials from envrionment Metadata Service. This utility method fetches credentials directly from IMDS without using boto3. It tries both IMDSv1 and IMDSv2 methods. @@ -205,7 +205,7 @@ async def startup_event(): logger.info(" Credential refresh task will not be started") else: # Try to fetch credentials from IMDS and start refresh task - logger.info("🔄 Attempting to fetch credentials from EC2 IMDS...") + logger.info("🔄 Attempting to fetch credentials from ENV IMDS...") imds_result = get_credentials_from_imds() @@ -272,7 +272,7 @@ async def credential_info(): mode = "local" note = "Using static credentials from environment variables" else: - credential_source = "EC2 IMDS (IMDSv2 preferred, falls back to IMDSv1)" + credential_source = "ENV IMDS (IMDSv2 preferred, falls back to IMDSv1)" mode = "ec2" note = "Credentials are automatically refreshed from IMDS by background task" From 849c0ebe6cebdc19499179ac0ea85b4f23b15bcc Mon Sep 17 00:00:00 2001 From: Lana Zhang Date: Tue, 9 Dec 2025 23:17:56 -0500 Subject: [PATCH 05/23] reformat the python file using ruff --- .../06-bi-directional-streaming/sonic/websocket/server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/sonic/websocket/server.py b/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/sonic/websocket/server.py index 471d0900b..5ae550e46 100644 --- a/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/sonic/websocket/server.py +++ b/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/sonic/websocket/server.py @@ -590,7 +590,7 @@ async def forward_responses(websocket: WebSocket, stream_manager): if len(events_to_send) > 1: logger.info( - f"Forwarded {event_type} chunk {idx+1}/{len(events_to_send)} to client (size: {chunk_size} bytes)" + f"Forwarded {event_type} chunk {idx + 1}/{len(events_to_send)} to client (size: {chunk_size} bytes)" ) else: logger.info( From 52a460ff0579e045381948ac1d7a15ce73940683 Mon Sep 17 00:00:00 2001 From: Lana Zhang Date: Tue, 9 Dec 2025 23:38:43 -0500 Subject: [PATCH 06/23] sonic sample update to use default port 8080 --- .../06-bi-directional-streaming/sonic/websocket/server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/sonic/websocket/server.py b/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/sonic/websocket/server.py index 5ae550e46..cc9e510e1 100644 --- a/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/sonic/websocket/server.py +++ b/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/sonic/websocket/server.py @@ -626,7 +626,7 @@ async def forward_responses(websocket: WebSocket, stream_manager): logging.getLogger().setLevel(logging.DEBUG) host = os.getenv("HOST", "0.0.0.0") - port = int(os.getenv("PORT", "8081")) + port = int(os.getenv("PORT", "8080")) logger.info(f"Starting Nova Sonic S2S WebSocket Server on {host}:{port}") From aa38e437338117c38f07e59470ecf63be392a77b Mon Sep 17 00:00:00 2001 From: Lana Zhang Date: Fri, 6 Feb 2026 17:40:53 -0500 Subject: [PATCH 07/23] agentcore runtime bidi streaming update to sonic2 with text input update --- .../06-bi-directional-streaming/README.md | 13 + .../sonic/client/sonic-client.html | 8 +- .../sonic/websocket/requirements.txt | 2 +- .../sonic/websocket/s2s_events.py | 321 +++++++++--------- .../sonic/websocket/s2s_session_manager.py | 257 ++++++++------ .../strands/client/strands-client.html | 80 ++++- .../strands/websocket/requirements.txt | 6 +- .../strands/websocket/server.py | 23 +- 8 files changed, 435 insertions(+), 275 deletions(-) diff --git a/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/README.md b/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/README.md index f64af34db..7124fbbc1 100644 --- a/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/README.md +++ b/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/README.md @@ -23,6 +23,8 @@ All samples use a unified setup and cleanup process through the root `setup.sh` This sample deploys a **native Amazon Nova Sonic 2 Python WebSocket server** directly to AgentCore. It provides full control over the Nova Sonic protocol with direct event handling, giving you complete visibility into session management, audio streaming, and response generation. +**Model:** Uses `amazon.nova-2-sonic-v1:0` for real-time speech-to-speech conversations. + **Architecture:** ![AgentCore Sonic Architecture](./images/agentcore-sonic-architecture.png) @@ -88,6 +90,7 @@ The web client will: ### Features - **Real-time audio streaming** - Speak naturally and get immediate responses +- **Text input** - Type messages in addition to voice input - **Voice selection** - Choose from multiple voices across different languages (English, French, Italian, German, Spanish) - **Dynamic voice switching** - Change voices during an active conversation - **Interruption support** - Barge-in capability to interrupt the assistant mid-response @@ -117,6 +120,8 @@ The Sonic implementation includes a working example of tool integration. The `ge This sample demonstrates using the **Strands BidiAgent framework** for real-time audio conversations with Amazon Nova Sonic. Strands provides a high-level abstraction that simplifies bidirectional streaming, automatic session management, and tool integration. +**Model:** Uses `amazon.nova-2-sonic-v1:0` through the Strands BidiAgent framework. + **Architecture:** The Strands implementation uses the BidiAgent framework to handle the complexity of WebSocket communication, audio streaming, and tool orchestration automatically. @@ -185,6 +190,14 @@ The Strands implementation includes a calculator tool that demonstrates framewor **Try it:** Ask questions like "What is 25 times 4?" or "Calculate 100 divided by 5" and the assistant will use the calculator tool. +### Text Input Support + +Both Sonic and Strands samples support text input in addition to voice: +- **Sonic:** Uses Nova Sonic's native text content events (`contentStart`, `textInput`, `contentEnd`) +- **Strands:** Uses `agent.send()` method to send text messages directly to the agent + +Type your message in the text input field and press Enter or click Send. + ### Key Differences from Sonic Sample - **Abstraction level:** Strands provides higher-level APIs vs. Sonic's direct protocol control diff --git a/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/sonic/client/sonic-client.html b/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/sonic/client/sonic-client.html index f03f4c5bd..17daad9fc 100644 --- a/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/sonic/client/sonic-client.html +++ b/01-tutorials/01-AgentCore-runtime/06-bi-directional-streaming/sonic/client/sonic-client.html @@ -448,10 +448,10 @@

🎙️ Nova Sonic S2S WebSocket Client - with Bedrock AgentCore

- - + +
@@ -497,7 +497,7 @@

💬 Conversation

-
+