From 28e9eaa373c5b42b0e3106857452d15333937c41 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 10 Sep 2025 09:20:09 +0000 Subject: [PATCH 1/3] Initial plan From 194afd8e37675e073b433d79081358b23ac7c32a Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 10 Sep 2025 09:40:25 +0000 Subject: [PATCH 2/3] Implement complete real-time agent reference design with core functionality Co-authored-by: marvinbuss <34542414+marvinbuss@users.noreply.github.com> --- .gitignore | 95 +++++ CONTRIBUTING.md | 75 ++++ README.md | 270 +++++++++++++- config/.env.template | 26 ++ docs/architecture/README.md | 306 +++++++++++++++ docs/setup/README.md | 353 ++++++++++++++++++ examples/complete_workflow_demo.py | 348 +++++++++++++++++ pyproject.toml | 65 ++++ requirements.txt | 18 + src/real_time_agent/__init__.py | 34 ++ src/real_time_agent/core/__init__.py | 1 + src/real_time_agent/core/agent.py | 219 +++++++++++ src/real_time_agent/core/config.py | 101 +++++ src/real_time_agent/core/interfaces.py | 130 +++++++ .../examples/advanced_streaming.py | 195 ++++++++++ .../examples/ai_foundry_example.py | 80 ++++ src/real_time_agent/examples/basic_usage.py | 83 ++++ src/real_time_agent/integrations/__init__.py | 1 + .../integrations/ai_foundry.py | 213 +++++++++++ .../integrations/azure_openai.py | 226 +++++++++++ src/real_time_agent/utils/__init__.py | 1 + src/real_time_agent/utils/helpers.py | 249 ++++++++++++ tests/test_basic.py | 333 +++++++++++++++++ 23 files changed, 3420 insertions(+), 2 deletions(-) create mode 100644 .gitignore create mode 100644 CONTRIBUTING.md create mode 100644 config/.env.template create mode 100644 docs/architecture/README.md create mode 100644 docs/setup/README.md create mode 100644 examples/complete_workflow_demo.py create mode 100644 pyproject.toml create mode 100644 requirements.txt create mode 100644 src/real_time_agent/__init__.py create mode 100644 src/real_time_agent/core/__init__.py create mode 100644 src/real_time_agent/core/agent.py create mode 100644 src/real_time_agent/core/config.py create mode 100644 src/real_time_agent/core/interfaces.py create mode 100644 src/real_time_agent/examples/advanced_streaming.py create mode 100644 src/real_time_agent/examples/ai_foundry_example.py create mode 100644 src/real_time_agent/examples/basic_usage.py create mode 100644 src/real_time_agent/integrations/__init__.py create mode 100644 src/real_time_agent/integrations/ai_foundry.py create mode 100644 src/real_time_agent/integrations/azure_openai.py create mode 100644 src/real_time_agent/utils/__init__.py create mode 100644 src/real_time_agent/utils/helpers.py create mode 100644 tests/test_basic.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d388ed6 --- /dev/null +++ b/.gitignore @@ -0,0 +1,95 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# Virtual environments +venv/ +env/ +ENV/ +env.bak/ +venv.bak/ + +# IDE +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# Environment variables +.env +.env.local +.env.production +.env.test + +# OS +.DS_Store +.DS_Store? 
+._* +.Spotlight-V100 +.Trashes +ehthumbs.db +Thumbs.db + +# Logs +*.log +logs/ + +# Testing +.coverage +.pytest_cache/ +.tox/ +.nox/ +coverage.xml +*.cover +*.py,cover +.hypothesis/ + +# Jupyter Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# pipenv +Pipfile.lock + +# PEP 582 +__pypackages__/ + +# Celery +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Temporary files +/tmp/ +temp/ +*.tmp +*.temp + +# Documentation builds +docs/_build/ +site/ \ No newline at end of file diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000..d99d3e6 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,75 @@ +# Contributing to Real-Time Agent + +We welcome contributions to the Real-Time Agent project! This document provides guidelines for contributing. + +## Development Setup + +1. Fork the repository +2. Clone your fork: + ```bash + git clone https://github.com/your-username/real-time-agent.git + cd real-time-agent + ``` + +3. Create a virtual environment: + ```bash + python -m venv venv + source venv/bin/activate # On Windows: venv\Scripts\activate + ``` + +4. Install dependencies: + ```bash + pip install -r requirements.txt + pip install -e . + ``` + +## Making Changes + +1. Create a feature branch: + ```bash + git checkout -b feature/your-feature-name + ``` + +2. Make your changes following the code style guidelines +3. Add tests for new functionality +4. Run tests to ensure everything works +5. Update documentation as needed + +## Code Style + +- Follow PEP 8 for Python code style +- Use type hints for all function parameters and return values +- Add docstrings for all public functions and classes +- Keep line length to 100 characters maximum + +## Testing + +Run tests before submitting: + +```bash +# Run basic tests +python tests/test_basic.py + +# With pytest (if available) +pytest tests/ +``` + +## Submitting Changes + +1. Commit your changes with descriptive messages +2. Push to your fork +3. Create a pull request with: + - Clear description of changes + - Reference to any related issues + - Screenshots for UI changes + +## Issues + +When reporting issues, please include: +- Python version +- Operating system +- Error messages and stack traces +- Steps to reproduce +- Expected vs actual behavior + +Thank you for contributing! \ No newline at end of file diff --git a/README.md b/README.md index 6ee67f7..d422926 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,268 @@ -# real-time-agent -This repository will include a reference design for real time agents build on Azure Open AI or AI Foundry. +# Real-Time Agent Reference Implementation + +A comprehensive reference design and Python implementation for building real-time AI agents using Azure OpenAI or AI Foundry. This repository provides architecture patterns, best practices, and production-ready code for implementing conversational AI agents with real-time streaming capabilities. 
+ +## 🎯 Overview + +This project demonstrates how to build scalable, real-time AI agents that can: +- Handle streaming responses for immediate user feedback +- Support multiple AI providers (Azure OpenAI and AI Foundry) +- Implement robust error handling and reconnection logic +- Provide comprehensive event handling and logging +- Scale for production use cases + +## ✨ Features + +- **Multi-Provider Support**: Seamlessly switch between Azure OpenAI and AI Foundry +- **Real-Time Streaming**: Get immediate response chunks as they're generated +- **Event-Driven Architecture**: Comprehensive event handling for messages, connections, and errors +- **Configuration Management**: Flexible configuration via environment variables or code +- **Production Ready**: Includes logging, error handling, retry logic, and validation +- **Extensible Design**: Easy to extend with new providers or custom functionality +- **Type Safety**: Full type annotations for better development experience + +## πŸš€ Quick Start + +### Installation + +```bash +# Clone the repository +git clone https://github.com/PerfectThymeTech/real-time-agent.git +cd real-time-agent + +# Install dependencies +pip install -r requirements.txt + +# Or install in development mode +pip install -e . +``` + +### Configuration + +1. Copy the configuration template: + ```bash + cp config/.env.template .env + ``` + +2. Update the `.env` file with your credentials: + ```env + # For Azure OpenAI + AZURE_OPENAI_ENDPOINT=https://your-resource.openai.azure.com/ + AZURE_OPENAI_API_KEY=your-api-key-here + AZURE_OPENAI_DEPLOYMENT=gpt-4 + + # For AI Foundry + AI_FOUNDRY_ENDPOINT=https://your-foundry-endpoint.com/ + AI_FOUNDRY_API_KEY=your-foundry-api-key-here + ``` + +### Basic Usage + +```python +import asyncio +from real_time_agent import RealTimeAgent, AgentConfig, AzureOpenAIProvider + +async def main(): + # Create configuration + config = AgentConfig( + provider="azure_openai", + azure_openai_endpoint="https://your-resource.openai.azure.com/", + azure_openai_deployment="gpt-4", + streaming=True + ) + + # Create provider and agent + provider = AzureOpenAIProvider(config) + agent = RealTimeAgent(config, provider) + + # Initialize and use + await agent.initialize() + + # Send a message and get streaming response + async for response in await agent.send_message("Hello, how can you help me?"): + print(response.message.content, end='', flush=True) + if response.is_final: + break + + await agent.close() + +asyncio.run(main()) +``` + +## πŸ“– Documentation + +### Architecture Overview + +The real-time agent system follows a modular, event-driven architecture: + +``` +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ Application │───▢│ RealTimeAgent │───▢│ AI Provider β”‚ +β”‚ Layer β”‚ β”‚ β”‚ β”‚ (Azure/Foundry) β”‚ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + β”‚ + β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β” + β”‚ Event Handler β”‚ + β”‚ (Optional) β”‚ + β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +``` + +### Key Components + +1. **RealTimeAgent**: Main orchestrator handling conversation flow +2. **AI Providers**: Pluggable providers for different AI services +3. **Configuration**: Centralized configuration management +4. 
**Event Handlers**: Optional callbacks for real-time events +5. **Utilities**: Helper functions for common tasks + +### Provider Comparison + +| Feature | Azure OpenAI | AI Foundry | +|---------|--------------|------------| +| Streaming | βœ… | βœ… | +| Authentication | API Key / Managed Identity | API Key / Managed Identity | +| Model Selection | Deployment-based | Model-based | +| Enterprise Features | βœ… | βœ… | +| Cost | Pay-per-token | Varies | + +## πŸ“ Project Structure + +``` +src/real_time_agent/ +β”œβ”€β”€ core/ +β”‚ β”œβ”€β”€ agent.py # Main agent implementation +β”‚ β”œβ”€β”€ config.py # Configuration management +β”‚ └── interfaces.py # Abstract base classes +β”œβ”€β”€ integrations/ +β”‚ β”œβ”€β”€ azure_openai.py # Azure OpenAI provider +β”‚ └── ai_foundry.py # AI Foundry provider +β”œβ”€β”€ utils/ +β”‚ └── helpers.py # Utility functions +└── examples/ + β”œβ”€β”€ basic_usage.py # Basic example + β”œβ”€β”€ advanced_streaming.py # Advanced streaming example + └── ai_foundry_example.py # AI Foundry example +``` + +## πŸ”§ Configuration Options + +### Environment Variables + +All configuration can be set via environment variables with the `REAL_TIME_AGENT_` prefix: + +- `REAL_TIME_AGENT_PROVIDER`: AI provider to use (`azure_openai` or `ai_foundry`) +- `REAL_TIME_AGENT_MAX_TOKENS`: Maximum tokens per response +- `REAL_TIME_AGENT_TEMPERATURE`: Response randomness (0.0-1.0) +- `REAL_TIME_AGENT_STREAMING`: Enable streaming responses +- `REAL_TIME_AGENT_LOG_LEVEL`: Logging level + +### Programmatic Configuration + +```python +config = AgentConfig( + provider="azure_openai", + max_tokens=1000, + temperature=0.7, + streaming=True, + system_prompt="Your custom system prompt", + # Azure OpenAI specific + azure_openai_endpoint="https://your-resource.openai.azure.com/", + azure_openai_deployment="gpt-4", + # AI Foundry specific + ai_foundry_endpoint="https://your-foundry-endpoint.com/" +) +``` + +## πŸŽ›οΈ Advanced Usage + +### Custom Event Handling + +```python +from real_time_agent.core.interfaces import EventHandler + +class CustomEventHandler: + async def on_message(self, message): + # Custom message handling logic + print(f"Received: {message.content}") + + async def on_connection_status(self, status): + # Handle connection changes + print(f"Status: {status}") + + async def on_error(self, error): + # Custom error handling + print(f"Error: {error}") + +# Use with agent +handler = CustomEventHandler() +agent = RealTimeAgent(config, provider, handler) +``` + +### Conversation Management + +```python +# Get conversation history +history = await agent.get_conversation_history() + +# Clear conversation +await agent.clear_conversation() + +# Update system prompt +await agent.set_system_prompt("New system prompt") + +# Export conversation +from real_time_agent.utils.helpers import ConversationExporter +json_export = ConversationExporter.to_json(history) +markdown_export = ConversationExporter.to_markdown(history) +``` + +## πŸ§ͺ Examples + +Run the included examples to see the agent in action: + +```bash +# Basic usage example +python src/real_time_agent/examples/basic_usage.py + +# Advanced streaming with event handling +python src/real_time_agent/examples/advanced_streaming.py + +# AI Foundry integration +python src/real_time_agent/examples/ai_foundry_example.py +``` + +## πŸ” Security Best Practices + +1. **Use Environment Variables**: Store API keys in environment variables, not code +2. **Managed Identity**: Use Azure Managed Identity in production when possible +3. 
**Rate Limiting**: Implement rate limiting for user-facing applications
+4. **Input Validation**: Validate and sanitize user inputs
+5. **Logging**: Be careful not to log sensitive information
+
+## 🀝 Contributing
+
+Contributions are welcome! Please see our [contributing guidelines](CONTRIBUTING.md) for details.
+
+## πŸ“„ License
+
+This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
+
+## πŸ†˜ Support
+
+- Create an [issue](https://github.com/PerfectThymeTech/real-time-agent/issues) for bug reports or feature requests
+- Check the [documentation](docs/) for detailed guides
+- Review the [examples](src/real_time_agent/examples/) for implementation patterns
+
+## 🎯 Use Cases
+
+This reference implementation is ideal for:
+- Customer service chatbots
+- Technical support agents
+- Educational AI tutors
+- Content generation tools
+- Interactive AI assistants
+- Real-time translation services
+
+---
+
+Built with ❀️ by Perfect Thyme Tech
diff --git a/config/.env.template b/config/.env.template
new file mode 100644
index 0000000..94e7ebd
--- /dev/null
+++ b/config/.env.template
@@ -0,0 +1,26 @@
+# Example configuration template for real-time agents.
+
+# Azure OpenAI Configuration
+AZURE_OPENAI_ENDPOINT=https://your-resource.openai.azure.com/
+AZURE_OPENAI_API_KEY=your-api-key-here
+AZURE_OPENAI_DEPLOYMENT=gpt-4
+AZURE_OPENAI_API_VERSION=2024-02-01
+
+# AI Foundry Configuration
+AI_FOUNDRY_ENDPOINT=https://your-foundry-endpoint.com/
+AI_FOUNDRY_API_KEY=your-foundry-api-key-here
+
+# Agent Configuration
+REAL_TIME_AGENT_PROVIDER=azure_openai
+REAL_TIME_AGENT_MAX_TOKENS=1000
+REAL_TIME_AGENT_TEMPERATURE=0.7
+REAL_TIME_AGENT_STREAMING=true
+REAL_TIME_AGENT_LOG_LEVEL=INFO
+
+# Real-time Configuration
+REAL_TIME_AGENT_WEBSOCKET_TIMEOUT=30
+REAL_TIME_AGENT_HEARTBEAT_INTERVAL=10
+REAL_TIME_AGENT_MAX_RECONNECT_ATTEMPTS=5
+
+# System Configuration
+REAL_TIME_AGENT_SYSTEM_PROMPT=You are a helpful AI assistant providing real-time responses.
\ No newline at end of file
diff --git a/docs/architecture/README.md b/docs/architecture/README.md
new file mode 100644
index 0000000..9ce55cc
--- /dev/null
+++ b/docs/architecture/README.md
@@ -0,0 +1,306 @@
+# Real-Time Agent Architecture
+
+This document outlines the architecture concepts and design patterns for building real-time AI agents using Azure OpenAI or AI Foundry.
+
+## Overview
+
+Real-time agents enable immediate, conversational interactions between users and AI systems. Unlike traditional batch processing, real-time agents provide streaming responses that appear as the AI generates them, creating a more natural and engaging user experience.
+
+## Core Architecture Principles
+
+### 1. Event-Driven Design
+
+The architecture follows an event-driven pattern where components communicate through well-defined events:
+
+```
+User Input β†’ Agent β†’ Provider β†’ Streaming Response β†’ Event Handler β†’ UI Update
+```
+
+This allows for:
+- Loose coupling between components
+- Real-time feedback to users
+- Scalable event processing
+- Easy integration of new providers
+
+### 2. Provider Abstraction
+
+The system uses a provider pattern to support multiple AI services:
+
+```python
+class AIProvider(ABC):
+    @abstractmethod
+    async def generate_response(self, messages, stream=True):
+        pass
+```
+
+Benefits:
+- Easy switching between providers
+- Consistent interface across different AI services
+- Future-proof design for new providers
+
+### 3. 
Configuration Management + +Centralized configuration supports multiple deployment scenarios: + +- Development: Local configuration files +- Staging/Production: Environment variables +- Enterprise: Azure Key Vault integration + +## Component Architecture + +### Core Components + +``` +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ Application Layer β”‚ +β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€ +β”‚ RealTimeAgent β”‚ +β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ +β”‚ β”‚ Config β”‚ β”‚ Session β”‚ β”‚ Event Handler β”‚ β”‚ +β”‚ β”‚ Management β”‚ β”‚ Management β”‚ β”‚ β”‚ β”‚ +β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ +β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€ +β”‚ Provider Layer β”‚ +β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ +β”‚ β”‚ Azure OpenAI β”‚ β”‚ AI Foundry β”‚ β”‚ +β”‚ β”‚ Provider β”‚ β”‚ Provider β”‚ β”‚ +β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ +β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€ +β”‚ Utility Layer β”‚ +β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ +β”‚ β”‚ Logging β”‚ β”‚ Validation β”‚ β”‚ Export/Import β”‚ β”‚ +β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +``` + +### Data Flow + +1. **Input Processing**: User messages are validated and formatted +2. **Context Management**: Conversation history is maintained and context is prepared +3. **Provider Selection**: Appropriate AI provider is selected based on configuration +4. **Request Processing**: Request is sent to the AI provider with streaming enabled +5. **Response Handling**: Streaming chunks are processed and events are emitted +6. **Output Generation**: Responses are formatted and delivered to the user + +## Real-Time Streaming Architecture + +### Streaming Response Pattern + +```python +async def handle_streaming_response(): + async for chunk in provider.generate_response(messages, stream=True): + # Process each chunk immediately + await event_handler.on_message(chunk.message) + + # Update UI in real-time + yield chunk + + if chunk.is_final: + # Add to conversation history + conversation.append(chunk.message) +``` + +### Benefits of Streaming + +1. 
**Improved User Experience**: Users see responses immediately +2. **Reduced Perceived Latency**: Responses feel faster even if total time is the same +3. **Better Engagement**: Users stay engaged during longer responses +4. **Progressive Disclosure**: Information is revealed progressively + +## Provider Integration Patterns + +### Azure OpenAI Integration + +```python +class AzureOpenAIProvider: + def __init__(self, config): + self.client = AsyncAzureOpenAI( + api_key=config.api_key, + api_version=config.api_version, + azure_endpoint=config.endpoint + ) + + async def generate_response(self, messages, stream=True): + response = await self.client.chat.completions.create( + model=self.deployment, + messages=self.format_messages(messages), + stream=stream + ) + + if stream: + async for chunk in response: + yield self.process_chunk(chunk) + else: + yield self.process_response(response) +``` + +### AI Foundry Integration + +```python +class AIFoundryProvider: + def __init__(self, config): + self.client = ChatCompletionsClient( + endpoint=config.endpoint, + credential=self.get_credential(config) + ) + + async def generate_response(self, messages, stream=True): + response = await self.client.complete( + messages=self.format_messages(messages), + stream=stream + ) + + # Similar streaming pattern as Azure OpenAI +``` + +## Event-Driven Communication + +### Event Types + +1. **Message Events**: New messages in the conversation +2. **Connection Events**: Provider connection status changes +3. **Error Events**: Errors and exceptions during processing +4. **Status Events**: Agent status and health information + +### Event Handler Pattern + +```python +class EventHandler: + async def on_message(self, message: Message): + # Handle new messages + pass + + async def on_connection_status(self, status: ConnectionStatus): + # Handle connection changes + pass + + async def on_error(self, error: Exception): + # Handle errors + pass +``` + +## Scalability Considerations + +### Horizontal Scaling + +- **Stateless Design**: Agents are stateless and can be scaled horizontally +- **Provider Load Balancing**: Multiple provider instances can be load balanced +- **Session Management**: External session storage for multi-instance deployments + +### Performance Optimization + +- **Connection Pooling**: Reuse connections to AI providers +- **Caching**: Cache frequently used responses or context +- **Async Processing**: Non-blocking I/O for all operations + +### Resource Management + +- **Token Limits**: Respect provider token limits and quotas +- **Rate Limiting**: Implement rate limiting to prevent abuse +- **Memory Management**: Efficient handling of conversation history + +## Security Architecture + +### Authentication & Authorization + +```python +# Multiple authentication methods +if config.api_key: + # Direct API key authentication + credential = config.api_key +else: + # Managed identity authentication + credential = DefaultAzureCredential() +``` + +### Data Protection + +1. **Encryption in Transit**: All API calls use HTTPS/TLS +2. **Sensitive Data Handling**: API keys stored securely +3. **Audit Logging**: Comprehensive logging for compliance +4. 
**Input Validation**: All inputs validated and sanitized + +## Deployment Patterns + +### Development Deployment + +```yaml +Environment: Development +Configuration: Local files (.env) +Authentication: API keys +Logging: Console output +Scaling: Single instance +``` + +### Production Deployment + +```yaml +Environment: Production +Configuration: Environment variables / Key Vault +Authentication: Managed Identity +Logging: Azure Monitor / Application Insights +Scaling: Auto-scaling groups +Load Balancing: Azure Load Balancer +``` + +## Monitoring and Observability + +### Key Metrics + +1. **Response Time**: Time to first token and total response time +2. **Throughput**: Messages processed per second +3. **Error Rate**: Failed requests and error types +4. **Token Usage**: Token consumption patterns +5. **Connection Health**: Provider connectivity status + +### Logging Strategy + +```python +# Structured logging +logger.info("Message processed", extra={ + "message_id": message.id, + "user_id": user.id, + "provider": "azure_openai", + "tokens_used": response.usage.total_tokens, + "response_time_ms": response_time +}) +``` + +## Best Practices + +### Code Organization + +1. **Separation of Concerns**: Clear boundaries between components +2. **Dependency Injection**: Providers and handlers are injected +3. **Type Safety**: Full type annotations for better development experience +4. **Error Handling**: Comprehensive error handling and recovery + +### Configuration Management + +1. **Environment-Based**: Different configs for different environments +2. **Validation**: Configuration validation at startup +3. **Secrets Management**: Secure handling of sensitive configuration + +### Testing Strategy + +1. **Unit Tests**: Test individual components in isolation +2. **Integration Tests**: Test provider integrations +3. **End-to-End Tests**: Test complete conversation flows +4. **Performance Tests**: Test under load conditions + +## Future Considerations + +### Extensibility + +- **Plugin Architecture**: Support for custom providers and handlers +- **Middleware Pattern**: Extensible request/response processing +- **Custom Protocols**: Support for different communication protocols + +### Advanced Features + +- **Multi-Modal Support**: Text, voice, and image processing +- **Context Awareness**: Enhanced context understanding +- **Personalization**: User-specific customization +- **Multi-Language**: International language support + +This architecture provides a solid foundation for building production-ready real-time AI agents while maintaining flexibility for future enhancements and integrations. \ No newline at end of file diff --git a/docs/setup/README.md b/docs/setup/README.md new file mode 100644 index 0000000..0c9e7e0 --- /dev/null +++ b/docs/setup/README.md @@ -0,0 +1,353 @@ +# Setup and Installation Guide + +This guide provides detailed instructions for setting up and configuring the real-time agent system. 
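+
+Before you start, a quick check of the local toolchain can save time. A minimal sketch (use `python3` instead of `python` where your platform requires it):
+
+```bash
+# Confirm the interpreter meets the minimum supported version (3.8+)
+python --version
+
+# Confirm pip and the venv module are available
+python -m pip --version
+python -m venv --help
+```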
+ +## Prerequisites + +### System Requirements + +- **Python**: 3.8 or higher +- **Operating System**: Windows, macOS, or Linux +- **Memory**: Minimum 4GB RAM (8GB recommended) +- **Storage**: 1GB free space + +### Azure Requirements + +Choose one of the following Azure AI services: + +#### Azure OpenAI +- Azure subscription with access to Azure OpenAI Service +- Deployed GPT model (GPT-3.5 or GPT-4 recommended) +- API endpoint and access key + +#### AI Foundry +- Azure subscription with access to AI Foundry +- Configured AI Foundry workspace +- Model deployment and endpoint access + +## Installation Methods + +### Method 1: Git Clone (Recommended for Development) + +```bash +# Clone the repository +git clone https://github.com/PerfectThymeTech/real-time-agent.git +cd real-time-agent + +# Create virtual environment +python -m venv venv + +# Activate virtual environment +# On Windows: +venv\Scripts\activate +# On macOS/Linux: +source venv/bin/activate + +# Install dependencies +pip install -r requirements.txt + +# Install in development mode +pip install -e . +``` + +### Method 2: Direct Installation (Future) + +```bash +# When published to PyPI +pip install real-time-agent +``` + +## Configuration + +### Step 1: Environment Setup + +1. Copy the configuration template: + ```bash + cp config/.env.template .env + ``` + +2. Edit the `.env` file with your specific configuration: + ```bash + nano .env # or use your preferred editor + ``` + +### Step 2: Azure OpenAI Configuration + +If using Azure OpenAI, configure these settings: + +```env +# Azure OpenAI Configuration +AZURE_OPENAI_ENDPOINT=https://your-resource.openai.azure.com/ +AZURE_OPENAI_API_KEY=your-api-key-here +AZURE_OPENAI_DEPLOYMENT=gpt-4 +AZURE_OPENAI_API_VERSION=2024-02-01 + +# Set provider +REAL_TIME_AGENT_PROVIDER=azure_openai +``` + +#### Getting Azure OpenAI Credentials + +1. **Create Azure OpenAI Resource**: + - Go to [Azure Portal](https://portal.azure.com) + - Create a new Azure OpenAI resource + - Note the endpoint URL and region + +2. **Deploy a Model**: + - In the Azure OpenAI Studio, go to Deployments + - Create a new deployment with GPT-3.5 or GPT-4 + - Note the deployment name + +3. **Get API Key**: + - In the Azure OpenAI resource, go to Keys and Endpoint + - Copy either Key 1 or Key 2 + - Copy the Endpoint URL + +### Step 3: AI Foundry Configuration + +If using AI Foundry, configure these settings: + +```env +# AI Foundry Configuration +AI_FOUNDRY_ENDPOINT=https://your-foundry-endpoint.com/ +AI_FOUNDRY_API_KEY=your-foundry-api-key-here + +# Set provider +REAL_TIME_AGENT_PROVIDER=ai_foundry +``` + +#### Getting AI Foundry Credentials + +1. **Access AI Foundry Workspace**: + - Go to [AI Foundry](https://ai.azure.com) + - Select or create a workspace + +2. **Deploy a Model**: + - Choose a model from the model catalog + - Deploy it to your workspace + - Note the endpoint URL + +3. **Get API Key**: + - In your workspace, go to the deployment + - Copy the API key and endpoint URL + +### Step 4: Advanced Configuration + +Additional configuration options: + +```env +# Agent Behavior +REAL_TIME_AGENT_MAX_TOKENS=1000 +REAL_TIME_AGENT_TEMPERATURE=0.7 +REAL_TIME_AGENT_STREAMING=true +REAL_TIME_AGENT_SYSTEM_PROMPT=You are a helpful AI assistant. 
+
+# Connection Settings
+REAL_TIME_AGENT_WEBSOCKET_TIMEOUT=30
+REAL_TIME_AGENT_HEARTBEAT_INTERVAL=10
+REAL_TIME_AGENT_MAX_RECONNECT_ATTEMPTS=5
+
+# Logging
+REAL_TIME_AGENT_LOG_LEVEL=INFO
+```
+
+## Authentication Methods
+
+### Method 1: API Key Authentication (Simplest)
+
+Set API keys directly in environment variables:
+
+```env
+AZURE_OPENAI_API_KEY=your-api-key
+AI_FOUNDRY_API_KEY=your-api-key
+```
+
+### Method 2: Managed Identity (Production Recommended)
+
+For production deployments, use Azure Managed Identity:
+
+1. **Configure Managed Identity**:
+   ```bash
+   # No API key needed in environment
+   # Remove or comment out API key variables
+   # AZURE_OPENAI_API_KEY=
+   ```
+
+2. **Grant Permissions**:
+   - Assign "Cognitive Services OpenAI User" role to the managed identity
+   - For AI Foundry, assign appropriate workspace permissions
+
+3. **Code will automatically use**:
+   ```python
+   # The providers automatically detect and use managed identity
+   # when no API key is provided
+   ```
+
+## Verification
+
+### Step 1: Test Installation
+
+```bash
+# Test basic import
+python -c "import real_time_agent; print('Installation successful!')"
+```
+
+### Step 2: Run Basic Example
+
+```bash
+# Make sure your .env file is configured
+python src/real_time_agent/examples/basic_usage.py
+```
+
+Expected output:
+```
+INFO - Real-time agent initialized successfully!
+INFO - User: Hello! Can you help me understand what real-time agents are?
+INFO - Assistant: [Response about real-time agents]
+...
+```
+
+### Step 3: Test Streaming
+
+```bash
+python src/real_time_agent/examples/advanced_streaming.py
+```
+
+You should see real-time streaming responses.
+
+## Troubleshooting
+
+### Common Issues
+
+#### 1. Authentication Errors
+
+**Error**: `Authentication failed` or `401 Unauthorized`
+
+**Solutions**:
+- Verify API key is correct
+- Check endpoint URL format
+- Ensure the resource has proper permissions
+- For managed identity, verify role assignments
+
+#### 2. Model Not Found
+
+**Error**: `The model 'gpt-4' does not exist`
+
+**Solutions**:
+- Verify deployment name in Azure OpenAI Studio
+- Check that the model is properly deployed
+- Ensure the deployment name matches the configuration
+
+#### 3. Network Connectivity
+
+**Error**: `Connection timeout` or `Cannot reach endpoint`
+
+**Solutions**:
+- Verify endpoint URL is correct
+- Check network connectivity
+- Verify firewall settings
+- Test endpoint accessibility with curl (note that API-key authentication uses the `api-key` header, and the chat completions endpoint expects a POST with a JSON body):
+  ```bash
+  curl -X POST "YOUR_ENDPOINT/openai/deployments/YOUR_DEPLOYMENT/chat/completions?api-version=2024-02-01" \
+    -H "api-key: YOUR_API_KEY" \
+    -H "Content-Type: application/json" \
+    -d '{"messages": [{"role": "user", "content": "ping"}]}'
+  ```
+
+#### 4. Streaming Issues
+
+**Error**: `Streaming not supported` or broken streaming
+
+**Solutions**:
+- Verify streaming is enabled in configuration
+- Check that the model supports streaming
+- Test with `streaming=False` first
+
+### Debug Mode
+
+Enable debug logging for detailed troubleshooting:
+
+```env
+REAL_TIME_AGENT_LOG_LEVEL=DEBUG
+```
+
+Or in code:
+```python
+import logging
+logging.basicConfig(level=logging.DEBUG)
+```
+
+### Getting Help
+
+If you encounter issues:
+
+1. Check the [troubleshooting section](#troubleshooting)
+2. Review the [examples](../src/real_time_agent/examples/)
+3. Check [GitHub Issues](https://github.com/PerfectThymeTech/real-time-agent/issues)
+4. 
Create a new issue with: + - Your configuration (without API keys) + - Error messages + - Python version and OS + - Steps to reproduce + +## Production Deployment + +For production deployments, consider: + +### Security +- Use managed identity instead of API keys +- Store secrets in Azure Key Vault +- Enable audit logging +- Implement rate limiting + +### Monitoring +- Set up Application Insights +- Configure health checks +- Monitor token usage and costs +- Set up alerts for errors + +### Scaling +- Use Azure Container Instances or AKS +- Implement auto-scaling based on load +- Use Azure Load Balancer for high availability +- Consider Azure Functions for serverless deployment + +### Sample Production Configuration + +```yaml +# Azure Container Instance or Kubernetes deployment +apiVersion: apps/v1 +kind: Deployment +metadata: + name: real-time-agent +spec: + replicas: 3 + selector: + matchLabels: + app: real-time-agent + template: + metadata: + labels: + app: real-time-agent + spec: + containers: + - name: agent + image: your-registry/real-time-agent:latest + env: + - name: AZURE_OPENAI_ENDPOINT + value: "https://your-resource.openai.azure.com/" + - name: AZURE_OPENAI_DEPLOYMENT + value: "gpt-4" + - name: REAL_TIME_AGENT_LOG_LEVEL + value: "INFO" + # Use managed identity - no API key needed +``` + +## Next Steps + +After successful setup: + +1. **Explore Examples**: Run all example scripts to understand different patterns +2. **Customize Configuration**: Adjust settings for your specific use case +3. **Integration**: Integrate the agent into your application +4. **Monitoring**: Set up proper logging and monitoring +5. **Testing**: Implement tests for your specific use cases + +For detailed usage patterns, see the [Examples Documentation](../examples/). \ No newline at end of file diff --git a/examples/complete_workflow_demo.py b/examples/complete_workflow_demo.py new file mode 100644 index 0000000..8e9862e --- /dev/null +++ b/examples/complete_workflow_demo.py @@ -0,0 +1,348 @@ +""" +Complete workflow example showing all features of the real-time agent. 
+ +This example demonstrates: +- Configuration management +- Provider initialization +- Real-time streaming +- Event handling +- Conversation management +- Error handling +- Export functionality +""" + +import asyncio +import logging +import os +from datetime import datetime +from typing import List + +# Set Python path for direct execution +import sys +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src')) + +from real_time_agent.core.config import AgentConfig +from real_time_agent.core.interfaces import Message, MessageType, ConnectionStatus +from real_time_agent.core.agent import RealTimeAgent +from real_time_agent.utils.helpers import LoggingEventHandler, ConversationExporter + + +class DemoEventHandler: + """Demo event handler that shows all capabilities.""" + + def __init__(self): + self.logger = logging.getLogger("DemoHandler") + self.message_count = 0 + self.token_usage = 0 + + async def on_message(self, message: Message) -> None: + """Handle messages with rich formatting.""" + self.message_count += 1 + + if message.type == MessageType.USER: + print(f"\nπŸ™‹ User [{self.message_count}]: {message.content}") + elif message.type == MessageType.ASSISTANT: + # Check if this is a streaming chunk or complete message + if message.metadata and 'accumulated_content' in message.metadata: + # This is streaming - show progress + accumulated = message.metadata['accumulated_content'] + print(f"\rπŸ€– Assistant: {accumulated}", end='', flush=True) + else: + # Complete message + print(f"\nπŸ€– Assistant [{self.message_count}]: {message.content}") + elif message.type == MessageType.SYSTEM: + print(f"\nβš™οΈ System [{self.message_count}]: {message.content}") + + async def on_connection_status(self, status: ConnectionStatus) -> None: + """Handle connection status with visual indicators.""" + status_icons = { + ConnectionStatus.CONNECTING: "🟑", + ConnectionStatus.CONNECTED: "🟒", + ConnectionStatus.DISCONNECTED: "πŸ”΄", + ConnectionStatus.ERROR: "❌", + ConnectionStatus.RECONNECTING: "πŸ”„" + } + + icon = status_icons.get(status, "❓") + print(f"\n{icon} Connection Status: {status.value.title()}") + + async def on_error(self, error: Exception) -> None: + """Handle errors with detailed information.""" + print(f"\n❌ Error: {type(error).__name__}: {error}") + + +class MockProvider: + """Mock provider for demonstration without requiring API keys.""" + + def __init__(self, config: AgentConfig): + self.config = config + self.logger = logging.getLogger("MockProvider") + + async def initialize(self) -> None: + """Simulate provider initialization.""" + await asyncio.sleep(0.5) # Simulate initialization delay + self.logger.info("Mock provider initialized") + + async def generate_response(self, messages: List[Message], stream: bool = True, **kwargs): + """Generate mock responses that simulate real AI behavior.""" + from real_time_agent.core.interfaces import AgentResponse + import uuid + + # Get the latest user message + user_messages = [m for m in messages if m.type == MessageType.USER] + latest_message = user_messages[-1].content.lower() if user_messages else "" + + # Generate contextual responses + if "hello" in latest_message or "hi" in latest_message: + response_text = ("Hello! I'm a mock AI assistant demonstrating the real-time agent " + "capabilities. 
I can help you understand how streaming responses work, " + "handle different types of conversations, and showcase the event-driven " + "architecture of this system.") + elif "real-time" in latest_message or "agent" in latest_message: + response_text = ("Real-time agents are AI systems that provide immediate, streaming " + "responses to user inputs. They're built on event-driven architectures " + "that enable low-latency interactions. Key features include: streaming " + "response generation, real-time event handling, connection management, " + "and scalable provider abstractions. This implementation supports both " + "Azure OpenAI and AI Foundry as backend providers.") + elif "azure" in latest_message or "openai" in latest_message: + response_text = ("Azure OpenAI is Microsoft's cloud-based implementation of OpenAI's " + "models, providing enterprise-grade security, compliance, and integration " + "with Azure services. It offers the same powerful GPT models with additional " + "features like private networking, customer-managed keys, and SLA guarantees. " + "Perfect for enterprise applications requiring both AI capabilities and " + "robust security measures.") + elif "foundry" in latest_message or "ai foundry" in latest_message: + response_text = ("AI Foundry is Microsoft's comprehensive platform for building, " + "deploying, and managing AI applications. It provides tools for model " + "development, training, fine-tuning, and deployment across various AI " + "scenarios. The platform supports both proprietary and open-source models, " + "offering flexibility in choosing the right AI solution for specific use cases.") + elif "streaming" in latest_message: + response_text = ("Streaming responses provide immediate feedback to users by sending " + "response chunks as they're generated, rather than waiting for the complete " + "response. This creates a more interactive experience, reduces perceived " + "latency, and keeps users engaged during longer responses. Implementation " + "involves async generators, event-driven architectures, and careful state " + "management to handle partial responses correctly.") + else: + response_text = (f"You asked about: '{latest_message}'. This is a mock response " + f"demonstrating how the real-time agent processes your input and generates " + f"contextual responses. 
In a real implementation, this would be powered by " + f"Azure OpenAI or AI Foundry models with much more sophisticated understanding " + f"and generation capabilities.") + + if stream: + return self._stream_response(response_text) + else: + return self._single_response(response_text) + + async def _stream_response(self, response_text: str): + """Generate streaming response.""" + from real_time_agent.core.interfaces import AgentResponse + import uuid + + # Simulate streaming response + message_id = str(uuid.uuid4()) + words = response_text.split() + accumulated = "" + + for i, word in enumerate(words): + accumulated += word + " " + + message = Message( + id=message_id, + type=MessageType.ASSISTANT, + content=word + " ", + timestamp=datetime.now(), + metadata={ + "accumulated_content": accumulated.strip(), + "is_streaming": True + } + ) + + is_final = (i == len(words) - 1) + + yield AgentResponse( + message=message, + is_final=is_final, + usage={"total_tokens": len(accumulated.split())} if is_final else None + ) + + # Simulate typing delay + await asyncio.sleep(0.05) + + async def _single_response(self, response_text: str): + """Generate single response.""" + from real_time_agent.core.interfaces import AgentResponse + import uuid + + message = Message( + id=str(uuid.uuid4()), + type=MessageType.ASSISTANT, + content=response_text, + timestamp=datetime.now(), + metadata={"is_streaming": False} + ) + + yield AgentResponse( + message=message, + is_final=True, + usage={"total_tokens": len(response_text.split())} + ) + + async def close(self) -> None: + """Close the mock provider.""" + self.logger.info("Mock provider closed") + + +async def demonstrate_complete_workflow(): + """Demonstrate the complete real-time agent workflow.""" + + print("πŸš€ Real-Time Agent Complete Workflow Demo") + print("=" * 50) + + # Setup logging + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' + ) + + # Create configuration + config = AgentConfig( + provider="mock", # Using mock provider for demo + max_tokens=500, + temperature=0.7, + streaming=True, + system_prompt=( + "You are a helpful AI assistant demonstrating real-time agent capabilities. " + "Provide informative responses about real-time agents, Azure OpenAI, and AI Foundry." + ), + log_level="INFO" + ) + + print(f"\nπŸ“‹ Configuration:") + print(f" Provider: {config.provider}") + print(f" Max Tokens: {config.max_tokens}") + print(f" Temperature: {config.temperature}") + print(f" Streaming: {config.streaming}") + + # Create components + provider = MockProvider(config) + event_handler = DemoEventHandler() + agent = RealTimeAgent(config, provider, event_handler) + + try: + # Initialize the agent + print(f"\nπŸ”§ Initializing agent...") + await agent.initialize() + + # Set system prompt + await agent.set_system_prompt(config.system_prompt) + + # Demo conversation + demo_questions = [ + "Hello! Can you explain what real-time agents are?", + "What are the benefits of using Azure OpenAI for real-time applications?", + "How does streaming work in AI responses?", + "Can you compare Azure OpenAI and AI Foundry?", + "What are some best practices for implementing real-time agents?" 
+ ] + + print(f"\nπŸ’¬ Starting conversation with {len(demo_questions)} questions...") + + for i, question in enumerate(demo_questions, 1): + print(f"\n" + "─" * 60) + print(f"Question {i}/{len(demo_questions)}") + + # Send message and handle streaming response + if config.streaming: + print(f"\nπŸ™‹ User: {question}") + print(f"πŸ€– Assistant: ", end='', flush=True) + + async for response_chunk in await agent.send_message(question): + if response_chunk.is_final: + print() # New line after streaming completes + if response_chunk.usage: + print(f" πŸ“Š Tokens used: {response_chunk.usage.get('total_tokens', 0)}") + break + else: + response = await agent.send_message(question) + print(f"\nπŸ€– Assistant: {response.message.content}") + + # Small delay between questions + if i < len(demo_questions): + await asyncio.sleep(1) + + # Demonstrate conversation management + print(f"\n" + "=" * 60) + print("πŸ“ˆ Conversation Summary") + + history = await agent.get_conversation_history() + print(f" Total messages: {len(history)}") + + user_messages = [m for m in history if m.type == MessageType.USER] + assistant_messages = [m for m in history if m.type == MessageType.ASSISTANT] + system_messages = [m for m in history if m.type == MessageType.SYSTEM] + + print(f" User messages: {len(user_messages)}") + print(f" Assistant messages: {len(assistant_messages)}") + print(f" System messages: {len(system_messages)}") + + # Export conversation + print(f"\nπŸ’Ύ Exporting conversation...") + + # Create temp directory if it doesn't exist + import tempfile + temp_dir = tempfile.mkdtemp() + + json_export = ConversationExporter.to_json(history, pretty=True) + json_file = os.path.join(temp_dir, "demo_conversation.json") + with open(json_file, 'w') as f: + f.write(json_export) + + markdown_export = ConversationExporter.to_markdown(history) + md_file = os.path.join(temp_dir, "demo_conversation.md") + with open(md_file, 'w') as f: + f.write(markdown_export) + + print(f" JSON export: {json_file}") + print(f" Markdown export: {md_file}") + + # Demonstrate error handling + print(f"\nπŸ› οΈ Testing error handling...") + try: + # This should work fine with mock provider + await agent.send_message("Test error handling") + print(" βœ… Error handling test passed") + except Exception as e: + print(f" ❌ Error caught: {e}") + + print(f"\nπŸŽ‰ Demo completed successfully!") + print(f"πŸ“Š Final Statistics:") + print(f" Messages processed: {event_handler.message_count}") + print(f" Agent status: {agent.status.value}") + + except Exception as e: + print(f"\n❌ Demo failed: {e}") + raise + finally: + # Clean up + await agent.close() + print(f"\nπŸ”’ Agent closed and resources cleaned up") + + +async def main(): + """Main function.""" + try: + await demonstrate_complete_workflow() + except KeyboardInterrupt: + print(f"\n\n⏹️ Demo interrupted by user") + except Exception as e: + print(f"\n\nπŸ’₯ Demo failed with error: {e}") + import traceback + traceback.print_exc() + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..b889ab1 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,65 @@ +[build-system] +requires = ["setuptools>=61.0", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "real-time-agent" +version = "0.1.0" +description = "Reference design for real-time agents built on Azure OpenAI or AI Foundry" +readme = "README.md" +license = {file = "LICENSE"} +authors = [ + {name = "Perfect Thyme Tech", email = 
"info@perfectthyme.tech"} +] +classifiers = [ + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", +] +requires-python = ">=3.8" +dependencies = [ + "openai>=1.0.0", + "azure-identity>=1.15.0", + "azure-ai-inference>=1.0.0", + "websockets>=12.0", + "asyncio-mqtt>=0.16.0", + "pydantic>=2.0.0", + "python-dotenv>=1.0.0", + "aiohttp>=3.9.0", + "numpy>=1.24.0", + "pandas>=2.0.0", +] + +[project.optional-dependencies] +dev = [ + "pytest>=7.0.0", + "pytest-asyncio>=0.21.0", + "black>=23.0.0", + "flake8>=6.0.0", + "mypy>=1.0.0", +] + +[project.urls] +Homepage = "https://github.com/PerfectThymeTech/real-time-agent" +Repository = "https://github.com/PerfectThymeTech/real-time-agent" +Issues = "https://github.com/PerfectThymeTech/real-time-agent/issues" + +[tool.setuptools.packages.find] +where = ["src"] + +[tool.black] +line-length = 100 +target-version = ['py38'] + +[tool.mypy] +python_version = "3.8" +warn_return_any = true +warn_unused_configs = true +disallow_untyped_defs = true \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..cb6fcaf --- /dev/null +++ b/requirements.txt @@ -0,0 +1,18 @@ +# Core dependencies for real-time agent implementation +openai>=1.0.0 +azure-identity>=1.15.0 +azure-ai-inference>=1.0.0 +websockets>=12.0 +asyncio-mqtt>=0.16.0 +pydantic>=2.0.0 +python-dotenv>=1.0.0 +aiohttp>=3.9.0 +numpy>=1.24.0 +pandas>=2.0.0 + +# Development and testing dependencies +pytest>=7.0.0 +pytest-asyncio>=0.21.0 +black>=23.0.0 +flake8>=6.0.0 +mypy>=1.0.0 \ No newline at end of file diff --git a/src/real_time_agent/__init__.py b/src/real_time_agent/__init__.py new file mode 100644 index 0000000..2441992 --- /dev/null +++ b/src/real_time_agent/__init__.py @@ -0,0 +1,34 @@ +""" +Real-Time Agent Reference Implementation + +This package provides a reference design for building real-time agents +using Azure OpenAI or AI Foundry services. 
+""" + +__version__ = "0.1.0" +__author__ = "Perfect Thyme Tech" + +from .core.agent import RealTimeAgent +from .core.config import AgentConfig + +# Import providers with error handling for optional dependencies +try: + from .integrations.azure_openai import AzureOpenAIProvider +except ImportError: + AzureOpenAIProvider = None + +try: + from .integrations.ai_foundry import AIFoundryProvider +except ImportError: + AIFoundryProvider = None + +__all__ = [ + "RealTimeAgent", + "AgentConfig", +] + +# Only add providers to __all__ if they're available +if AzureOpenAIProvider: + __all__.append("AzureOpenAIProvider") +if AIFoundryProvider: + __all__.append("AIFoundryProvider") \ No newline at end of file diff --git a/src/real_time_agent/core/__init__.py b/src/real_time_agent/core/__init__.py new file mode 100644 index 0000000..8de4b95 --- /dev/null +++ b/src/real_time_agent/core/__init__.py @@ -0,0 +1 @@ +"""Core real-time agent implementations.""" \ No newline at end of file diff --git a/src/real_time_agent/core/agent.py b/src/real_time_agent/core/agent.py new file mode 100644 index 0000000..c58bd3e --- /dev/null +++ b/src/real_time_agent/core/agent.py @@ -0,0 +1,219 @@ +"""Main real-time agent implementation.""" + +import asyncio +import logging +import uuid +from datetime import datetime +from typing import Any, AsyncIterator, Dict, List, Optional, Union + +from .config import AgentConfig +from .interfaces import ( + AIProvider, + AgentResponse, + ConnectionStatus, + EventHandler, + Message, + MessageType, + RealTimeSession, +) + + +class RealTimeAgent: + """Main real-time agent implementation.""" + + def __init__( + self, + config: AgentConfig, + provider: AIProvider, + event_handler: Optional[EventHandler] = None + ): + """Initialize the real-time agent. + + Args: + config: Agent configuration + provider: AI provider instance + event_handler: Optional event handler for callbacks + """ + self.config = config + self.provider = provider + self.event_handler = event_handler + self.session: Optional[RealTimeSession] = None + self.conversation_history: List[Message] = [] + self._status = ConnectionStatus.DISCONNECTED + + # Setup logging + logging.basicConfig(level=getattr(logging, config.log_level.upper())) + self.logger = logging.getLogger(__name__) + + @property + def status(self) -> ConnectionStatus: + """Get current connection status.""" + return self._status + + async def initialize(self) -> None: + """Initialize the agent and provider.""" + self.logger.info("Initializing real-time agent") + try: + await self.provider.initialize() + self._status = ConnectionStatus.CONNECTED + self.logger.info("Agent initialization completed") + + if self.event_handler: + await self.event_handler.on_connection_status(self._status) + + except Exception as e: + self._status = ConnectionStatus.ERROR + self.logger.error(f"Failed to initialize agent: {e}") + + if self.event_handler: + await self.event_handler.on_error(e) + raise + + async def send_message( + self, + content: str, + message_type: MessageType = MessageType.USER, + metadata: Optional[Dict[str, Any]] = None + ) -> Union[AgentResponse, AsyncIterator[AgentResponse]]: + """Send a message and get response from the agent. 
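+
+        When ``self.config.streaming`` is enabled, the return value is an
+        async iterator that must be consumed to drive the stream, e.g.
+        (illustrative)::
+
+            async for chunk in await agent.send_message("Hello"):
+                print(chunk.message.content, end="", flush=True)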
+ + Args: + content: Message content + message_type: Type of message + metadata: Optional metadata + + Returns: + Agent response or async iterator of responses for streaming + """ + if self._status != ConnectionStatus.CONNECTED: + raise RuntimeError("Agent is not connected") + + # Create user message + message = Message( + id=str(uuid.uuid4()), + type=message_type, + content=content, + timestamp=datetime.now(), + metadata=metadata + ) + + # Add to conversation history + self.conversation_history.append(message) + + # Notify event handler + if self.event_handler: + await self.event_handler.on_message(message) + + try: + # Get response from provider + response = await self.provider.generate_response( + messages=self.conversation_history, + stream=self.config.streaming, + max_tokens=self.config.max_tokens, + temperature=self.config.temperature + ) + + # Handle streaming vs non-streaming responses + if self.config.streaming: + # Return the async generator directly for streaming + return self._handle_streaming_response(response) + else: + # Handle single response + return await self._handle_single_response(response) + + except Exception as e: + self.logger.error(f"Failed to generate response: {e}") + + if self.event_handler: + await self.event_handler.on_error(e) + raise + + async def _handle_streaming_response( + self, + response_stream: AsyncIterator[AgentResponse] + ) -> AsyncIterator[AgentResponse]: + """Handle streaming responses from the provider.""" + accumulated_content = "" + message_id = str(uuid.uuid4()) + + async for chunk in response_stream: + accumulated_content += chunk.message.content + + # Update message with accumulated content + chunk.message.id = message_id + chunk.message.timestamp = datetime.now() + + # Notify event handler + if self.event_handler: + await self.event_handler.on_message(chunk.message) + + yield chunk + + # If this is the final chunk, add to conversation history + if chunk.is_final: + final_message = Message( + id=message_id, + type=MessageType.ASSISTANT, + content=accumulated_content, + timestamp=datetime.now(), + metadata=chunk.message.metadata + ) + self.conversation_history.append(final_message) + + async def _handle_single_response(self, response: AgentResponse) -> AgentResponse: + """Handle single (non-streaming) responses from the provider.""" + # Add response to conversation history + self.conversation_history.append(response.message) + + # Notify event handler + if self.event_handler: + await self.event_handler.on_message(response.message) + + return response + + async def clear_conversation(self) -> None: + """Clear the conversation history.""" + self.logger.info("Clearing conversation history") + self.conversation_history.clear() + + async def get_conversation_history(self) -> List[Message]: + """Get the current conversation history.""" + return self.conversation_history.copy() + + async def set_system_prompt(self, prompt: str) -> None: + """Set or update the system prompt.""" + self.config.system_prompt = prompt + + # Add system message to conversation if not already present + if not self.conversation_history or self.conversation_history[0].type != MessageType.SYSTEM: + system_message = Message( + id=str(uuid.uuid4()), + type=MessageType.SYSTEM, + content=prompt, + timestamp=datetime.now() + ) + self.conversation_history.insert(0, system_message) + else: + # Update existing system message + self.conversation_history[0].content = prompt + self.conversation_history[0].timestamp = datetime.now() + + async def close(self) -> None: + """Close the 
agent and cleanup resources.""" + self.logger.info("Closing real-time agent") + + try: + if self.provider: + await self.provider.close() + + if self.session: + await self.session.stop() + + self._status = ConnectionStatus.DISCONNECTED + + if self.event_handler: + await self.event_handler.on_connection_status(self._status) + + except Exception as e: + self.logger.error(f"Error during agent closure: {e}") + raise \ No newline at end of file diff --git a/src/real_time_agent/core/config.py b/src/real_time_agent/core/config.py new file mode 100644 index 0000000..f7a3ce4 --- /dev/null +++ b/src/real_time_agent/core/config.py @@ -0,0 +1,101 @@ +"""Configuration management for real-time agents.""" + +import os +from typing import Optional, Dict, Any, Literal +from pydantic import BaseModel, Field +from dotenv import load_dotenv + +# Load environment variables +load_dotenv() + + +class AgentConfig(BaseModel): + """Configuration for real-time agent.""" + + # Provider configuration + provider: Literal["azure_openai", "ai_foundry", "mock"] = Field( + default="azure_openai", + description="AI provider to use for the agent" + ) + + # Azure OpenAI Configuration + azure_openai_endpoint: Optional[str] = Field( + default_factory=lambda: os.getenv("AZURE_OPENAI_ENDPOINT"), + description="Azure OpenAI endpoint URL" + ) + azure_openai_api_key: Optional[str] = Field( + default_factory=lambda: os.getenv("AZURE_OPENAI_API_KEY"), + description="Azure OpenAI API key" + ) + azure_openai_api_version: str = Field( + default="2024-02-01", + description="Azure OpenAI API version" + ) + azure_openai_deployment: Optional[str] = Field( + default_factory=lambda: os.getenv("AZURE_OPENAI_DEPLOYMENT"), + description="Azure OpenAI deployment name" + ) + + # AI Foundry Configuration + ai_foundry_endpoint: Optional[str] = Field( + default_factory=lambda: os.getenv("AI_FOUNDRY_ENDPOINT"), + description="AI Foundry endpoint URL" + ) + ai_foundry_api_key: Optional[str] = Field( + default_factory=lambda: os.getenv("AI_FOUNDRY_API_KEY"), + description="AI Foundry API key" + ) + + # Agent behavior configuration + max_tokens: int = Field( + default=1000, + description="Maximum tokens for response generation" + ) + temperature: float = Field( + default=0.7, + description="Temperature for response generation" + ) + streaming: bool = Field( + default=True, + description="Enable streaming responses" + ) + + # Real-time configuration + websocket_timeout: int = Field( + default=30, + description="WebSocket connection timeout in seconds" + ) + heartbeat_interval: int = Field( + default=10, + description="Heartbeat interval in seconds" + ) + max_reconnect_attempts: int = Field( + default=5, + description="Maximum reconnection attempts" + ) + + # System configuration + system_prompt: str = Field( + default="You are a helpful AI assistant providing real-time responses.", + description="System prompt for the agent" + ) + log_level: str = Field( + default="INFO", + description="Logging level" + ) + + class Config: + env_prefix = "REAL_TIME_AGENT_" + case_sensitive = False + + +class ProviderConfig(BaseModel): + """Base configuration for AI providers.""" + + endpoint: str = Field(..., description="Provider endpoint URL") + api_key: Optional[str] = Field(None, description="API key for authentication") + timeout: int = Field(default=30, description="Request timeout in seconds") + max_retries: int = Field(default=3, description="Maximum retry attempts") + + class Config: + extra = "allow" \ No newline at end of file diff --git 
a/src/real_time_agent/core/interfaces.py b/src/real_time_agent/core/interfaces.py new file mode 100644 index 0000000..3b97bf4 --- /dev/null +++ b/src/real_time_agent/core/interfaces.py @@ -0,0 +1,130 @@ +"""Base interfaces and abstract classes for real-time agents.""" + +import asyncio +import logging +from abc import ABC, abstractmethod +from typing import Any, AsyncIterator, Dict, List, Optional, Protocol, Union +from dataclasses import dataclass +from datetime import datetime +from enum import Enum + + +class MessageType(Enum): + """Types of messages in real-time communication.""" + USER = "user" + ASSISTANT = "assistant" + SYSTEM = "system" + ERROR = "error" + STATUS = "status" + + +class ConnectionStatus(Enum): + """Connection status for real-time agents.""" + DISCONNECTED = "disconnected" + CONNECTING = "connecting" + CONNECTED = "connected" + ERROR = "error" + RECONNECTING = "reconnecting" + + +@dataclass +class Message: + """Represents a message in the real-time conversation.""" + + id: str + type: MessageType + content: str + timestamp: datetime + metadata: Optional[Dict[str, Any]] = None + + def to_dict(self) -> Dict[str, Any]: + """Convert message to dictionary format.""" + return { + "id": self.id, + "type": self.type.value, + "content": self.content, + "timestamp": self.timestamp.isoformat(), + "metadata": self.metadata or {} + } + + +@dataclass +class AgentResponse: + """Response from the real-time agent.""" + + message: Message + is_final: bool = True + usage: Optional[Dict[str, Any]] = None + + def to_dict(self) -> Dict[str, Any]: + """Convert response to dictionary format.""" + return { + "message": self.message.to_dict(), + "is_final": self.is_final, + "usage": self.usage + } + + +class EventHandler(Protocol): + """Protocol for handling real-time events.""" + + async def on_message(self, message: Message) -> None: + """Handle incoming message.""" + ... + + async def on_connection_status(self, status: ConnectionStatus) -> None: + """Handle connection status changes.""" + ... + + async def on_error(self, error: Exception) -> None: + """Handle errors.""" + ... 
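`EventHandler` is a `typing.Protocol`, so any object exposing these three async methods satisfies it structurally, with no base class to inherit. A minimal sketch of a conforming handler (the class name and the print-based handling are illustrative, not part of this patch):

```python
from real_time_agent.core.interfaces import ConnectionStatus, Message


class PrintEventHandler:
    """Minimal EventHandler: structural typing means no inheritance is needed."""

    async def on_message(self, message: Message) -> None:
        # Echo every message with its role tag
        print(f"[{message.type.value}] {message.content}")

    async def on_connection_status(self, status: ConnectionStatus) -> None:
        print(f"connection: {status.value}")

    async def on_error(self, error: Exception) -> None:
        print(f"error: {error!r}")
```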
+ + +class AIProvider(ABC): + """Abstract base class for AI providers.""" + + @abstractmethod + async def initialize(self) -> None: + """Initialize the provider.""" + pass + + @abstractmethod + async def generate_response( + self, + messages: List[Message], + stream: bool = True, + **kwargs: Any + ) -> Union[AgentResponse, AsyncIterator[AgentResponse]]: + """Generate response from the AI provider.""" + pass + + @abstractmethod + async def close(self) -> None: + """Close the provider connection.""" + pass + + +class RealTimeSession(ABC): + """Abstract base class for real-time sessions.""" + + @abstractmethod + async def start(self) -> None: + """Start the real-time session.""" + pass + + @abstractmethod + async def send_message(self, message: Message) -> None: + """Send a message in the session.""" + pass + + @abstractmethod + async def stop(self) -> None: + """Stop the real-time session.""" + pass + + @property + @abstractmethod + def status(self) -> ConnectionStatus: + """Get current connection status.""" + pass \ No newline at end of file diff --git a/src/real_time_agent/examples/advanced_streaming.py b/src/real_time_agent/examples/advanced_streaming.py new file mode 100644 index 0000000..6c1b8fc --- /dev/null +++ b/src/real_time_agent/examples/advanced_streaming.py @@ -0,0 +1,195 @@ +"""Advanced example demonstrating streaming responses and event handling.""" + +import asyncio +import logging +from typing import List +from real_time_agent import RealTimeAgent, AgentConfig, AzureOpenAIProvider +from real_time_agent.core.interfaces import Message, MessageType, ConnectionStatus +from real_time_agent.utils.helpers import ConversationExporter + + +class AdvancedEventHandler: + """Advanced event handler with custom logic.""" + + def __init__(self): + self.logger = logging.getLogger(__name__) + self.message_count = 0 + self.streaming_content = "" + + async def on_message(self, message: Message) -> None: + """Handle incoming messages with advanced logic.""" + self.message_count += 1 + + if message.type == MessageType.USER: + self.logger.info(f"[{self.message_count}] User: {message.content}") + elif message.type == MessageType.ASSISTANT: + # For streaming responses, accumulate content + if hasattr(message, 'metadata') and message.metadata: + accumulated = message.metadata.get('accumulated_content', '') + if accumulated: + # This is a streaming chunk + self.streaming_content = accumulated + print(f"\rAssistant: {accumulated}", end='', flush=True) + else: + # This is a complete message + self.logger.info(f"[{self.message_count}] Assistant: {message.content}") + elif message.type == MessageType.SYSTEM: + self.logger.info(f"[{self.message_count}] System: {message.content}") + + async def on_connection_status(self, status: ConnectionStatus) -> None: + """Handle connection status changes.""" + if status == ConnectionStatus.CONNECTED: + self.logger.info("🟒 Agent connected and ready") + elif status == ConnectionStatus.CONNECTING: + self.logger.info("🟑 Agent connecting...") + elif status == ConnectionStatus.DISCONNECTED: + self.logger.info("πŸ”΄ Agent disconnected") + elif status == ConnectionStatus.ERROR: + self.logger.error("❌ Agent connection error") + elif status == ConnectionStatus.RECONNECTING: + self.logger.info("πŸ”„ Agent reconnecting...") + + async def on_error(self, error: Exception) -> None: + """Handle errors with detailed logging.""" + self.logger.error(f"❌ Error occurred: {type(error).__name__}: {error}") + + +async def demonstrate_streaming(): + """Demonstrate streaming responses with 
real-time feedback."""
+    logging.basicConfig(
+        level=logging.INFO,
+        format='%(asctime)s - %(levelname)s - %(message)s'
+    )
+    logger = logging.getLogger(__name__)
+
+    # Configuration for streaming
+    config = AgentConfig(
+        provider="azure_openai",
+        azure_openai_endpoint="https://your-resource.openai.azure.com/",
+        azure_openai_deployment="gpt-4",
+        max_tokens=800,
+        temperature=0.8,
+        streaming=True,
+        system_prompt=(
+            "You are an expert AI assistant specializing in real-time applications. "
+            "Provide detailed, technical explanations with examples."
+        )
+    )
+
+    # Create advanced components
+    provider = AzureOpenAIProvider(config)
+    event_handler = AdvancedEventHandler()
+    agent = RealTimeAgent(config, provider, event_handler)
+
+    try:
+        # Initialize agent
+        await agent.initialize()
+
+        # Complex questions that benefit from streaming
+        questions = [
+            (
+                "Explain the architecture of a real-time AI agent system, "
+                "including the key components and data flow."
+            ),
+            (
+                "What are the technical challenges in implementing streaming "
+                "responses for AI agents, and how can they be addressed?"
+            ),
+            (
+                "Provide a detailed comparison between Azure OpenAI and "
+                "AI Foundry for real-time agent applications."
+            )
+        ]
+
+        conversation_messages: List[Message] = []
+
+        for i, question in enumerate(questions, 1):
+            logger.info(f"\n--- Question {i} ---")
+            logger.info(f"User: {question}")
+
+            print("\nAssistant: ", end='', flush=True)
+
+            # Process streaming response
+            full_response = ""
+            async for response_chunk in await agent.send_message(question):
+                if response_chunk.is_final:
+                    full_response = response_chunk.message.metadata.get(
+                        'accumulated_content',
+                        response_chunk.message.content
+                    )
+                    print()  # New line after streaming
+                    break
+
+            # Add a pause between questions
+            if i < len(questions):
+                await asyncio.sleep(2)
+
+        # Export conversation
+        history = await agent.get_conversation_history()
+
+        # Export to different formats
+        json_export = ConversationExporter.to_json(history)
+        markdown_export = ConversationExporter.to_markdown(history)
+
+        # Save exports
+        with open('/tmp/conversation.json', 'w') as f:
+            f.write(json_export)
+
+        with open('/tmp/conversation.md', 'w') as f:
+            f.write(markdown_export)
+
+        logger.info("\n--- Conversation Summary ---")
+        logger.info(f"Total messages: {len(history)}")
+        logger.info("Conversation exported to /tmp/conversation.json and /tmp/conversation.md")
+
+    except Exception as e:
+        logger.error(f"Error in streaming demonstration: {e}")
+        raise
+    finally:
+        await agent.close()
+
+
+async def demonstrate_error_handling():
+    """Demonstrate error handling and recovery."""
+    logger = logging.getLogger(__name__)
+
+    # Configuration with intentional issues for testing
+    config = AgentConfig(
+        provider="azure_openai",
+        azure_openai_endpoint="https://invalid-endpoint.com/",  # Invalid endpoint
+        azure_openai_deployment="invalid-model",
+        max_tokens=100,
+        temperature=0.5,
+        streaming=False
+    )
+
+    provider = AzureOpenAIProvider(config)
+    event_handler = AdvancedEventHandler()
+    agent = RealTimeAgent(config, provider, event_handler)
+
+    try:
+        # This should fail and demonstrate error handling
+        await agent.initialize()
+    except Exception as e:
+        logger.info(f"Expected error caught: {type(e).__name__}: {e}")
+        logger.info("Error handling demonstration completed")
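The demo above only surfaces the failure; transient errors could instead be retried with the `AsyncRetry` helper this patch adds in `src/real_time_agent/utils/helpers.py`. A sketch under that assumption (the wrapper function and the chosen exception types are illustrative):

```python
from real_time_agent.utils.helpers import AsyncRetry


async def initialize_with_retry(agent) -> None:
    """Illustrative only: retry transient initialization failures with backoff."""
    retry = AsyncRetry(
        max_attempts=3,
        delay=1.0,
        backoff_factor=2.0,
        exceptions=(ConnectionError, TimeoutError),  # assumed transient errors
    )
    # AsyncRetry instances are callable: they re-invoke the coroutine function
    # until it succeeds or max_attempts is exhausted.
    await retry(agent.initialize)
```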
+
+
+async def main():
+    """Main function to run all demonstrations."""
+    print("=== Real-Time Agent Advanced Example ===\n")
+
+    try:
+        print("1. Demonstrating streaming responses...")
+        await demonstrate_streaming()
+
+        print("\n2. Demonstrating error handling...")
+        await demonstrate_error_handling()
+
+    except Exception as e:
+        print(f"Demo failed: {e}")
+        print("Note: Make sure to configure your Azure OpenAI credentials!")
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
\ No newline at end of file
diff --git a/src/real_time_agent/examples/ai_foundry_example.py b/src/real_time_agent/examples/ai_foundry_example.py
new file mode 100644
index 0000000..e93e3b4
--- /dev/null
+++ b/src/real_time_agent/examples/ai_foundry_example.py
@@ -0,0 +1,80 @@
+"""Example showing AI Foundry integration."""
+
+import asyncio
+import logging
+from real_time_agent import RealTimeAgent, AgentConfig, AIFoundryProvider
+from real_time_agent.utils.helpers import LoggingEventHandler
+
+
+async def main():
+    """Example using AI Foundry provider."""
+    logging.basicConfig(
+        level=logging.INFO,
+        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+    )
+    logger = logging.getLogger(__name__)
+
+    try:
+        # Create configuration for AI Foundry
+        config = AgentConfig(
+            provider="ai_foundry",
+            ai_foundry_endpoint="https://your-foundry-endpoint.com/",
+            ai_foundry_api_key="your-foundry-api-key",  # Or use environment variable
+            max_tokens=500,
+            temperature=0.7,
+            streaming=True,
+            system_prompt="You are an AI assistant powered by AI Foundry. Be helpful and informative."
+        )
+
+        # Create provider and event handler
+        provider = AIFoundryProvider(config)
+        event_handler = LoggingEventHandler(logger)
+
+        # Create and initialize agent
+        agent = RealTimeAgent(config, provider, event_handler)
+        await agent.initialize()
+
+        logger.info("AI Foundry agent initialized successfully!")
+
+        # Example conversation focused on AI Foundry capabilities
+        messages = [
+            "What is AI Foundry and how does it differ from other AI platforms?",
+            "Can you explain the benefits of using AI Foundry for enterprise applications?",
+            "What are some best practices for deploying AI agents using AI Foundry?"
+ ] + + for user_message in messages: + logger.info(f"User: {user_message}") + + # Send message and handle response + if config.streaming: + # Handle streaming response + response_text = "" + async for response_chunk in await agent.send_message(user_message): + response_text += response_chunk.message.content + if response_chunk.is_final: + logger.info(f"Assistant: {response_text}") + break + else: + # Handle single response + response = await agent.send_message(user_message) + logger.info(f"Assistant: {response.message.content}") + + # Small delay between messages + await asyncio.sleep(1) + + # Get and display conversation summary + history = await agent.get_conversation_history() + logger.info(f"Conversation completed with {len(history)} messages using AI Foundry") + + except Exception as e: + logger.error(f"Error in AI Foundry example: {e}") + raise + finally: + # Clean up + if 'agent' in locals(): + await agent.close() + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/src/real_time_agent/examples/basic_usage.py b/src/real_time_agent/examples/basic_usage.py new file mode 100644 index 0000000..ed48ee3 --- /dev/null +++ b/src/real_time_agent/examples/basic_usage.py @@ -0,0 +1,83 @@ +"""Basic example of using the real-time agent with Azure OpenAI.""" + +import asyncio +import logging +from real_time_agent import RealTimeAgent, AgentConfig, AzureOpenAIProvider +from real_time_agent.utils.helpers import LoggingEventHandler + + +async def main(): + """Basic example of real-time agent usage.""" + # Setup logging + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' + ) + logger = logging.getLogger(__name__) + + try: + # Create configuration + config = AgentConfig( + provider="azure_openai", + azure_openai_endpoint="https://your-resource.openai.azure.com/", + azure_openai_deployment="gpt-4", + azure_openai_api_key="your-api-key", # Or use environment variable + max_tokens=500, + temperature=0.7, + streaming=True, + system_prompt="You are a helpful AI assistant. Provide concise and accurate responses." + ) + + # Create provider and event handler + provider = AzureOpenAIProvider(config) + event_handler = LoggingEventHandler(logger) + + # Create and initialize agent + agent = RealTimeAgent(config, provider, event_handler) + await agent.initialize() + + logger.info("Real-time agent initialized successfully!") + + # Set up the system prompt + await agent.set_system_prompt(config.system_prompt) + + # Example conversation + messages = [ + "Hello! Can you help me understand what real-time agents are?", + "What are the benefits of using Azure OpenAI for real-time applications?", + "Can you provide a simple example of how to implement streaming responses?" 
+
+        ]
+
+        for user_message in messages:
+            logger.info(f"User: {user_message}")
+
+            # Send message and handle response
+            if config.streaming:
+                # Handle streaming response; accumulate the per-chunk deltas
+                # so the full reply is logged, not just the final delta
+                response_text = ""
+                async for response_chunk in await agent.send_message(user_message):
+                    response_text += response_chunk.message.content
+                    if response_chunk.is_final:
+                        logger.info(f"Assistant: {response_text}")
+                        break
+            else:
+                # Handle single response
+                response = await agent.send_message(user_message)
+                logger.info(f"Assistant: {response.message.content}")
+
+            # Small delay between messages
+            await asyncio.sleep(1)
+
+        # Get conversation history
+        history = await agent.get_conversation_history()
+        logger.info(f"Conversation completed with {len(history)} messages")
+
+    except Exception as e:
+        logger.error(f"Error in example: {e}")
+        raise
+    finally:
+        # Clean up
+        if 'agent' in locals():
+            await agent.close()
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
\ No newline at end of file
diff --git a/src/real_time_agent/integrations/__init__.py b/src/real_time_agent/integrations/__init__.py
new file mode 100644
index 0000000..9dda521
--- /dev/null
+++ b/src/real_time_agent/integrations/__init__.py
@@ -0,0 +1 @@
+"""Integration modules for different AI providers."""
\ No newline at end of file
diff --git a/src/real_time_agent/integrations/ai_foundry.py b/src/real_time_agent/integrations/ai_foundry.py
new file mode 100644
index 0000000..d4e93a6
--- /dev/null
+++ b/src/real_time_agent/integrations/ai_foundry.py
@@ -0,0 +1,213 @@
+"""AI Foundry provider implementation for real-time agents."""
+
+import asyncio
+import logging
+import uuid
+from datetime import datetime
+from typing import Any, AsyncIterator, Dict, List, Optional, Union
+
+import aiohttp
+from azure.identity.aio import DefaultAzureCredential  # async credential to match the aio client
+from azure.ai.inference.aio import ChatCompletionsClient
+from azure.ai.inference.models import SystemMessage, UserMessage, AssistantMessage
+
+from ..core.interfaces import AIProvider, AgentResponse, Message, MessageType
+from ..core.config import AgentConfig
+
+
+class AIFoundryProvider(AIProvider):
+    """AI Foundry provider for real-time agent capabilities."""
+
+    def __init__(self, config: AgentConfig):
+        """Initialize AI Foundry provider.
+ + Args: + config: Agent configuration containing AI Foundry settings + """ + self.config = config + self.client: Optional[ChatCompletionsClient] = None + self.logger = logging.getLogger(__name__) + + # Validate required configuration + if not config.ai_foundry_endpoint: + raise ValueError("AI Foundry endpoint is required") + + async def initialize(self) -> None: + """Initialize the AI Foundry client.""" + self.logger.info("Initializing AI Foundry provider") + + try: + # Use API key if provided, otherwise use Azure credential + if self.config.ai_foundry_api_key: + from azure.core.credentials import AzureKeyCredential + credential = AzureKeyCredential(self.config.ai_foundry_api_key) + else: + credential = DefaultAzureCredential() + + self.client = ChatCompletionsClient( + endpoint=self.config.ai_foundry_endpoint, + credential=credential + ) + + # Test the connection + await self._test_connection() + self.logger.info("AI Foundry provider initialized successfully") + + except Exception as e: + self.logger.error(f"Failed to initialize AI Foundry provider: {e}") + raise + + async def _test_connection(self) -> None: + """Test the AI Foundry connection.""" + if not self.client: + raise RuntimeError("Client not initialized") + + try: + # Simple test call to verify connectivity + response = await self.client.complete( + messages=[UserMessage(content="Hello")], + max_tokens=1, + temperature=0 + ) + self.logger.debug("Connection test successful") + except Exception as e: + self.logger.error(f"Connection test failed: {e}") + raise + + def _messages_to_foundry_format(self, messages: List[Message]) -> List[Any]: + """Convert internal message format to AI Foundry format.""" + foundry_messages = [] + + for message in messages: + if message.type == MessageType.USER: + foundry_messages.append(UserMessage(content=message.content)) + elif message.type == MessageType.ASSISTANT: + foundry_messages.append(AssistantMessage(content=message.content)) + elif message.type == MessageType.SYSTEM: + foundry_messages.append(SystemMessage(content=message.content)) + + return foundry_messages + + async def generate_response( + self, + messages: List[Message], + stream: bool = True, + **kwargs: Any + ) -> Union[AgentResponse, AsyncIterator[AgentResponse]]: + """Generate response using AI Foundry. 
+ + Args: + messages: List of conversation messages + stream: Whether to stream the response + **kwargs: Additional parameters for the API call + + Returns: + Single response or async iterator for streaming responses + """ + if not self.client: + raise RuntimeError("Provider not initialized") + + # Convert messages to AI Foundry format + foundry_messages = self._messages_to_foundry_format(messages) + + # Prepare API call parameters + api_params = { + "messages": foundry_messages, + "max_tokens": kwargs.get("max_tokens", self.config.max_tokens), + "temperature": kwargs.get("temperature", self.config.temperature), + "stream": stream + } + + # Add additional parameters + for key, value in kwargs.items(): + if key not in ["max_tokens", "temperature"] and value is not None: + api_params[key] = value + + try: + if stream: + return self._generate_streaming_response(api_params) + else: + return await self._generate_single_response(api_params) + + except Exception as e: + self.logger.error(f"Failed to generate response: {e}") + raise + + async def _generate_single_response(self, api_params: Dict[str, Any]) -> AgentResponse: + """Generate a single (non-streaming) response.""" + response = await self.client.complete(**api_params) + + # Extract response content + content = response.choices[0].message.content or "" + + # Create message + message = Message( + id=str(uuid.uuid4()), + type=MessageType.ASSISTANT, + content=content, + timestamp=datetime.now(), + metadata={ + "model": getattr(response, 'model', 'ai-foundry'), + "finish_reason": response.choices[0].finish_reason + } + ) + + # Create usage info + usage = None + if hasattr(response, 'usage') and response.usage: + usage = { + "prompt_tokens": response.usage.prompt_tokens, + "completion_tokens": response.usage.completion_tokens, + "total_tokens": response.usage.total_tokens + } + + return AgentResponse( + message=message, + is_final=True, + usage=usage + ) + + async def _generate_streaming_response( + self, + api_params: Dict[str, Any] + ) -> AsyncIterator[AgentResponse]: + """Generate streaming responses.""" + message_id = str(uuid.uuid4()) + accumulated_content = "" + + async for chunk in await self.client.complete(**api_params): + if chunk.choices and chunk.choices[0].delta and chunk.choices[0].delta.content: + content_delta = chunk.choices[0].delta.content + accumulated_content += content_delta + + # Create incremental message + message = Message( + id=message_id, + type=MessageType.ASSISTANT, + content=content_delta, + timestamp=datetime.now(), + metadata={ + "model": getattr(chunk, 'model', 'ai-foundry'), + "accumulated_content": accumulated_content + } + ) + + # Check if this is the final chunk + is_final = ( + chunk.choices[0].finish_reason is not None and + chunk.choices[0].finish_reason != "null" + ) + + yield AgentResponse( + message=message, + is_final=is_final, + usage=None # Usage info typically comes with final chunk + ) + + async def close(self) -> None: + """Close the AI Foundry provider.""" + self.logger.info("Closing AI Foundry provider") + + if self.client: + await self.client.close() + self.client = None \ No newline at end of file diff --git a/src/real_time_agent/integrations/azure_openai.py b/src/real_time_agent/integrations/azure_openai.py new file mode 100644 index 0000000..bd26e9e --- /dev/null +++ b/src/real_time_agent/integrations/azure_openai.py @@ -0,0 +1,226 @@ +"""Azure OpenAI provider implementation for real-time agents.""" + +import asyncio +import logging +import uuid +from datetime import datetime +from typing 
import Any, AsyncIterator, Dict, List, Optional, Union
+
+from openai import AsyncAzureOpenAI
+from azure.identity.aio import DefaultAzureCredential, get_bearer_token_provider
+
+from ..core.interfaces import AIProvider, AgentResponse, Message, MessageType
+from ..core.config import AgentConfig
+
+
+class AzureOpenAIProvider(AIProvider):
+    """Azure OpenAI provider for real-time agent capabilities."""
+
+    def __init__(self, config: AgentConfig):
+        """Initialize Azure OpenAI provider.
+
+        Args:
+            config: Agent configuration containing Azure OpenAI settings
+        """
+        self.config = config
+        self.client: Optional[AsyncAzureOpenAI] = None
+        self.logger = logging.getLogger(__name__)
+
+        # Validate required configuration
+        if not config.azure_openai_endpoint:
+            raise ValueError("Azure OpenAI endpoint is required")
+        if not config.azure_openai_deployment:
+            raise ValueError("Azure OpenAI deployment name is required")
+
+    async def initialize(self) -> None:
+        """Initialize the Azure OpenAI client."""
+        self.logger.info("Initializing Azure OpenAI provider")
+
+        try:
+            # Use API key if provided, otherwise use Azure credential
+            if self.config.azure_openai_api_key:
+                self.client = AsyncAzureOpenAI(
+                    api_key=self.config.azure_openai_api_key,
+                    api_version=self.config.azure_openai_api_version,
+                    azure_endpoint=self.config.azure_openai_endpoint
+                )
+            else:
+                # Use managed identity or default Azure credential via an async
+                # bearer-token provider, so tokens are refreshed automatically
+                # instead of pinning a single token as the API key
+                credential = DefaultAzureCredential()
+                token_provider = get_bearer_token_provider(
+                    credential, "https://cognitiveservices.azure.com/.default"
+                )
+                self.client = AsyncAzureOpenAI(
+                    azure_ad_token_provider=token_provider,
+                    api_version=self.config.azure_openai_api_version,
+                    azure_endpoint=self.config.azure_openai_endpoint
+                )
+
+            # Test the connection
+            await self._test_connection()
+            self.logger.info("Azure OpenAI provider initialized successfully")
+
+        except Exception as e:
+            self.logger.error(f"Failed to initialize Azure OpenAI provider: {e}")
+            raise
+
+    async def _test_connection(self) -> None:
+        """Test the Azure OpenAI connection."""
+        if not self.client:
+            raise RuntimeError("Client not initialized")
+
+        try:
+            # Simple test call to verify connectivity
+            response = await self.client.chat.completions.create(
+                model=self.config.azure_openai_deployment,
+                messages=[{"role": "user", "content": "Hello"}],
+                max_tokens=1,
+                temperature=0
+            )
+            self.logger.debug("Connection test successful")
+        except Exception as e:
+            self.logger.error(f"Connection test failed: {e}")
+            raise
+
+    def _messages_to_openai_format(self, messages: List[Message]) -> List[Dict[str, str]]:
+        """Convert internal message format to OpenAI format."""
+        role_map = {
+            MessageType.USER: "user",
+            MessageType.ASSISTANT: "assistant",
+            MessageType.SYSTEM: "system"
+        }
+
+        openai_messages = []
+        for message in messages:
+            if message.type in role_map:
+                openai_messages.append({
+                    "role": role_map[message.type],
+                    "content": message.content
+                })
+
+        return openai_messages
+
+    async def generate_response(
+        self,
+        messages: List[Message],
+        stream: bool = True,
+        **kwargs: Any
+    ) -> Union[AgentResponse, AsyncIterator[AgentResponse]]:
+        """Generate response using Azure OpenAI.
+ + Args: + messages: List of conversation messages + stream: Whether to stream the response + **kwargs: Additional parameters for the API call + + Returns: + Single response or async iterator for streaming responses + """ + if not self.client: + raise RuntimeError("Provider not initialized") + + # Convert messages to OpenAI format + openai_messages = self._messages_to_openai_format(messages) + + # Prepare API call parameters + api_params = { + "model": self.config.azure_openai_deployment, + "messages": openai_messages, + "max_tokens": kwargs.get("max_tokens", self.config.max_tokens), + "temperature": kwargs.get("temperature", self.config.temperature), + "stream": stream + } + + # Add additional parameters + for key, value in kwargs.items(): + if key not in ["max_tokens", "temperature"] and value is not None: + api_params[key] = value + + try: + if stream: + return self._generate_streaming_response(api_params) + else: + return await self._generate_single_response(api_params) + + except Exception as e: + self.logger.error(f"Failed to generate response: {e}") + raise + + async def _generate_single_response(self, api_params: Dict[str, Any]) -> AgentResponse: + """Generate a single (non-streaming) response.""" + response = await self.client.chat.completions.create(**api_params) + + # Extract response content + content = response.choices[0].message.content or "" + + # Create message + message = Message( + id=str(uuid.uuid4()), + type=MessageType.ASSISTANT, + content=content, + timestamp=datetime.now(), + metadata={ + "model": response.model, + "finish_reason": response.choices[0].finish_reason + } + ) + + # Create usage info + usage = None + if response.usage: + usage = { + "prompt_tokens": response.usage.prompt_tokens, + "completion_tokens": response.usage.completion_tokens, + "total_tokens": response.usage.total_tokens + } + + return AgentResponse( + message=message, + is_final=True, + usage=usage + ) + + async def _generate_streaming_response( + self, + api_params: Dict[str, Any] + ) -> AsyncIterator[AgentResponse]: + """Generate streaming responses.""" + message_id = str(uuid.uuid4()) + accumulated_content = "" + + async for chunk in await self.client.chat.completions.create(**api_params): + if chunk.choices and chunk.choices[0].delta.content: + content_delta = chunk.choices[0].delta.content + accumulated_content += content_delta + + # Create incremental message + message = Message( + id=message_id, + type=MessageType.ASSISTANT, + content=content_delta, + timestamp=datetime.now(), + metadata={ + "model": chunk.model, + "accumulated_content": accumulated_content + } + ) + + # Check if this is the final chunk + is_final = ( + chunk.choices[0].finish_reason is not None and + chunk.choices[0].finish_reason != "null" + ) + + yield AgentResponse( + message=message, + is_final=is_final, + usage=None # Usage info typically comes with final chunk + ) + + async def close(self) -> None: + """Close the Azure OpenAI provider.""" + self.logger.info("Closing Azure OpenAI provider") + + if self.client: + await self.client.close() + self.client = None \ No newline at end of file diff --git a/src/real_time_agent/utils/__init__.py b/src/real_time_agent/utils/__init__.py new file mode 100644 index 0000000..b8795e3 --- /dev/null +++ b/src/real_time_agent/utils/__init__.py @@ -0,0 +1 @@ +"""Utility functions and helpers.""" \ No newline at end of file diff --git a/src/real_time_agent/utils/helpers.py b/src/real_time_agent/utils/helpers.py new file mode 100644 index 0000000..331c5e3 --- /dev/null +++ 
b/src/real_time_agent/utils/helpers.py @@ -0,0 +1,249 @@ +"""Utility functions for real-time agents.""" + +import asyncio +import json +import logging +from typing import Any, Dict, List, Optional +from datetime import datetime + +from ..core.interfaces import Message, MessageType + + +class LoggingEventHandler: + """Simple event handler that logs all events.""" + + def __init__(self, logger: Optional[logging.Logger] = None): + """Initialize logging event handler. + + Args: + logger: Optional logger instance. If not provided, creates a new one. + """ + self.logger = logger or logging.getLogger(__name__) + + async def on_message(self, message: Message) -> None: + """Log incoming messages.""" + self.logger.info(f"Message [{message.type.value}]: {message.content[:100]}...") + + async def on_connection_status(self, status) -> None: + """Log connection status changes.""" + self.logger.info(f"Connection status changed to: {status.value}") + + async def on_error(self, error: Exception) -> None: + """Log errors.""" + self.logger.error(f"Error occurred: {error}") + + +class ConversationExporter: + """Utility for exporting conversation history.""" + + @staticmethod + def to_json(messages: List[Message], pretty: bool = True) -> str: + """Export messages to JSON format. + + Args: + messages: List of messages to export + pretty: Whether to format JSON with indentation + + Returns: + JSON string representation of the conversation + """ + conversation_data = { + "conversation": [message.to_dict() for message in messages], + "exported_at": datetime.now().isoformat(), + "message_count": len(messages) + } + + if pretty: + return json.dumps(conversation_data, indent=2, ensure_ascii=False) + else: + return json.dumps(conversation_data, ensure_ascii=False) + + @staticmethod + def to_markdown(messages: List[Message]) -> str: + """Export messages to Markdown format. + + Args: + messages: List of messages to export + + Returns: + Markdown string representation of the conversation + """ + markdown_lines = [ + "# Conversation Export", + f"Exported on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", + f"Total messages: {len(messages)}", + "", + "---", + "" + ] + + for message in messages: + timestamp = message.timestamp.strftime("%H:%M:%S") + + if message.type == MessageType.USER: + markdown_lines.extend([ + f"## User ({timestamp})", + message.content, + "" + ]) + elif message.type == MessageType.ASSISTANT: + markdown_lines.extend([ + f"## Assistant ({timestamp})", + message.content, + "" + ]) + elif message.type == MessageType.SYSTEM: + markdown_lines.extend([ + f"## System ({timestamp})", + f"_{message.content}_", + "" + ]) + + return "\n".join(markdown_lines) + + +class TokenCounter: + """Utility for counting tokens in messages.""" + + @staticmethod + def estimate_tokens(text: str, model: str = "gpt-4") -> int: + """Estimate token count for text. + + This is a rough estimation. For accurate counting, use tiktoken library. + + Args: + text: Text to count tokens for + model: Model name for token estimation + + Returns: + Estimated token count + """ + # Rough estimation: ~4 characters per token for most models + return len(text) // 4 + + @staticmethod + def estimate_conversation_tokens(messages: List[Message], model: str = "gpt-4") -> int: + """Estimate total tokens for a conversation. 
+
+        Args:
+            messages: List of messages in the conversation
+            model: Model name for token estimation
+
+        Returns:
+            Estimated total token count
+        """
+        total_tokens = 0
+
+        for message in messages:
+            # Add tokens for the message content
+            total_tokens += TokenCounter.estimate_tokens(message.content, model)
+
+            # Add overhead tokens for message structure (role, etc.)
+            total_tokens += 10  # Rough overhead per message
+
+        return total_tokens
+
+
+class MessageValidator:
+    """Utility for validating messages."""
+
+    @staticmethod
+    def validate_message(message: Message) -> List[str]:
+        """Validate a message and return list of validation errors.
+
+        Args:
+            message: Message to validate
+
+        Returns:
+            List of validation error messages (empty if valid)
+        """
+        errors = []
+
+        if not message.id:
+            errors.append("Message ID is required")
+
+        if not message.content:
+            errors.append("Message content is required")
+
+        if not message.timestamp:
+            errors.append("Message timestamp is required")
+
+        if not isinstance(message.type, MessageType):
+            errors.append(f"Invalid message type: {message.type}")
+
+        return errors
+
+    @staticmethod
+    def validate_conversation(messages: List[Message]) -> List[str]:
+        """Validate a conversation and return list of validation errors.
+
+        Args:
+            messages: List of messages to validate
+
+        Returns:
+            List of validation error messages (empty if valid)
+        """
+        errors = []
+
+        for i, message in enumerate(messages):
+            message_errors = MessageValidator.validate_message(message)
+            for error in message_errors:
+                errors.append(f"Message {i}: {error}")
+
+        return errors
+
+
+class AsyncRetry:
+    """Utility for retrying async operations."""
+
+    def __init__(
+        self,
+        max_attempts: int = 3,
+        delay: float = 1.0,
+        backoff_factor: float = 2.0,
+        exceptions: tuple = (Exception,)
+    ):
+        """Initialize retry configuration.
+
+        Args:
+            max_attempts: Maximum number of retry attempts
+            delay: Initial delay between retries in seconds
+            backoff_factor: Factor to multiply delay by for each retry
+            exceptions: Tuple of exception types to retry on
+        """
+        self.max_attempts = max_attempts
+        self.delay = delay
+        self.backoff_factor = backoff_factor
+        self.exceptions = exceptions
+
+    async def __call__(self, func, *args, **kwargs):
+        """Execute function with retry logic.
+ + Args: + func: Async function to execute + *args: Arguments for the function + **kwargs: Keyword arguments for the function + + Returns: + Function result + + Raises: + Last exception if all retries fail + """ + last_exception = None + current_delay = self.delay + + for attempt in range(self.max_attempts): + try: + return await func(*args, **kwargs) + except self.exceptions as e: + last_exception = e + + if attempt < self.max_attempts - 1: # Not the last attempt + await asyncio.sleep(current_delay) + current_delay *= self.backoff_factor + else: + break + + # If we get here, all retries failed + raise last_exception \ No newline at end of file diff --git a/tests/test_basic.py b/tests/test_basic.py new file mode 100644 index 0000000..e8b4293 --- /dev/null +++ b/tests/test_basic.py @@ -0,0 +1,333 @@ +"""Basic tests for the real-time agent implementation.""" + +import pytest +import asyncio +from unittest.mock import AsyncMock, MagicMock +from datetime import datetime + +from real_time_agent.core.config import AgentConfig +from real_time_agent.core.interfaces import Message, MessageType, ConnectionStatus +from real_time_agent.core.agent import RealTimeAgent +from real_time_agent.utils.helpers import ( + LoggingEventHandler, + ConversationExporter, + TokenCounter, + MessageValidator +) + + +class TestAgentConfig: + """Test agent configuration.""" + + def test_default_config(self): + """Test default configuration values.""" + config = AgentConfig() + assert config.provider == "azure_openai" + assert config.max_tokens == 1000 + assert config.temperature == 0.7 + assert config.streaming is True + + def test_custom_config(self): + """Test custom configuration values.""" + config = AgentConfig( + provider="ai_foundry", + max_tokens=500, + temperature=0.5, + streaming=False + ) + assert config.provider == "ai_foundry" + assert config.max_tokens == 500 + assert config.temperature == 0.5 + assert config.streaming is False + + +class TestMessage: + """Test message functionality.""" + + def test_message_creation(self): + """Test message creation and conversion.""" + message = Message( + id="test-123", + type=MessageType.USER, + content="Hello, world!", + timestamp=datetime.now(), + metadata={"test": "value"} + ) + + assert message.id == "test-123" + assert message.type == MessageType.USER + assert message.content == "Hello, world!" + assert message.metadata["test"] == "value" + + def test_message_to_dict(self): + """Test message serialization.""" + message = Message( + id="test-123", + type=MessageType.USER, + content="Hello, world!", + timestamp=datetime.now() + ) + + message_dict = message.to_dict() + assert message_dict["id"] == "test-123" + assert message_dict["type"] == "user" + assert message_dict["content"] == "Hello, world!" + assert "timestamp" in message_dict + + +class TestUtilities: + """Test utility functions.""" + + def test_token_counter(self): + """Test token counting utility.""" + text = "Hello, world! This is a test message." 
+ tokens = TokenCounter.estimate_tokens(text) + assert tokens > 0 + assert isinstance(tokens, int) + + def test_conversation_tokens(self): + """Test conversation token counting.""" + messages = [ + Message( + id="1", + type=MessageType.USER, + content="Hello", + timestamp=datetime.now() + ), + Message( + id="2", + type=MessageType.ASSISTANT, + content="Hi there!", + timestamp=datetime.now() + ) + ] + + total_tokens = TokenCounter.estimate_conversation_tokens(messages) + assert total_tokens > 0 + assert isinstance(total_tokens, int) + + def test_message_validator(self): + """Test message validation.""" + # Valid message + valid_message = Message( + id="test-123", + type=MessageType.USER, + content="Hello", + timestamp=datetime.now() + ) + errors = MessageValidator.validate_message(valid_message) + assert len(errors) == 0 + + # Invalid message + invalid_message = Message( + id="", + type=MessageType.USER, + content="", + timestamp=datetime.now() + ) + errors = MessageValidator.validate_message(invalid_message) + assert len(errors) > 0 + + def test_conversation_exporter(self): + """Test conversation export functionality.""" + messages = [ + Message( + id="1", + type=MessageType.USER, + content="Hello", + timestamp=datetime.now() + ), + Message( + id="2", + type=MessageType.ASSISTANT, + content="Hi there!", + timestamp=datetime.now() + ) + ] + + # Test JSON export + json_export = ConversationExporter.to_json(messages) + assert "conversation" in json_export + assert "exported_at" in json_export + + # Test Markdown export + markdown_export = ConversationExporter.to_markdown(messages) + assert "# Conversation Export" in markdown_export + assert "## User" in markdown_export + assert "## Assistant" in markdown_export + + +class TestEventHandler: + """Test event handler functionality.""" + + @pytest.mark.asyncio + async def test_logging_event_handler(self): + """Test logging event handler.""" + handler = LoggingEventHandler() + + message = Message( + id="test-123", + type=MessageType.USER, + content="Test message", + timestamp=datetime.now() + ) + + # Should not raise any exceptions + await handler.on_message(message) + await handler.on_connection_status(ConnectionStatus.CONNECTED) + await handler.on_error(Exception("Test error")) + + +class MockProvider: + """Mock AI provider for testing.""" + + def __init__(self): + self.initialized = False + + async def initialize(self): + """Mock initialization.""" + self.initialized = True + + async def generate_response(self, messages, stream=True, **kwargs): + """Mock response generation.""" + from real_time_agent.core.interfaces import AgentResponse + + response_message = Message( + id="response-123", + type=MessageType.ASSISTANT, + content="Mock response", + timestamp=datetime.now() + ) + + return AgentResponse( + message=response_message, + is_final=True, + usage={"total_tokens": 10} + ) + + async def close(self): + """Mock close.""" + self.initialized = False + + +class TestRealTimeAgent: + """Test real-time agent functionality.""" + + @pytest.mark.asyncio + async def test_agent_initialization(self): + """Test agent initialization.""" + config = AgentConfig() + provider = MockProvider() + agent = RealTimeAgent(config, provider) + + await agent.initialize() + assert agent.status == ConnectionStatus.CONNECTED + assert provider.initialized is True + + await agent.close() + + @pytest.mark.asyncio + async def test_send_message(self): + """Test sending messages.""" + config = AgentConfig(streaming=False) + provider = MockProvider() + agent = 
RealTimeAgent(config, provider) + + await agent.initialize() + + response = await agent.send_message("Hello, agent!") + assert response.message.content == "Mock response" + assert response.is_final is True + + # Check conversation history + history = await agent.get_conversation_history() + assert len(history) == 2 # User message + Assistant response + assert history[0].type == MessageType.USER + assert history[1].type == MessageType.ASSISTANT + + await agent.close() + + @pytest.mark.asyncio + async def test_system_prompt(self): + """Test system prompt functionality.""" + config = AgentConfig() + provider = MockProvider() + agent = RealTimeAgent(config, provider) + + await agent.initialize() + + await agent.set_system_prompt("You are a test assistant.") + + history = await agent.get_conversation_history() + assert len(history) == 1 + assert history[0].type == MessageType.SYSTEM + assert history[0].content == "You are a test assistant." + + await agent.close() + + @pytest.mark.asyncio + async def test_clear_conversation(self): + """Test clearing conversation history.""" + config = AgentConfig(streaming=False) + provider = MockProvider() + agent = RealTimeAgent(config, provider) + + await agent.initialize() + + # Send a message + await agent.send_message("Hello!") + + # Check history has messages + history = await agent.get_conversation_history() + assert len(history) > 0 + + # Clear conversation + await agent.clear_conversation() + + # Check history is empty + history = await agent.get_conversation_history() + assert len(history) == 0 + + await agent.close() + + +if __name__ == "__main__": + # Run tests manually if pytest is not available + import sys + + print("Running basic tests...") + + # Test configuration + try: + test_config = TestAgentConfig() + test_config.test_default_config() + test_config.test_custom_config() + print("βœ… Configuration tests passed") + except Exception as e: + print(f"❌ Configuration tests failed: {e}") + sys.exit(1) + + # Test message functionality + try: + test_message = TestMessage() + test_message.test_message_creation() + test_message.test_message_to_dict() + print("βœ… Message tests passed") + except Exception as e: + print(f"❌ Message tests failed: {e}") + sys.exit(1) + + # Test utilities + try: + test_utils = TestUtilities() + test_utils.test_token_counter() + test_utils.test_conversation_tokens() + test_utils.test_message_validator() + test_utils.test_conversation_exporter() + print("βœ… Utility tests passed") + except Exception as e: + print(f"❌ Utility tests failed: {e}") + sys.exit(1) + + print("βœ… All basic tests passed!") + print("Note: Run 'pytest tests/' for full async test suite") \ No newline at end of file From 2a37e519a5613fca2daec327763e2e22c9174d3f Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 10 Sep 2025 09:46:36 +0000 Subject: [PATCH 3/3] Fix conversation history tracking in streaming responses and complete implementation Co-authored-by: marvinbuss <34542414+marvinbuss@users.noreply.github.com> --- src/real_time_agent/core/agent.py | 31 +++++++++++++++++++++---------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/src/real_time_agent/core/agent.py b/src/real_time_agent/core/agent.py index c58bd3e..c51cebb 100644 --- a/src/real_time_agent/core/agent.py +++ b/src/real_time_agent/core/agent.py @@ -115,8 +115,8 @@ async def send_message( # Handle streaming vs non-streaming responses if self.config.streaming: - # Return the async generator directly for streaming 
- return self._handle_streaming_response(response) + # Return a wrapper that ensures conversation history is updated + return self._streaming_response_wrapper(response) else: # Handle single response return await self._handle_single_response(response) @@ -128,28 +128,33 @@ async def send_message( await self.event_handler.on_error(e) raise - async def _handle_streaming_response( + async def _streaming_response_wrapper( self, response_stream: AsyncIterator[AgentResponse] ) -> AsyncIterator[AgentResponse]: - """Handle streaming responses from the provider.""" + """Wrapper that ensures conversation history is updated for streaming responses.""" accumulated_content = "" message_id = str(uuid.uuid4()) async for chunk in response_stream: - accumulated_content += chunk.message.content + # Extract content from this chunk + chunk_content = chunk.message.content - # Update message with accumulated content + # Update accumulated content + if chunk.message.metadata and 'accumulated_content' in chunk.message.metadata: + accumulated_content = chunk.message.metadata['accumulated_content'] + else: + accumulated_content += chunk_content + + # Update chunk message ID to be consistent chunk.message.id = message_id chunk.message.timestamp = datetime.now() - # Notify event handler + # Notify event handler of the chunk if self.event_handler: await self.event_handler.on_message(chunk.message) - yield chunk - - # If this is the final chunk, add to conversation history + # If this is the final chunk, immediately add to conversation history BEFORE yielding if chunk.is_final: final_message = Message( id=message_id, @@ -159,6 +164,12 @@ async def _handle_streaming_response( metadata=chunk.message.metadata ) self.conversation_history.append(final_message) + self.logger.debug(f"Added final message to history. New length: {len(self.conversation_history)}") + + yield chunk + + if chunk.is_final: + break async def _handle_single_response(self, response: AgentResponse) -> AgentResponse: """Handle single (non-streaming) responses from the provider."""
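With this change, fully consuming the stream is what commits the assistant turn to the conversation history. A minimal end-to-end sketch of the intended call pattern (the endpoint and deployment values are placeholders, and real credentials are required for `initialize()` to succeed):

```python
import asyncio

from real_time_agent import AgentConfig, AzureOpenAIProvider, RealTimeAgent


async def demo() -> None:
    # Placeholders: substitute a real endpoint and deployment name.
    config = AgentConfig(
        provider="azure_openai",
        azure_openai_endpoint="https://your-resource.openai.azure.com/",
        azure_openai_deployment="gpt-4",
        streaming=True,
    )
    agent = RealTimeAgent(config, AzureOpenAIProvider(config))
    await agent.initialize()
    try:
        # Each chunk carries only its delta; the wrapper appends the full
        # assistant message to history on the final chunk, before yielding it.
        async for chunk in await agent.send_message("Hello!"):
            print(chunk.message.content, end="", flush=True)
        print()
        history = await agent.get_conversation_history()
        assert history[-1].type.value == "assistant"  # recorded by the wrapper
    finally:
        await agent.close()


asyncio.run(demo())
```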