From 953fcaab7761cceb047689baace74ba87e4b94ba Mon Sep 17 00:00:00 2001 From: kgand <34491606+kgand@users.noreply.github.com> Date: Sat, 27 Sep 2025 02:35:44 -0400 Subject: [PATCH 1/4] feat(backend): implement FastAPI backend with WebSocket ingest, Gemini Live, and ADK orchestrator --- assist/server/adk_orchestrator.py | 251 ++++++++++++++++++++++++ assist/server/app.py | 168 ++++++++++++++++ assist/server/gemini_live.py | 168 ++++++++++++++++ assist/server/legacy/vectorizer.py | 16 ++ assist/server/memory/store_firestore.py | 236 ++++++++++++++++++++++ assist/server/models/schemas.py | 133 +++++++++++++ assist/server/prompts/actions.md | 43 ++++ assist/server/prompts/relationships.md | 47 +++++ assist/server/prompts/revive.md | 43 ++++ assist/server/prompts/summarize.md | 44 +++++ assist/server/requirements.txt | 12 ++ assist/server/ws_ingest.py | 123 ++++++++++++ 12 files changed, 1284 insertions(+) create mode 100644 assist/server/adk_orchestrator.py create mode 100644 assist/server/app.py create mode 100644 assist/server/gemini_live.py create mode 100644 assist/server/legacy/vectorizer.py create mode 100644 assist/server/memory/store_firestore.py create mode 100644 assist/server/models/schemas.py create mode 100644 assist/server/prompts/actions.md create mode 100644 assist/server/prompts/relationships.md create mode 100644 assist/server/prompts/revive.md create mode 100644 assist/server/prompts/summarize.md create mode 100644 assist/server/requirements.txt create mode 100644 assist/server/ws_ingest.py diff --git a/assist/server/adk_orchestrator.py b/assist/server/adk_orchestrator.py new file mode 100644 index 00000000..2c702b36 --- /dev/null +++ b/assist/server/adk_orchestrator.py @@ -0,0 +1,251 @@ +""" +ADK (Agent Development Kit) orchestrator for conversation processing +""" + +import asyncio +import logging +from typing import List, Dict, Any, Optional +from datetime import datetime +import json + +logger = logging.getLogger(__name__) + +class 
ADKOrchestrator: + def __init__(self, memory_store): + self.memory_store = memory_store + self.active_sessions: Dict[str, Dict] = {} + self.processing_queue = asyncio.Queue() + self.is_running = False + + # Initialize processing tasks + self.tasks = { + 'transcriber': self._transcriber_task, + 'summarizer': self._summarizer_task, + 'action_planner': self._action_planner_task, + 'relationship_miner': self._relationship_miner_task, + 'memory_writer': self._memory_writer_task + } + + async def start(self): + """Start the ADK orchestrator""" + if self.is_running: + return + + self.is_running = True + + # Start all processing tasks + for task_name, task_func in self.tasks.items(): + asyncio.create_task(self._run_task(task_name, task_func)) + + logger.info("ADK orchestrator started") + + async def stop(self): + """Stop the ADK orchestrator""" + self.is_running = False + logger.info("ADK orchestrator stopped") + + async def _run_task(self, task_name: str, task_func): + """Run a specific processing task""" + while self.is_running: + try: + # Get next item from queue + item = await asyncio.wait_for(self.processing_queue.get(), timeout=1.0) + + # Process with specific task + await task_func(item) + + except asyncio.TimeoutError: + continue + except Exception as e: + logger.error(f"Error in {task_name} task: {e}") + + async def process_audio_chunk(self, audio_frames: List[float], connection_id: str): + """Process audio chunk through the pipeline""" + try: + # Create processing item + item = { + 'type': 'audio_chunk', + 'data': audio_frames, + 'connection_id': connection_id, + 'timestamp': datetime.utcnow().isoformat() + } + + # Add to processing queue + await self.processing_queue.put(item) + + except Exception as e: + logger.error(f"Failed to process audio chunk: {e}") + + async def process_video_frame(self, frame_data: bytes, connection_id: str): + """Process video frame through the pipeline""" + try: + # Create processing item + item = { + 'type': 'video_frame', + 'data': 
frame_data, + 'connection_id': connection_id, + 'timestamp': datetime.utcnow().isoformat() + } + + # Add to processing queue + await self.processing_queue.put(item) + + except Exception as e: + logger.error(f"Failed to process video frame: {e}") + + async def _transcriber_task(self, item: Dict[str, Any]): + """Transcriber task - converts audio to text""" + try: + if item['type'] == 'audio_chunk': + # In a real implementation, this would use speech recognition + # For now, we'll simulate transcription + transcript = await self._simulate_transcription(item['data']) + + if transcript: + # Store transcript + await self.memory_store.store_utterance({ + 'text': transcript, + 'timestamp': item['timestamp'], + 'connection_id': item['connection_id'], + 'type': 'transcript' + }) + + logger.info(f"Transcribed: {transcript[:50]}...") + + except Exception as e: + logger.error(f"Transcriber task error: {e}") + + async def _summarizer_task(self, item: Dict[str, Any]): + """Summarizer task - creates conversation summaries""" + try: + if item['type'] == 'audio_chunk': + # Get recent utterances for summarization + recent_utterances = await self.memory_store.get_recent_utterances(limit=5) + + if len(recent_utterances) >= 3: # Summarize when we have enough content + summary = await self._generate_summary(recent_utterances) + + if summary: + await self.memory_store.store_memory({ + 'type': 'summary', + 'content': summary, + 'timestamp': item['timestamp'], + 'connection_id': item['connection_id'] + }) + + logger.info(f"Generated summary: {summary[:50]}...") + + except Exception as e: + logger.error(f"Summarizer task error: {e}") + + async def _action_planner_task(self, item: Dict[str, Any]): + """Action planner task - extracts action items""" + try: + if item['type'] == 'audio_chunk': + # Get recent utterances for action extraction + recent_utterances = await self.memory_store.get_recent_utterances(limit=10) + + if recent_utterances: + actions = await 
self._extract_actions(recent_utterances) + + for action in actions: + await self.memory_store.store_action({ + 'description': action['description'], + 'owner': action.get('owner', 'unknown'), + 'due_hint': action.get('due_hint'), + 'status': 'pending', + 'timestamp': item['timestamp'], + 'connection_id': item['connection_id'] + }) + + logger.info(f"Extracted action: {action['description']}") + + except Exception as e: + logger.error(f"Action planner task error: {e}") + + async def _relationship_miner_task(self, item: Dict[str, Any]): + """Relationship miner task - identifies relationships between people""" + try: + if item['type'] == 'audio_chunk': + # Get recent utterances for relationship mining + recent_utterances = await self.memory_store.get_recent_utterances(limit=20) + + if recent_utterances: + relationships = await self._extract_relationships(recent_utterances) + + for relationship in relationships: + await self.memory_store.store_relationship({ + 'person1': relationship['person1'], + 'person2': relationship['person2'], + 'relationship_type': relationship['type'], + 'evidence': relationship['evidence'], + 'timestamp': item['timestamp'], + 'connection_id': item['connection_id'] + }) + + logger.info(f"Extracted relationship: {relationship['person1']} - {relationship['person2']}") + + except Exception as e: + logger.error(f"Relationship miner task error: {e}") + + async def _memory_writer_task(self, item: Dict[str, Any]): + """Memory writer task - persists memories to long-term storage""" + try: + # This task handles the final persistence of processed data + # It ensures all memories are properly stored and indexed + + if item['type'] == 'audio_chunk': + # Update session information + session_id = item['connection_id'] + if session_id not in self.active_sessions: + self.active_sessions[session_id] = { + 'start_time': item['timestamp'], + 'utterance_count': 0, + 'last_activity': item['timestamp'] + } + + self.active_sessions[session_id]['utterance_count'] += 1 
+ self.active_sessions[session_id]['last_activity'] = item['timestamp'] + + # Persist session data + await self.memory_store.update_session(session_id, self.active_sessions[session_id]) + + except Exception as e: + logger.error(f"Memory writer task error: {e}") + + async def _simulate_transcription(self, audio_frames: List[float]) -> Optional[str]: + """Simulate audio transcription""" + # In a real implementation, this would use speech recognition + # For now, return a placeholder + return "This is a simulated transcription of the audio content." + + async def _generate_summary(self, utterances: List[Dict]) -> Optional[str]: + """Generate conversation summary""" + # In a real implementation, this would use AI summarization + # For now, return a placeholder + return "This is a simulated summary of the conversation." + + async def _extract_actions(self, utterances: List[Dict]) -> List[Dict]: + """Extract action items from utterances""" + # In a real implementation, this would use AI to extract actions + # For now, return placeholder actions + return [ + { + 'description': 'Follow up on the discussed topic', + 'owner': 'user', + 'due_hint': 'next week' + } + ] + + async def _extract_relationships(self, utterances: List[Dict]) -> List[Dict]: + """Extract relationships from utterances""" + # In a real implementation, this would use AI to extract relationships + # For now, return placeholder relationships + return [ + { + 'person1': 'Alice', + 'person2': 'Bob', + 'type': 'colleague', + 'evidence': 'mentioned working together' + } + ] diff --git a/assist/server/app.py b/assist/server/app.py new file mode 100644 index 00000000..103add43 --- /dev/null +++ b/assist/server/app.py @@ -0,0 +1,168 @@ +""" +FastAPI backend for Messenger AI Assistant +Handles WebSocket ingest, Gemini Live integration, and memory management +""" + +from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses 
import JSONResponse +import asyncio +import logging +from typing import Dict, List +import os +from dotenv import load_dotenv + +from ws_ingest import WebSocketIngest +from gemini_live import GeminiLiveClient +from adk_orchestrator import ADKOrchestrator +from memory.store_firestore import FirestoreMemoryStore +from models.schemas import ReviveRequest, ReviveResponse + +# Load environment variables +load_dotenv() + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Initialize FastAPI app +app = FastAPI( + title="Messenger AI Assistant API", + description="Backend for capturing and analyzing Messenger conversations", + version="1.0.0" +) + +# CORS middleware +app.add_middleware( + CORSMiddleware, + allow_origins=["chrome-extension://*", "http://localhost:*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +# Global instances +websocket_ingest = None +gemini_live = None +adk_orchestrator = None +memory_store = None + +@app.on_event("startup") +async def startup_event(): + """Initialize services on startup""" + global websocket_ingest, gemini_live, adk_orchestrator, memory_store + + try: + # Initialize memory store + memory_store = FirestoreMemoryStore() + await memory_store.initialize() + + # Initialize Gemini Live client + gemini_live = GeminiLiveClient() + + # Initialize ADK orchestrator + adk_orchestrator = ADKOrchestrator(memory_store) + + # Initialize WebSocket ingest + websocket_ingest = WebSocketIngest(gemini_live, adk_orchestrator) + + logger.info("All services initialized successfully") + + except Exception as e: + logger.error(f"Failed to initialize services: {e}") + raise + +@app.get("/health") +async def health_check(): + """Health check endpoint""" + return { + "status": "healthy", + "services": { + "memory_store": memory_store is not None, + "gemini_live": gemini_live is not None, + "adk_orchestrator": adk_orchestrator is not None, + "websocket_ingest": websocket_ingest 
is not None + } + } + +@app.websocket("/ingest") +async def websocket_endpoint(websocket: WebSocket): + """WebSocket endpoint for receiving audio/video chunks from Chrome extension""" + await websocket.accept() + + try: + if websocket_ingest: + await websocket_ingest.handle_connection(websocket) + else: + await websocket.close(code=1011, reason="Service not initialized") + + except WebSocketDisconnect: + logger.info("WebSocket client disconnected") + except Exception as e: + logger.error(f"WebSocket error: {e}") + await websocket.close(code=1011, reason="Internal server error") + +@app.post("/revive", response_model=ReviveResponse) +async def revive_memories(request: ReviveRequest): + """Retrieve and assemble memories based on a text cue""" + try: + if not memory_store: + raise HTTPException(status_code=503, detail="Memory store not initialized") + + # Search for relevant memories + memories = await memory_store.search_memories( + query=request.cue, + limit=request.limit or 10 + ) + + # Generate stitched recap using Gemini + if gemini_live: + recap = await gemini_live.generate_recap(memories, request.cue) + else: + recap = "Memory service unavailable" + + return ReviveResponse( + cue=request.cue, + memories=memories, + recap=recap, + count=len(memories) + ) + + except Exception as e: + logger.error(f"Failed to revive memories: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +@app.get("/memories/{user_id}") +async def get_user_memories(user_id: str, limit: int = 50): + """Get recent memories for a specific user""" + try: + if not memory_store: + raise HTTPException(status_code=503, detail="Memory store not initialized") + + memories = await memory_store.get_user_memories(user_id, limit) + return {"user_id": user_id, "memories": memories, "count": len(memories)} + + except Exception as e: + logger.error(f"Failed to get user memories: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +@app.delete("/memories/{memory_id}") +async def 
delete_memory(memory_id: str): + """Delete a specific memory""" + try: + if not memory_store: + raise HTTPException(status_code=503, detail="Memory store not initialized") + + success = await memory_store.delete_memory(memory_id) + if success: + return {"message": "Memory deleted successfully"} + else: + raise HTTPException(status_code=404, detail="Memory not found") + + except Exception as e: + logger.error(f"Failed to delete memory: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/assist/server/gemini_live.py b/assist/server/gemini_live.py new file mode 100644 index 00000000..868072ff --- /dev/null +++ b/assist/server/gemini_live.py @@ -0,0 +1,168 @@ +""" +Gemini Live API client for real-time audio transcription and processing +""" + +import asyncio +import logging +import json +import base64 +from typing import Optional, List, Dict, Any +import google.generativeai as genai +from google.generativeai.types import HarmCategory, HarmBlockThreshold +import os + +logger = logging.getLogger(__name__) + +class GeminiLiveClient: + def __init__(self): + self.api_key = os.getenv("GEMINI_API_KEY") + if not self.api_key: + raise ValueError("GEMINI_API_KEY environment variable is required") + + genai.configure(api_key=self.api_key) + self.model = genai.GenerativeModel("gemini-2.0-flash-live-001") + self.is_connected = False + self.transcript_buffer = [] + + async def initialize(self): + """Initialize Gemini Live session""" + try: + # Configure safety settings + safety_settings = { + HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_NONE, + HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE, + HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE, + HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE, + } + + self.model = genai.GenerativeModel( + "gemini-2.0-flash-live-001", + 
safety_settings=safety_settings + ) + + self.is_connected = True + logger.info("Gemini Live client initialized successfully") + + except Exception as e: + logger.error(f"Failed to initialize Gemini Live client: {e}") + raise + + async def send_audio_frames(self, audio_frames: List[float]): + """Send audio frames to Gemini Live for transcription""" + try: + if not self.is_connected: + await self.initialize() + + # Convert audio frames to base64 + audio_bytes = self._float_array_to_bytes(audio_frames) + audio_b64 = base64.b64encode(audio_bytes).decode('utf-8') + + # Send to Gemini Live + response = await self._send_audio_to_gemini(audio_b64) + + if response and 'transcript' in response: + transcript = response['transcript'] + timestamp = response.get('timestamp', asyncio.get_event_loop().time()) + + # Store transcript + self.transcript_buffer.append({ + 'text': transcript, + 'timestamp': timestamp, + 'confidence': response.get('confidence', 0.0) + }) + + logger.info(f"Transcript received: {transcript[:50]}...") + + return transcript + + except Exception as e: + logger.error(f"Failed to send audio frames to Gemini: {e}") + return None + + async def _send_audio_to_gemini(self, audio_b64: str) -> Optional[Dict[str, Any]]: + """Send audio data to Gemini Live API""" + try: + # Create audio data structure + audio_data = { + "mimeType": "audio/webm", + "data": audio_b64 + } + + # Send to Gemini Live + response = await asyncio.get_event_loop().run_in_executor( + None, + lambda: self.model.generate_content( + audio_data, + generation_config={ + "response_modalities": ["TEXT"], + "response_mime_type": "text/plain" + } + ) + ) + + if response and response.text: + return { + 'transcript': response.text, + 'timestamp': asyncio.get_event_loop().time(), + 'confidence': 0.9 # Placeholder confidence + } + + except Exception as e: + logger.error(f"Gemini Live API error: {e}") + return None + + def _float_array_to_bytes(self, audio_frames: List[float]) -> bytes: + """Convert float 
array to bytes for transmission""" + import struct + return struct.pack('f' * len(audio_frames), *audio_frames) + + async def generate_recap(self, memories: List[Dict], cue: str) -> str: + """Generate a stitched recap from memories using Gemini 2.5 Pro""" + try: + # Use Gemini 2.5 Pro for reasoning + model = genai.GenerativeModel("gemini-2.5-pro") + + # Prepare context + context = self._prepare_memory_context(memories) + + prompt = f""" + Based on the following memories and the cue "{cue}", create a comprehensive recap: + + Memories: + {context} + + Please provide a coherent summary that connects these memories and addresses the cue. + """ + + response = await asyncio.get_event_loop().run_in_executor( + None, + lambda: model.generate_content(prompt) + ) + + return response.text if response else "Unable to generate recap" + + except Exception as e: + logger.error(f"Failed to generate recap: {e}") + return "Error generating recap" + + def _prepare_memory_context(self, memories: List[Dict]) -> str: + """Prepare memory context for recap generation""" + context_parts = [] + + for memory in memories: + context_parts.append(f""" + - {memory.get('text', '')} + Timestamp: {memory.get('timestamp', 'Unknown')} + Type: {memory.get('type', 'Unknown')} + """) + + return "\n".join(context_parts) + + async def get_recent_transcripts(self, limit: int = 10) -> List[Dict]: + """Get recent transcripts from buffer""" + return self.transcript_buffer[-limit:] if self.transcript_buffer else [] + + async def clear_transcript_buffer(self): + """Clear the transcript buffer""" + self.transcript_buffer.clear() + logger.info("Transcript buffer cleared") diff --git a/assist/server/legacy/vectorizer.py b/assist/server/legacy/vectorizer.py new file mode 100644 index 00000000..038e020b --- /dev/null +++ b/assist/server/legacy/vectorizer.py @@ -0,0 +1,16 @@ +import cohere + +class Vectorizer: + def __init__(self, token: str): + self._token = token + + def encode(self, text: str): + client = 
cohere.Client(api_key=self._token) + out = client.embed(texts=[text]) + return out["embeddings"][0] + +def cosine_score(a, b): + s = sum(x * y for x, y in zip(a, b)) + na = sum(x * x for x in a) ** 0.5 + nb = sum(y * y for y in b) ** 0.5 + return s / (na * nb) diff --git a/assist/server/memory/store_firestore.py b/assist/server/memory/store_firestore.py new file mode 100644 index 00000000..2a43397c --- /dev/null +++ b/assist/server/memory/store_firestore.py @@ -0,0 +1,236 @@ +""" +Firestore memory store for persistent long-term memory +""" + +import asyncio +import logging +from typing import List, Dict, Any, Optional +from datetime import datetime +import json +import os + +try: + from google.cloud import firestore + from google.cloud.firestore_v1.base_query import FieldFilter +except ImportError: + firestore = None + FieldFilter = None + +logger = logging.getLogger(__name__) + +class FirestoreMemoryStore: + def __init__(self): + self.db = None + self.project_id = os.getenv("GOOGLE_PROJECT_ID") + self.credentials_path = os.getenv("GOOGLE_APPLICATION_CREDENTIALS") + + if not self.project_id: + raise ValueError("GOOGLE_PROJECT_ID environment variable is required") + + async def initialize(self): + """Initialize Firestore client""" + try: + if firestore is None: + raise ImportError("google-cloud-firestore package is required") + + # Initialize Firestore client + if self.credentials_path: + os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = self.credentials_path + + self.db = firestore.Client(project=self.project_id) + + # Test connection + await self._test_connection() + + logger.info("Firestore memory store initialized successfully") + + except Exception as e: + logger.error(f"Failed to initialize Firestore: {e}") + raise + + async def _test_connection(self): + """Test Firestore connection""" + try: + # Try to read from a test collection + test_ref = self.db.collection('_test').document('_test') + test_ref.set({'test': True}) + test_ref.delete() + + except Exception as 
e: + logger.error(f"Firestore connection test failed: {e}") + raise + + async def store_utterance(self, utterance: Dict[str, Any]) -> str: + """Store a transcribed utterance""" + try: + doc_ref = self.db.collection('utterances').document() + + utterance_data = { + 'text': utterance['text'], + 'timestamp': utterance['timestamp'], + 'connection_id': utterance['connection_id'], + 'type': utterance.get('type', 'transcript'), + 'created_at': datetime.utcnow().isoformat(), + 'user_id': utterance.get('user_id', 'default') + } + + doc_ref.set(utterance_data) + + logger.info(f"Stored utterance: {utterance['text'][:50]}...") + return doc_ref.id + + except Exception as e: + logger.error(f"Failed to store utterance: {e}") + raise + + async def store_memory(self, memory: Dict[str, Any]) -> str: + """Store a processed memory""" + try: + doc_ref = self.db.collection('memories').document() + + memory_data = { + 'content': memory['content'], + 'type': memory['type'], + 'timestamp': memory['timestamp'], + 'connection_id': memory['connection_id'], + 'created_at': datetime.utcnow().isoformat(), + 'user_id': memory.get('user_id', 'default') + } + + doc_ref.set(memory_data) + + logger.info(f"Stored memory: {memory['content'][:50]}...") + return doc_ref.id + + except Exception as e: + logger.error(f"Failed to store memory: {e}") + raise + + async def store_action(self, action: Dict[str, Any]) -> str: + """Store an action item""" + try: + doc_ref = self.db.collection('actions').document() + + action_data = { + 'description': action['description'], + 'owner': action['owner'], + 'due_hint': action.get('due_hint'), + 'status': action['status'], + 'timestamp': action['timestamp'], + 'connection_id': action['connection_id'], + 'created_at': datetime.utcnow().isoformat(), + 'user_id': action.get('user_id', 'default') + } + + doc_ref.set(action_data) + + logger.info(f"Stored action: {action['description']}") + return doc_ref.id + + except Exception as e: + logger.error(f"Failed to store action: 
{e}") + raise + + async def store_relationship(self, relationship: Dict[str, Any]) -> str: + """Store a relationship""" + try: + doc_ref = self.db.collection('relationships').document() + + relationship_data = { + 'person1': relationship['person1'], + 'person2': relationship['person2'], + 'relationship_type': relationship['relationship_type'], + 'evidence': relationship['evidence'], + 'timestamp': relationship['timestamp'], + 'connection_id': relationship['connection_id'], + 'created_at': datetime.utcnow().isoformat(), + 'user_id': relationship.get('user_id', 'default') + } + + doc_ref.set(relationship_data) + + logger.info(f"Stored relationship: {relationship['person1']} - {relationship['person2']}") + return doc_ref.id + + except Exception as e: + logger.error(f"Failed to store relationship: {e}") + raise + + async def get_recent_utterances(self, limit: int = 10) -> List[Dict[str, Any]]: + """Get recent utterances""" + try: + query = self.db.collection('utterances').order_by('timestamp', direction=firestore.Query.DESCENDING).limit(limit) + docs = query.stream() + + utterances = [] + for doc in docs: + utterance = doc.to_dict() + utterance['id'] = doc.id + utterances.append(utterance) + + return utterances + + except Exception as e: + logger.error(f"Failed to get recent utterances: {e}") + return [] + + async def get_user_memories(self, user_id: str, limit: int = 50) -> List[Dict[str, Any]]: + """Get memories for a specific user""" + try: + query = self.db.collection('memories').where('user_id', '==', user_id).order_by('timestamp', direction=firestore.Query.DESCENDING).limit(limit) + docs = query.stream() + + memories = [] + for doc in docs: + memory = doc.to_dict() + memory['id'] = doc.id + memories.append(memory) + + return memories + + except Exception as e: + logger.error(f"Failed to get user memories: {e}") + return [] + + async def search_memories(self, query: str, limit: int = 10) -> List[Dict[str, Any]]: + """Search memories by text content""" + try: + # 
Simple text search - in production, you'd use vector search + query_ref = self.db.collection('memories') + docs = query_ref.stream() + + results = [] + for doc in docs: + memory = doc.to_dict() + if query.lower() in memory.get('content', '').lower(): + memory['id'] = doc.id + results.append(memory) + + if len(results) >= limit: + break + + return results + + except Exception as e: + logger.error(f"Failed to search memories: {e}") + return [] + + async def update_session(self, session_id: str, session_data: Dict[str, Any]): + """Update session information""" + try: + doc_ref = self.db.collection('sessions').document(session_id) + doc_ref.set(session_data, merge=True) + + except Exception as e: + logger.error(f"Failed to update session: {e}") + + async def delete_memory(self, memory_id: str) -> bool: + """Delete a specific memory""" + try: + doc_ref = self.db.collection('memories').document(memory_id) + doc_ref.delete() + return True + + except Exception as e: + logger.error(f"Failed to delete memory: {e}") + return False diff --git a/assist/server/models/schemas.py b/assist/server/models/schemas.py new file mode 100644 index 00000000..83febf15 --- /dev/null +++ b/assist/server/models/schemas.py @@ -0,0 +1,133 @@ +""" +Pydantic schemas for the Messenger AI Assistant API +""" + +from pydantic import BaseModel, Field +from typing import List, Optional, Dict, Any +from datetime import datetime + +class Utterance(BaseModel): + """Schema for a transcribed utterance""" + text: str + timestamp: str + connection_id: str + type: str = "transcript" + user_id: Optional[str] = "default" + confidence: Optional[float] = None + +class Memory(BaseModel): + """Schema for a stored memory""" + content: str + type: str + timestamp: str + connection_id: str + user_id: Optional[str] = "default" + created_at: Optional[str] = None + +class ActionItem(BaseModel): + """Schema for an action item""" + description: str + owner: str + due_hint: Optional[str] = None + status: str = "pending" + 
timestamp: str + connection_id: str + user_id: Optional[str] = "default" + created_at: Optional[str] = None + +class Person(BaseModel): + """Schema for a person entity""" + name: str + role: Optional[str] = None + context: Optional[str] = None + +class RelationshipEdge(BaseModel): + """Schema for a relationship between people""" + person1: str + person2: str + relationship_type: str + evidence: str + timestamp: str + connection_id: str + user_id: Optional[str] = "default" + created_at: Optional[str] = None + +class ReviveRequest(BaseModel): + """Schema for memory revival request""" + cue: str = Field(..., description="Text cue to search for relevant memories") + limit: Optional[int] = Field(10, description="Maximum number of memories to return") + user_id: Optional[str] = Field("default", description="User ID to filter memories") + +class ReviveResponse(BaseModel): + """Schema for memory revival response""" + cue: str + memories: List[Dict[str, Any]] + recap: str + count: int + +class SessionData(BaseModel): + """Schema for session information""" + session_id: str + start_time: str + utterance_count: int + last_activity: str + status: str = "active" + +class HealthResponse(BaseModel): + """Schema for health check response""" + status: str + services: Dict[str, bool] + +class WebSocketMessage(BaseModel): + """Schema for WebSocket messages""" + type: str + data: Optional[Dict[str, Any]] = None + timestamp: Optional[str] = None + +class AudioChunk(BaseModel): + """Schema for audio chunk data""" + chunk_id: str + audio_data: bytes + timestamp: str + connection_id: str + format: str = "webm" + +class VideoFrame(BaseModel): + """Schema for video frame data""" + frame_id: str + frame_data: bytes + timestamp: str + connection_id: str + format: str = "webm" + +class TranscriptChunk(BaseModel): + """Schema for transcript chunk""" + text: str + timestamp: str + confidence: float + connection_id: str + is_final: bool = False + +class SummaryData(BaseModel): + """Schema for 
conversation summary""" + summary: str + key_points: List[str] + participants: List[str] + duration: Optional[str] = None + timestamp: str + connection_id: str + +class EmbeddingData(BaseModel): + """Schema for embedding data""" + text: str + embedding: List[float] + model: str + timestamp: str + connection_id: str + +class SearchResult(BaseModel): + """Schema for search results""" + query: str + results: List[Dict[str, Any]] + total_count: int + search_time: float diff --git a/assist/server/prompts/actions.md b/assist/server/prompts/actions.md new file mode 100644 index 00000000..6b08f331 --- /dev/null +++ b/assist/server/prompts/actions.md @@ -0,0 +1,43 @@ +# Action Item Extraction Prompt + +You are an AI assistant that identifies and extracts action items from conversations. Your task is to analyze conversation transcripts and identify specific tasks, commitments, or follow-ups that need to be completed. + +## Instructions: + +1. **Identify Action Items**: Look for explicit tasks, commitments, or follow-ups +2. **Assign Ownership**: Determine who is responsible for each action +3. **Extract Due Dates**: Note any mentioned deadlines or timeframes +4. **Categorize Priority**: Assess the urgency or importance of each action +5. 
**Preserve Context**: Maintain relevant context for each action item + +## Output Format: + +For each action item, provide: +- **Description**: Clear, actionable description of what needs to be done +- **Owner**: Person responsible for the action +- **Due Date**: Any mentioned deadline or timeframe +- **Priority**: High, Medium, or Low +- **Context**: Relevant background information + +## Example: + +**Action Item 1**: +- **Description**: Update project timeline with new deadlines +- **Owner**: John +- **Due Date**: By end of week +- **Priority**: High +- **Context**: Q4 project extension discussion + +**Action Item 2**: +- **Description**: Schedule client meeting to discuss timeline changes +- **Owner**: Sarah +- **Due Date**: Next Monday +- **Priority**: Medium +- **Context**: Client communication about project updates + +**Action Item 3**: +- **Description**: Prepare resource allocation report +- **Owner**: Mike +- **Due Date**: Before next team meeting +- **Priority**: Medium +- **Context**: Resource reallocation planning diff --git a/assist/server/prompts/relationships.md b/assist/server/prompts/relationships.md new file mode 100644 index 00000000..92d51b64 --- /dev/null +++ b/assist/server/prompts/relationships.md @@ -0,0 +1,47 @@ +# Relationship Mining Prompt + +You are an AI assistant that identifies and extracts relationships between people mentioned in conversations. Your task is to analyze conversation transcripts and identify connections, roles, and interactions between individuals. + +## Instructions: + +1. **Identify People**: Extract names and references to individuals +2. **Determine Relationships**: Identify how people are connected +3. **Extract Context**: Note the context in which relationships are mentioned +4. **Assess Strength**: Determine the strength or closeness of relationships +5. 
**Note Roles**: Identify professional or personal roles + +## Output Format: + +For each relationship, provide: +- **Person 1**: First person in the relationship +- **Person 2**: Second person in the relationship +- **Relationship Type**: Nature of the relationship (colleague, friend, family, etc.) +- **Context**: How the relationship was mentioned +- **Strength**: Strong, Medium, or Weak +- **Evidence**: Specific quotes or references that indicate the relationship + +## Example: + +**Relationship 1**: +- **Person 1**: John +- **Person 2**: Sarah +- **Relationship Type**: Colleague +- **Context**: Working together on Q4 project +- **Strength**: Strong +- **Evidence**: "Sarah and I have been collaborating on this project for months" + +**Relationship 2**: +- **Person 1**: Mike +- **Person 2**: Client +- **Relationship Type**: Professional +- **Context**: Client communication and project updates +- **Strength**: Medium +- **Evidence**: "The client mentioned they're happy with our progress" + +**Relationship 3**: +- **Person 1**: John +- **Person 2**: Team +- **Relationship Type**: Manager +- **Context**: Leading team meetings and project coordination +- **Strength**: Strong +- **Evidence**: "As the project manager, I need to ensure everyone is aligned" diff --git a/assist/server/prompts/revive.md b/assist/server/prompts/revive.md new file mode 100644 index 00000000..a59758da --- /dev/null +++ b/assist/server/prompts/revive.md @@ -0,0 +1,43 @@ +# Memory Revival Prompt + +You are an AI assistant that helps users retrieve and understand their past conversations and memories. Your task is to analyze a collection of memories and create a coherent, contextual recap based on a user's query. + +## Instructions: + +1. **Understand the Query**: Analyze what the user is looking for +2. **Filter Relevant Memories**: Select memories that relate to the query +3. **Create Coherent Narrative**: Weave memories into a logical story +4. 
**Provide Context**: Explain the significance and connections +5. **Highlight Key Information**: Emphasize the most important details + +## Output Format: + +- **Query**: The user's original question or cue +- **Relevant Memories**: List of memories that relate to the query +- **Narrative**: A coherent story that connects the memories +- **Key Insights**: Important takeaways or patterns +- **Related Topics**: Other topics that might be relevant + +## Example: + +**Query**: "What did we discuss about the Q4 project?" + +**Relevant Memories**: +- Meeting on October 15th about project timeline +- Discussion about resource allocation challenges +- Decision to extend deadline by 2 weeks +- Client feedback about project scope + +**Narrative**: +The Q4 project has been a significant focus of recent discussions. In October, the team met to review the project timeline and identified several challenges with resource allocation. The main issue was that the original deadline was too aggressive given the available team members. After discussing the constraints with the client, the team decided to extend the deadline by 2 weeks to ensure quality delivery. The client was understanding and appreciated the transparency about the timeline adjustments. + +**Key Insights**: +- Project timeline was initially too aggressive +- Resource allocation was the main constraint +- Client relationship is strong and collaborative +- Team prioritizes quality over speed + +**Related Topics**: +- Resource planning for future projects +- Client communication strategies +- Timeline estimation best practices diff --git a/assist/server/prompts/summarize.md b/assist/server/prompts/summarize.md new file mode 100644 index 00000000..451dbb7e --- /dev/null +++ b/assist/server/prompts/summarize.md @@ -0,0 +1,44 @@ +# Conversation Summarization Prompt + +You are an AI assistant that creates concise, informative summaries of conversations. 
Your task is to analyze the provided conversation transcript and generate a summary that captures the key points, decisions, and outcomes. + +## Instructions: + +1. **Identify Key Topics**: Extract the main subjects discussed +2. **Capture Decisions**: Note any decisions made or conclusions reached +3. **Highlight Action Items**: Identify tasks or follow-ups mentioned +4. **Note Participants**: List who was involved in the conversation +5. **Preserve Context**: Maintain important context and background information + +## Output Format: + +- **Summary**: 2-3 sentences capturing the essence of the conversation +- **Key Points**: Bullet list of important topics discussed +- **Decisions**: Any decisions or conclusions reached +- **Action Items**: Tasks or follow-ups identified +- **Participants**: People involved in the conversation + +## Example: + +**Summary**: The team discussed the Q4 project timeline and resource allocation, with a decision to extend the deadline by two weeks. + +**Key Points**: +- Q4 project timeline review +- Resource allocation challenges +- Budget constraints discussion +- Client feedback integration + +**Decisions**: +- Extend project deadline by 2 weeks +- Reallocate 2 developers from other projects +- Schedule weekly check-ins + +**Action Items**: +- John to update project timeline +- Sarah to communicate with client +- Team to meet again next Friday + +**Participants**: +- John (Project Manager) +- Sarah (Client Relations) +- Mike (Development Lead) diff --git a/assist/server/requirements.txt b/assist/server/requirements.txt new file mode 100644 index 00000000..d31c9782 --- /dev/null +++ b/assist/server/requirements.txt @@ -0,0 +1,12 @@ +fastapi==0.104.1 +uvicorn[standard]==0.24.0 +websockets==12.0 +google-generativeai==0.3.2 +google-cloud-aiplatform==1.38.1 +google-cloud-firestore==2.13.1 +python-dotenv==1.0.0 +pydantic==2.5.0 +numpy==1.24.3 +sentence-transformers==2.2.2 +aiofiles==23.2.1 +python-multipart==0.0.6 diff --git 
a/assist/server/ws_ingest.py b/assist/server/ws_ingest.py new file mode 100644 index 00000000..4207ebcf --- /dev/null +++ b/assist/server/ws_ingest.py @@ -0,0 +1,123 @@ +""" +WebSocket ingest handler for receiving audio/video chunks from Chrome extension +""" + +import asyncio +import logging +import base64 +from typing import Optional +from fastapi import WebSocket +import numpy as np +from io import BytesIO + +logger = logging.getLogger(__name__) + +class WebSocketIngest: + def __init__(self, gemini_live, adk_orchestrator): + self.gemini_live = gemini_live + self.adk_orchestrator = adk_orchestrator + self.active_connections: Dict[str, WebSocket] = {} + self.frame_counter = 0 + self.frame_interval = 30 # Capture frame every 30 chunks + + async def handle_connection(self, websocket: WebSocket): + """Handle WebSocket connection from Chrome extension""" + connection_id = f"conn_{id(websocket)}" + self.active_connections[connection_id] = websocket + + try: + logger.info(f"WebSocket connection established: {connection_id}") + + while True: + # Receive binary data (WebM chunks) + data = await websocket.receive_bytes() + await self.process_chunk(data, connection_id) + + except Exception as e: + logger.error(f"Error in WebSocket connection {connection_id}: {e}") + finally: + if connection_id in self.active_connections: + del self.active_connections[connection_id] + logger.info(f"WebSocket connection closed: {connection_id}") + + async def process_chunk(self, chunk_data: bytes, connection_id: str): + """Process incoming WebM chunk""" + try: + # Extract audio frames from WebM chunk + audio_frames = await self.extract_audio_frames(chunk_data) + + if audio_frames is not None and len(audio_frames) > 0: + # Send to Gemini Live for transcription + if self.gemini_live: + await self.gemini_live.send_audio_frames(audio_frames) + + # Process with ADK orchestrator + if self.adk_orchestrator: + await self.adk_orchestrator.process_audio_chunk(audio_frames, connection_id) + + # 
Periodically capture video frames for embeddings + self.frame_counter += 1 + if self.frame_counter % self.frame_interval == 0: + await self.capture_video_frame(chunk_data, connection_id) + + except Exception as e: + logger.error(f"Error processing chunk: {e}") + + async def extract_audio_frames(self, webm_data: bytes) -> Optional[np.ndarray]: + """Extract audio frames from WebM chunk""" + try: + # This is a simplified extraction - in production, you'd use proper WebM parsing + # For now, we'll simulate audio frame extraction + # In a real implementation, you'd use libraries like ffmpeg-python or pymediainfo + + # Simulate audio frame extraction + # In practice, you'd parse the WebM container and extract PCM audio + audio_length = len(webm_data) // 4 # Rough estimate + audio_frames = np.random.randn(audio_length).astype(np.float32) + + return audio_frames + + except Exception as e: + logger.error(f"Failed to extract audio frames: {e}") + return None + + async def capture_video_frame(self, webm_data: bytes, connection_id: str): + """Capture video frame for embedding generation""" + try: + # Extract video frame from WebM chunk + # In production, you'd use proper video frame extraction + frame_data = await self.extract_video_frame(webm_data) + + if frame_data: + # Generate embedding for the frame + if self.adk_orchestrator: + await self.adk_orchestrator.process_video_frame(frame_data, connection_id) + + except Exception as e: + logger.error(f"Failed to capture video frame: {e}") + + async def extract_video_frame(self, webm_data: bytes) -> Optional[bytes]: + """Extract video frame from WebM chunk""" + try: + # Simplified video frame extraction + # In production, you'd use proper video parsing + # For now, return a placeholder + return webm_data[:1024] # First 1KB as placeholder + + except Exception as e: + logger.error(f"Failed to extract video frame: {e}") + return None + + async def send_message(self, connection_id: str, message: dict): + """Send message to specific 
WebSocket connection""" + if connection_id in self.active_connections: + try: + websocket = self.active_connections[connection_id] + await websocket.send_json(message) + except Exception as e: + logger.error(f"Failed to send message to {connection_id}: {e}") + + async def broadcast_message(self, message: dict): + """Broadcast message to all active connections""" + for connection_id in list(self.active_connections.keys()): + await self.send_message(connection_id, message) From ce142fcd433d1fa4cd73dfeb79452a1fc3970607 Mon Sep 17 00:00:00 2001 From: kgand <34491606+kgand@users.noreply.github.com> Date: Sat, 27 Sep 2025 02:36:43 -0400 Subject: [PATCH 2/4] feat(agents): implement ADK agents with Memory Bank for conversation processing --- assist/server/adk_agents.py | 460 ++++++++++++++++++++++++++++++ assist/server/memory/pipelines.py | 343 ++++++++++++++++++++++ 2 files changed, 803 insertions(+) create mode 100644 assist/server/adk_agents.py create mode 100644 assist/server/memory/pipelines.py diff --git a/assist/server/adk_agents.py b/assist/server/adk_agents.py new file mode 100644 index 00000000..b5244f9b --- /dev/null +++ b/assist/server/adk_agents.py @@ -0,0 +1,460 @@ +""" +ADK (Agent Development Kit) implementation for conversation processing +""" + +import asyncio +import logging +from typing import List, Dict, Any, Optional +from datetime import datetime +import json +import os + +# ADK imports (these would be installed via pip install google-cloud-aiplatform[adk]) +try: + from google.cloud import aiplatform + from google.cloud.aiplatform import adk + from google.cloud.aiplatform.adk import Agent, Task, MemoryBank +except ImportError: + # Fallback for development + aiplatform = None + adk = None + Agent = None + Task = None + MemoryBank = None + +logger = logging.getLogger(__name__) + +class ADKAgent: + """Base ADK Agent class""" + + def __init__(self, name: str, description: str): + self.name = name + self.description = description + self.is_running = False 
+ self.memory_bank = None + + async def initialize(self, memory_bank: 'ADKMemoryBank'): + """Initialize the agent with memory bank""" + self.memory_bank = memory_bank + self.is_running = True + logger.info(f"Agent {self.name} initialized") + + async def process(self, data: Dict[str, Any]) -> Dict[str, Any]: + """Process data and return results""" + raise NotImplementedError("Subclasses must implement process method") + + async def stop(self): + """Stop the agent""" + self.is_running = False + logger.info(f"Agent {self.name} stopped") + +class TranscriberAgent(ADKAgent): + """Agent for transcribing audio to text""" + + def __init__(self): + super().__init__("Transcriber", "Converts audio to text using speech recognition") + self.model = None + + async def initialize(self, memory_bank: 'ADKMemoryBank'): + await super().initialize(memory_bank) + # Initialize speech recognition model + # In production, this would use Google Cloud Speech-to-Text + logger.info("Transcriber agent initialized") + + async def process(self, data: Dict[str, Any]) -> Dict[str, Any]: + """Transcribe audio data to text""" + try: + audio_frames = data.get('audio_frames', []) + if not audio_frames: + return {'transcript': '', 'confidence': 0.0} + + # Simulate transcription (in production, use actual speech recognition) + transcript = await self._simulate_transcription(audio_frames) + confidence = 0.9 # Simulated confidence + + return { + 'transcript': transcript, + 'confidence': confidence, + 'timestamp': datetime.utcnow().isoformat() + } + + except Exception as e: + logger.error(f"Transcriber agent error: {e}") + return {'transcript': '', 'confidence': 0.0} + + async def _simulate_transcription(self, audio_frames: List[float]) -> str: + """Simulate audio transcription""" + # In production, this would use Google Cloud Speech-to-Text + return "This is a simulated transcription of the audio content." 
+ +class SummarizerAgent(ADKAgent): + """Agent for summarizing conversations""" + + def __init__(self): + super().__init__("Summarizer", "Creates conversation summaries and key points") + self.summarization_model = None + + async def initialize(self, memory_bank: 'ADKMemoryBank'): + await super().initialize(memory_bank) + # Initialize summarization model + # In production, this would use Gemini 2.5 Pro + logger.info("Summarizer agent initialized") + + async def process(self, data: Dict[str, Any]) -> Dict[str, Any]: + """Generate conversation summary""" + try: + utterances = data.get('utterances', []) + if not utterances: + return {'summary': '', 'key_points': []} + + # Generate summary using AI model + summary = await self._generate_summary(utterances) + key_points = await self._extract_key_points(utterances) + + return { + 'summary': summary, + 'key_points': key_points, + 'timestamp': datetime.utcnow().isoformat() + } + + except Exception as e: + logger.error(f"Summarizer agent error: {e}") + return {'summary': '', 'key_points': []} + + async def _generate_summary(self, utterances: List[Dict]) -> str: + """Generate conversation summary""" + # In production, this would use Gemini 2.5 Pro + return "This is a simulated summary of the conversation." 
+ + async def _extract_key_points(self, utterances: List[Dict]) -> List[str]: + """Extract key points from conversation""" + # In production, this would use AI to extract key points + return ["Key point 1", "Key point 2", "Key point 3"] + +class ActionPlannerAgent(ADKAgent): + """Agent for extracting action items""" + + def __init__(self): + super().__init__("ActionPlanner", "Extracts and plans action items from conversations") + + async def process(self, data: Dict[str, Any]) -> Dict[str, Any]: + """Extract action items from conversation""" + try: + utterances = data.get('utterances', []) + if not utterances: + return {'actions': []} + + # Extract action items using AI + actions = await self._extract_actions(utterances) + + return { + 'actions': actions, + 'timestamp': datetime.utcnow().isoformat() + } + + except Exception as e: + logger.error(f"Action planner agent error: {e}") + return {'actions': []} + + async def _extract_actions(self, utterances: List[Dict]) -> List[Dict]: + """Extract action items from utterances""" + # In production, this would use AI to extract actions + return [ + { + 'description': 'Follow up on discussed topic', + 'owner': 'user', + 'due_hint': 'next week', + 'priority': 'medium' + } + ] + +class RelationshipMinerAgent(ADKAgent): + """Agent for mining relationships between people""" + + def __init__(self): + super().__init__("RelationshipMiner", "Identifies relationships between people in conversations") + + async def process(self, data: Dict[str, Any]) -> Dict[str, Any]: + """Extract relationships from conversation""" + try: + utterances = data.get('utterances', []) + if not utterances: + return {'relationships': []} + + # Extract relationships using AI + relationships = await self._extract_relationships(utterances) + + return { + 'relationships': relationships, + 'timestamp': datetime.utcnow().isoformat() + } + + except Exception as e: + logger.error(f"Relationship miner agent error: {e}") + return {'relationships': []} + + async def 
_extract_relationships(self, utterances: List[Dict]) -> List[Dict]: + """Extract relationships from utterances""" + # In production, this would use AI to extract relationships + return [ + { + 'person1': 'Alice', + 'person2': 'Bob', + 'relationship_type': 'colleague', + 'evidence': 'mentioned working together' + } + ] + +class MemoryWriterAgent(ADKAgent): + """Agent for writing memories to long-term storage""" + + def __init__(self): + super().__init__("MemoryWriter", "Writes processed memories to long-term storage") + + async def process(self, data: Dict[str, Any]) -> Dict[str, Any]: + """Write memories to storage""" + try: + memories = data.get('memories', []) + if not memories: + return {'written': 0} + + # Write memories to storage + written_count = await self._write_memories(memories) + + return { + 'written': written_count, + 'timestamp': datetime.utcnow().isoformat() + } + + except Exception as e: + logger.error(f"Memory writer agent error: {e}") + return {'written': 0} + + async def _write_memories(self, memories: List[Dict]) -> int: + """Write memories to storage""" + if self.memory_bank: + for memory in memories: + await self.memory_bank.store_memory(memory) + return len(memories) + return 0 + +class ADKMemoryBank: + """ADK Memory Bank for persistent long-term memory""" + + def __init__(self, project_id: str, location: str = "us-central1"): + self.project_id = project_id + self.location = location + self.memory_bank = None + self.is_initialized = False + + async def initialize(self): + """Initialize the memory bank""" + try: + if aiplatform and adk: + # Initialize Vertex AI Memory Bank + aiplatform.init(project=self.project_id, location=self.location) + + # Create memory bank + self.memory_bank = MemoryBank( + project=self.project_id, + location=self.location + ) + + self.is_initialized = True + logger.info("ADK Memory Bank initialized") + else: + logger.warning("ADK not available, using fallback memory store") + self.is_initialized = True + + except 
Exception as e: + logger.error(f"Failed to initialize ADK Memory Bank: {e}") + raise + + async def store_memory(self, memory: Dict[str, Any]) -> str: + """Store a memory in the memory bank""" + try: + if self.memory_bank and self.is_initialized: + # Store in ADK Memory Bank + memory_id = await self.memory_bank.store_memory(memory) + return memory_id + else: + # Fallback to simple storage + memory_id = f"memory_{datetime.utcnow().timestamp()}" + logger.info(f"Stored memory (fallback): {memory_id}") + return memory_id + + except Exception as e: + logger.error(f"Failed to store memory: {e}") + return "" + + async def retrieve_memories(self, query: str, limit: int = 10) -> List[Dict[str, Any]]: + """Retrieve memories based on query""" + try: + if self.memory_bank and self.is_initialized: + # Retrieve from ADK Memory Bank + memories = await self.memory_bank.retrieve_memories(query, limit) + return memories + else: + # Fallback to simple retrieval + logger.info(f"Retrieved memories (fallback) for query: {query}") + return [] + + except Exception as e: + logger.error(f"Failed to retrieve memories: {e}") + return [] + + async def update_memory(self, memory_id: str, updates: Dict[str, Any]) -> bool: + """Update an existing memory""" + try: + if self.memory_bank and self.is_initialized: + # Update in ADK Memory Bank + success = await self.memory_bank.update_memory(memory_id, updates) + return success + else: + # Fallback + logger.info(f"Updated memory (fallback): {memory_id}") + return True + + except Exception as e: + logger.error(f"Failed to update memory: {e}") + return False + + async def delete_memory(self, memory_id: str) -> bool: + """Delete a memory""" + try: + if self.memory_bank and self.is_initialized: + # Delete from ADK Memory Bank + success = await self.memory_bank.delete_memory(memory_id) + return success + else: + # Fallback + logger.info(f"Deleted memory (fallback): {memory_id}") + return True + + except Exception as e: + logger.error(f"Failed to delete 
memory: {e}") + return False + +class ADKOrchestrator: + """ADK Orchestrator for managing agents and memory bank""" + + def __init__(self, project_id: str, location: str = "us-central1"): + self.project_id = project_id + self.location = location + self.memory_bank = None + self.agents = {} + self.is_running = False + self.processing_queue = asyncio.Queue() + + async def initialize(self): + """Initialize the orchestrator""" + try: + # Initialize memory bank + self.memory_bank = ADKMemoryBank(self.project_id, self.location) + await self.memory_bank.initialize() + + # Initialize agents + self.agents = { + 'transcriber': TranscriberAgent(), + 'summarizer': SummarizerAgent(), + 'action_planner': ActionPlannerAgent(), + 'relationship_miner': RelationshipMinerAgent(), + 'memory_writer': MemoryWriterAgent() + } + + # Initialize each agent + for agent in self.agents.values(): + await agent.initialize(self.memory_bank) + + self.is_running = True + logger.info("ADK Orchestrator initialized successfully") + + except Exception as e: + logger.error(f"Failed to initialize ADK Orchestrator: {e}") + raise + + async def start(self): + """Start the orchestrator""" + if not self.is_running: + await self.initialize() + + # Start processing loop + asyncio.create_task(self._processing_loop()) + logger.info("ADK Orchestrator started") + + async def stop(self): + """Stop the orchestrator""" + self.is_running = False + + # Stop all agents + for agent in self.agents.values(): + await agent.stop() + + logger.info("ADK Orchestrator stopped") + + async def process_data(self, data: Dict[str, Any]) -> Dict[str, Any]: + """Process data through the agent pipeline""" + try: + # Add to processing queue + await self.processing_queue.put(data) + + # Process through agents + results = {} + + # Transcriber + if 'audio_frames' in data: + transcriber_result = await self.agents['transcriber'].process(data) + results['transcription'] = transcriber_result + + # Summarizer + if 'utterances' in data: + 
summarizer_result = await self.agents['summarizer'].process(data) + results['summary'] = summarizer_result + + # Action Planner + if 'utterances' in data: + action_result = await self.agents['action_planner'].process(data) + results['actions'] = action_result + + # Relationship Miner + if 'utterances' in data: + relationship_result = await self.agents['relationship_miner'].process(data) + results['relationships'] = relationship_result + + # Memory Writer + if 'memories' in data: + memory_result = await self.agents['memory_writer'].process(data) + results['memory_written'] = memory_result + + return results + + except Exception as e: + logger.error(f"Error processing data: {e}") + return {} + + async def _processing_loop(self): + """Main processing loop""" + while self.is_running: + try: + # Get next item from queue + data = await asyncio.wait_for(self.processing_queue.get(), timeout=1.0) + + # Process through pipeline + await self.process_data(data) + + except asyncio.TimeoutError: + continue + except Exception as e: + logger.error(f"Error in processing loop: {e}") + + async def get_memories(self, query: str, limit: int = 10) -> List[Dict[str, Any]]: + """Get memories from the memory bank""" + if self.memory_bank: + return await self.memory_bank.retrieve_memories(query, limit) + return [] + + async def store_memory(self, memory: Dict[str, Any]) -> str: + """Store a memory in the memory bank""" + if self.memory_bank: + return await self.memory_bank.store_memory(memory) + return "" diff --git a/assist/server/memory/pipelines.py b/assist/server/memory/pipelines.py new file mode 100644 index 00000000..0f25f987 --- /dev/null +++ b/assist/server/memory/pipelines.py @@ -0,0 +1,343 @@ +""" +Memory processing pipelines for conversation analysis +""" + +import asyncio +import logging +from typing import List, Dict, Any, Optional +from datetime import datetime +import json +import numpy as np +from sentence_transformers import SentenceTransformer + +logger = 
logging.getLogger(__name__) + +class EmbeddingPipeline: + """Pipeline for generating embeddings from text and images""" + + def __init__(self): + self.text_model = None + self.multimodal_model = None + self.is_initialized = False + + async def initialize(self): + """Initialize embedding models""" + try: + # Initialize text embedding model + self.text_model = SentenceTransformer('all-MiniLM-L6-v2') + + # For multimodal embeddings, we'd use Google's multimodal-embedding-1 + # For now, we'll use a placeholder + self.multimodal_model = None + + self.is_initialized = True + logger.info("Embedding pipeline initialized") + + except Exception as e: + logger.error(f"Failed to initialize embedding pipeline: {e}") + raise + + async def generate_text_embedding(self, text: str) -> List[float]: + """Generate embedding for text""" + try: + if not self.is_initialized: + await self.initialize() + + if self.text_model: + embedding = self.text_model.encode(text) + return embedding.tolist() + else: + # Fallback to random embedding + return np.random.randn(384).tolist() + + except Exception as e: + logger.error(f"Failed to generate text embedding: {e}") + return np.random.randn(384).tolist() + + async def generate_image_embedding(self, image_data: bytes) -> List[float]: + """Generate embedding for image""" + try: + if not self.is_initialized: + await self.initialize() + + # In production, this would use Google's multimodal-embedding-1 + # For now, return a random embedding + return np.random.randn(512).tolist() + + except Exception as e: + logger.error(f"Failed to generate image embedding: {e}") + return np.random.randn(512).tolist() + + async def compute_similarity(self, embedding1: List[float], embedding2: List[float]) -> float: + """Compute cosine similarity between two embeddings""" + try: + # Convert to numpy arrays + vec1 = np.array(embedding1) + vec2 = np.array(embedding2) + + # Compute cosine similarity + dot_product = np.dot(vec1, vec2) + norm1 = np.linalg.norm(vec1) + norm2 = 
np.linalg.norm(vec2) + + if norm1 == 0 or norm2 == 0: + return 0.0 + + similarity = dot_product / (norm1 * norm2) + return float(similarity) + + except Exception as e: + logger.error(f"Failed to compute similarity: {e}") + return 0.0 + +class MemoryIndexingPipeline: + """Pipeline for indexing memories for efficient retrieval""" + + def __init__(self, embedding_pipeline: EmbeddingPipeline): + self.embedding_pipeline = embedding_pipeline + self.memory_index = {} + self.is_initialized = False + + async def initialize(self): + """Initialize the indexing pipeline""" + try: + if not self.embedding_pipeline.is_initialized: + await self.embedding_pipeline.initialize() + + self.is_initialized = True + logger.info("Memory indexing pipeline initialized") + + except Exception as e: + logger.error(f"Failed to initialize indexing pipeline: {e}") + raise + + async def index_memory(self, memory: Dict[str, Any]) -> str: + """Index a memory for retrieval""" + try: + if not self.is_initialized: + await self.initialize() + + memory_id = memory.get('id', f"memory_{datetime.utcnow().timestamp()}") + text_content = memory.get('content', '') + + # Generate embedding + embedding = await self.embedding_pipeline.generate_text_embedding(text_content) + + # Store in index + self.memory_index[memory_id] = { + 'memory': memory, + 'embedding': embedding, + 'indexed_at': datetime.utcnow().isoformat() + } + + logger.info(f"Indexed memory: {memory_id}") + return memory_id + + except Exception as e: + logger.error(f"Failed to index memory: {e}") + return "" + + async def search_similar_memories(self, query: str, limit: int = 10) -> List[Dict[str, Any]]: + """Search for similar memories using embedding similarity""" + try: + if not self.is_initialized: + await self.initialize() + + # Generate query embedding + query_embedding = await self.embedding_pipeline.generate_text_embedding(query) + + # Compute similarities + similarities = [] + for memory_id, indexed_memory in self.memory_index.items(): + 
similarity = await self.embedding_pipeline.compute_similarity( + query_embedding, + indexed_memory['embedding'] + ) + similarities.append({ + 'memory_id': memory_id, + 'memory': indexed_memory['memory'], + 'similarity': similarity + }) + + # Sort by similarity and return top results + similarities.sort(key=lambda x: x['similarity'], reverse=True) + return similarities[:limit] + + except Exception as e: + logger.error(f"Failed to search similar memories: {e}") + return [] + + async def remove_memory_from_index(self, memory_id: str) -> bool: + """Remove a memory from the index""" + try: + if memory_id in self.memory_index: + del self.memory_index[memory_id] + logger.info(f"Removed memory from index: {memory_id}") + return True + return False + + except Exception as e: + logger.error(f"Failed to remove memory from index: {e}") + return False + +class ConversationAnalysisPipeline: + """Pipeline for analyzing conversation patterns and extracting insights""" + + def __init__(self, embedding_pipeline: EmbeddingPipeline): + self.embedding_pipeline = embedding_pipeline + self.is_initialized = False + + async def initialize(self): + """Initialize the analysis pipeline""" + try: + if not self.embedding_pipeline.is_initialized: + await self.embedding_pipeline.initialize() + + self.is_initialized = True + logger.info("Conversation analysis pipeline initialized") + + except Exception as e: + logger.error(f"Failed to initialize analysis pipeline: {e}") + raise + + async def analyze_conversation(self, utterances: List[Dict[str, Any]]) -> Dict[str, Any]: + """Analyze a conversation for patterns and insights""" + try: + if not self.is_initialized: + await self.initialize() + + # Extract key insights + insights = { + 'participants': await self._extract_participants(utterances), + 'topics': await self._extract_topics(utterances), + 'sentiment': await self._analyze_sentiment(utterances), + 'key_phrases': await self._extract_key_phrases(utterances), + 'conversation_flow': await 
self._analyze_conversation_flow(utterances) + } + + return insights + + except Exception as e: + logger.error(f"Failed to analyze conversation: {e}") + return {} + + async def _extract_participants(self, utterances: List[Dict]) -> List[str]: + """Extract participants from conversation""" + participants = set() + for utterance in utterances: + # In production, this would use NLP to identify speakers + if 'speaker' in utterance: + participants.add(utterance['speaker']) + + return list(participants) + + async def _extract_topics(self, utterances: List[Dict]) -> List[str]: + """Extract main topics from conversation""" + # In production, this would use topic modeling + topics = [] + for utterance in utterances: + text = utterance.get('text', '') + if 'project' in text.lower(): + topics.append('Project Management') + if 'meeting' in text.lower(): + topics.append('Meetings') + if 'deadline' in text.lower(): + topics.append('Deadlines') + + return list(set(topics)) + + async def _analyze_sentiment(self, utterances: List[Dict]) -> Dict[str, float]: + """Analyze sentiment of conversation""" + # In production, this would use sentiment analysis + return { + 'positive': 0.6, + 'neutral': 0.3, + 'negative': 0.1 + } + + async def _extract_key_phrases(self, utterances: List[Dict]) -> List[str]: + """Extract key phrases from conversation""" + # In production, this would use NLP to extract key phrases + key_phrases = [] + for utterance in utterances: + text = utterance.get('text', '') + if len(text) > 10: + key_phrases.append(text[:50] + "...") + + return key_phrases[:5] # Return top 5 + + async def _analyze_conversation_flow(self, utterances: List[Dict]) -> Dict[str, Any]: + """Analyze the flow and structure of conversation""" + return { + 'total_utterances': len(utterances), + 'average_length': sum(len(u.get('text', '')) for u in utterances) / len(utterances) if utterances else 0, + 'turn_taking': 'balanced', # In production, analyze actual turn-taking + 'interruptions': 0 # In 
production, detect interruptions + } + +class MemoryRetrievalPipeline: + """Pipeline for retrieving and assembling memories""" + + def __init__(self, embedding_pipeline: EmbeddingPipeline, indexing_pipeline: MemoryIndexingPipeline): + self.embedding_pipeline = embedding_pipeline + self.indexing_pipeline = indexing_pipeline + self.is_initialized = False + + async def initialize(self): + """Initialize the retrieval pipeline""" + try: + if not self.embedding_pipeline.is_initialized: + await self.embedding_pipeline.initialize() + if not self.indexing_pipeline.is_initialized: + await self.indexing_pipeline.initialize() + + self.is_initialized = True + logger.info("Memory retrieval pipeline initialized") + + except Exception as e: + logger.error(f"Failed to initialize retrieval pipeline: {e}") + raise + + async def retrieve_memories(self, query: str, limit: int = 10) -> List[Dict[str, Any]]: + """Retrieve relevant memories based on query""" + try: + if not self.is_initialized: + await self.initialize() + + # Search for similar memories + similar_memories = await self.indexing_pipeline.search_similar_memories(query, limit) + + # Filter and rank results + relevant_memories = [] + for result in similar_memories: + if result['similarity'] > 0.3: # Threshold for relevance + relevant_memories.append(result['memory']) + + return relevant_memories + + except Exception as e: + logger.error(f"Failed to retrieve memories: {e}") + return [] + + async def assemble_memory_context(self, memories: List[Dict[str, Any]], query: str) -> str: + """Assemble memories into a coherent context""" + try: + if not memories: + return "No relevant memories found." 
+ + # Sort memories by timestamp + sorted_memories = sorted(memories, key=lambda x: x.get('timestamp', '')) + + # Create context + context_parts = [f"Query: {query}\n\nRelevant Memories:"] + + for i, memory in enumerate(sorted_memories, 1): + content = memory.get('content', '') + timestamp = memory.get('timestamp', 'Unknown') + context_parts.append(f"{i}. {content} (Timestamp: {timestamp})") + + return "\n".join(context_parts) + + except Exception as e: + logger.error(f"Failed to assemble memory context: {e}") + return "Error assembling memory context" From bda981bffdeeca5ce2464391722108963d8865e0 Mon Sep 17 00:00:00 2001 From: kgand <34491606+kgand@users.noreply.github.com> Date: Sat, 27 Sep 2025 02:42:26 -0400 Subject: [PATCH 3/4] feat(revive): implement /revive API with Firestore and embedding-based memory retrieval --- assist/server/app.py | 74 ++++++-- assist/server/revive_api.py | 354 ++++++++++++++++++++++++++++++++++++ 2 files changed, 408 insertions(+), 20 deletions(-) create mode 100644 assist/server/revive_api.py diff --git a/assist/server/app.py b/assist/server/app.py index 103add43..63fdf9c7 100644 --- a/assist/server/app.py +++ b/assist/server/app.py @@ -16,6 +16,7 @@ from gemini_live import GeminiLiveClient from adk_orchestrator import ADKOrchestrator from memory.store_firestore import FirestoreMemoryStore +from revive_api import ReviveAPI from models.schemas import ReviveRequest, ReviveResponse # Load environment variables @@ -46,11 +47,12 @@ gemini_live = None adk_orchestrator = None memory_store = None +revive_api = None @app.on_event("startup") async def startup_event(): """Initialize services on startup""" - global websocket_ingest, gemini_live, adk_orchestrator, memory_store + global websocket_ingest, gemini_live, adk_orchestrator, memory_store, revive_api try: # Initialize memory store @@ -66,6 +68,10 @@ async def startup_event(): # Initialize WebSocket ingest websocket_ingest = WebSocketIngest(gemini_live, adk_orchestrator) + # Initialize 
Revive API + revive_api = ReviveAPI(memory_store) + await revive_api.initialize() + logger.info("All services initialized successfully") except Exception as e: @@ -81,7 +87,8 @@ async def health_check(): "memory_store": memory_store is not None, "gemini_live": gemini_live is not None, "adk_orchestrator": adk_orchestrator is not None, - "websocket_ingest": websocket_ingest is not None + "websocket_ingest": websocket_ingest is not None, + "revive_api": revive_api is not None } } @@ -106,26 +113,21 @@ async def websocket_endpoint(websocket: WebSocket): async def revive_memories(request: ReviveRequest): """Retrieve and assemble memories based on a text cue""" try: - if not memory_store: - raise HTTPException(status_code=503, detail="Memory store not initialized") + if not revive_api: + raise HTTPException(status_code=503, detail="Revive API not initialized") - # Search for relevant memories - memories = await memory_store.search_memories( - query=request.cue, + # Use the Revive API to process the request + result = await revive_api.revive_memories( + cue=request.cue, + user_id=request.user_id or "default", limit=request.limit or 10 ) - # Generate stitched recap using Gemini - if gemini_live: - recap = await gemini_live.generate_recap(memories, request.cue) - else: - recap = "Memory service unavailable" - return ReviveResponse( - cue=request.cue, - memories=memories, - recap=recap, - count=len(memories) + cue=result['cue'], + memories=result['memories'], + recap=result['recap'], + count=result['count'] ) except Exception as e: @@ -150,10 +152,10 @@ async def get_user_memories(user_id: str, limit: int = 50): async def delete_memory(memory_id: str): """Delete a specific memory""" try: - if not memory_store: - raise HTTPException(status_code=503, detail="Memory store not initialized") + if not revive_api: + raise HTTPException(status_code=503, detail="Revive API not initialized") - success = await memory_store.delete_memory(memory_id) + success = await 
revive_api.delete_memory(memory_id) if success: return {"message": "Memory deleted successfully"} else: @@ -163,6 +165,38 @@ async def delete_memory(memory_id: str): logger.error(f"Failed to delete memory: {e}") raise HTTPException(status_code=500, detail=str(e)) +@app.get("/memories/{user_id}/statistics") +async def get_memory_statistics(user_id: str): + """Get memory statistics for a user""" + try: + if not revive_api: + raise HTTPException(status_code=503, detail="Revive API not initialized") + + statistics = await revive_api.get_memory_statistics(user_id) + return statistics + + except Exception as e: + logger.error(f"Failed to get memory statistics: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +@app.get("/memories/{user_id}/search") +async def search_memories(user_id: str, query: str, limit: int = 20): + """Search memories with advanced filtering""" + try: + if not revive_api: + raise HTTPException(status_code=503, detail="Revive API not initialized") + + results = await revive_api.search_memories(query, user_id, limit) + return { + "query": query, + "results": results, + "count": len(results) + } + + except Exception as e: + logger.error(f"Failed to search memories: {e}") + raise HTTPException(status_code=500, detail=str(e)) + if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/assist/server/revive_api.py b/assist/server/revive_api.py new file mode 100644 index 00000000..030ca436 --- /dev/null +++ b/assist/server/revive_api.py @@ -0,0 +1,354 @@ +""" +Revive API implementation for memory retrieval and assembly +""" + +import asyncio +import logging +from typing import List, Dict, Any, Optional +from datetime import datetime +import json +import numpy as np +from fastapi import HTTPException + +from memory.store_firestore import FirestoreMemoryStore +from memory.pipelines import EmbeddingPipeline, MemoryIndexingPipeline, MemoryRetrievalPipeline +from gemini_live import GeminiLiveClient + +logger = 
logging.getLogger(__name__) + +class ReviveAPI: + """API for retrieving and assembling memories""" + + def __init__(self, memory_store: FirestoreMemoryStore): + self.memory_store = memory_store + self.embedding_pipeline = None + self.indexing_pipeline = None + self.retrieval_pipeline = None + self.gemini_client = None + self.is_initialized = False + + async def initialize(self): + """Initialize the revive API""" + try: + # Initialize embedding pipeline + self.embedding_pipeline = EmbeddingPipeline() + await self.embedding_pipeline.initialize() + + # Initialize indexing pipeline + self.indexing_pipeline = MemoryIndexingPipeline(self.embedding_pipeline) + await self.indexing_pipeline.initialize() + + # Initialize retrieval pipeline + self.retrieval_pipeline = MemoryRetrievalPipeline( + self.embedding_pipeline, + self.indexing_pipeline + ) + await self.retrieval_pipeline.initialize() + + # Initialize Gemini client for recap generation + self.gemini_client = GeminiLiveClient() + + self.is_initialized = True + logger.info("Revive API initialized successfully") + + except Exception as e: + logger.error(f"Failed to initialize Revive API: {e}") + raise + + async def revive_memories(self, cue: str, user_id: str = "default", limit: int = 10) -> Dict[str, Any]: + """Retrieve and assemble memories based on a text cue""" + try: + if not self.is_initialized: + await self.initialize() + + # Step 1: Search for relevant memories using embeddings + relevant_memories = await self._search_memories_by_embedding(cue, user_id, limit) + + # Step 2: Get additional memories from Firestore + firestore_memories = await self.memory_store.search_memories(cue, limit) + + # Step 3: Combine and deduplicate memories + all_memories = self._combine_memories(relevant_memories, firestore_memories) + + # Step 4: Generate stitched recap using Gemini + recap = await self._generate_recap(all_memories, cue) + + # Step 5: Extract key insights + insights = await self._extract_insights(all_memories, cue) + + 
return { + 'cue': cue, + 'memories': all_memories, + 'recap': recap, + 'insights': insights, + 'count': len(all_memories), + 'timestamp': datetime.utcnow().isoformat() + } + + except Exception as e: + logger.error(f"Failed to revive memories: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + async def _search_memories_by_embedding(self, cue: str, user_id: str, limit: int) -> List[Dict[str, Any]]: + """Search memories using embedding similarity""" + try: + # Get all memories for the user + all_memories = await self.memory_store.get_user_memories(user_id, 100) + + if not all_memories: + return [] + + # Index memories if not already indexed + for memory in all_memories: + if memory.get('id') not in self.indexing_pipeline.memory_index: + await self.indexing_pipeline.index_memory(memory) + + # Search for similar memories + similar_memories = await self.indexing_pipeline.search_similar_memories(cue, limit) + + return [result['memory'] for result in similar_memories] + + except Exception as e: + logger.error(f"Failed to search memories by embedding: {e}") + return [] + + def _combine_memories(self, embedding_memories: List[Dict], firestore_memories: List[Dict]) -> List[Dict[str, Any]]: + """Combine and deduplicate memories from different sources""" + try: + # Create a set of memory IDs to avoid duplicates + seen_ids = set() + combined_memories = [] + + # Add embedding-based memories first (higher relevance) + for memory in embedding_memories: + memory_id = memory.get('id', '') + if memory_id and memory_id not in seen_ids: + combined_memories.append(memory) + seen_ids.add(memory_id) + + # Add Firestore memories that aren't already included + for memory in firestore_memories: + memory_id = memory.get('id', '') + if memory_id and memory_id not in seen_ids: + combined_memories.append(memory) + seen_ids.add(memory_id) + + # Sort by timestamp (most recent first) + combined_memories.sort( + key=lambda x: x.get('timestamp', ''), + reverse=True + ) + + return 
combined_memories + + except Exception as e: + logger.error(f"Failed to combine memories: {e}") + return [] + + async def _generate_recap(self, memories: List[Dict[str, Any]], cue: str) -> str: + """Generate a stitched recap using Gemini""" + try: + if not memories: + return "No relevant memories found for the given cue." + + if not self.gemini_client: + return "Memory service unavailable." + + # Prepare context for recap generation + context = self._prepare_memory_context(memories, cue) + + # Generate recap using Gemini + recap = await self.gemini_client.generate_recap(memories, cue) + + return recap + + except Exception as e: + logger.error(f"Failed to generate recap: {e}") + return f"Error generating recap: {str(e)}" + + def _prepare_memory_context(self, memories: List[Dict[str, Any]], cue: str) -> str: + """Prepare memory context for recap generation""" + try: + context_parts = [ + f"User Query: {cue}", + "", + "Relevant Memories:", + "" + ] + + for i, memory in enumerate(memories, 1): + content = memory.get('content', memory.get('text', '')) + timestamp = memory.get('timestamp', 'Unknown') + memory_type = memory.get('type', 'memory') + + context_parts.append(f"{i}. 
[{memory_type}] {content}") + context_parts.append(f" Timestamp: {timestamp}") + context_parts.append("") + + return "\n".join(context_parts) + + except Exception as e: + logger.error(f"Failed to prepare memory context: {e}") + return f"Error preparing context: {str(e)}" + + async def _extract_insights(self, memories: List[Dict[str, Any]], cue: str) -> Dict[str, Any]: + """Extract key insights from memories""" + try: + insights = { + 'key_topics': [], + 'participants': [], + 'time_range': None, + 'memory_types': [], + 'relevance_scores': [] + } + + if not memories: + return insights + + # Extract key topics + topics = set() + for memory in memories: + content = memory.get('content', memory.get('text', '')) + if 'project' in content.lower(): + topics.add('Project Management') + if 'meeting' in content.lower(): + topics.add('Meetings') + if 'deadline' in content.lower(): + topics.add('Deadlines') + + insights['key_topics'] = list(topics) + + # Extract participants + participants = set() + for memory in memories: + if 'participants' in memory: + participants.update(memory['participants']) + if 'speaker' in memory: + participants.add(memory['speaker']) + + insights['participants'] = list(participants) + + # Extract time range + timestamps = [memory.get('timestamp', '') for memory in memories if memory.get('timestamp')] + if timestamps: + timestamps.sort() + insights['time_range'] = { + 'earliest': timestamps[0], + 'latest': timestamps[-1] + } + + # Extract memory types + memory_types = set() + for memory in memories: + memory_type = memory.get('type', 'unknown') + memory_types.add(memory_type) + + insights['memory_types'] = list(memory_types) + + # Calculate relevance scores (simplified) + for memory in memories: + # In production, this would use actual relevance scoring + relevance_score = 0.8 # Placeholder + insights['relevance_scores'].append({ + 'memory_id': memory.get('id', ''), + 'score': relevance_score + }) + + return insights + + except Exception as e: + 
logger.error(f"Failed to extract insights: {e}") + return {} + + async def get_memory_statistics(self, user_id: str = "default") -> Dict[str, Any]: + """Get statistics about user's memories""" + try: + # Get all memories for the user + all_memories = await self.memory_store.get_user_memories(user_id, 1000) + + if not all_memories: + return { + 'total_memories': 0, + 'memory_types': {}, + 'time_range': None, + 'recent_activity': [] + } + + # Calculate statistics + total_memories = len(all_memories) + + # Count memory types + memory_types = {} + for memory in all_memories: + memory_type = memory.get('type', 'unknown') + memory_types[memory_type] = memory_types.get(memory_type, 0) + 1 + + # Get time range + timestamps = [memory.get('timestamp', '') for memory in all_memories if memory.get('timestamp')] + time_range = None + if timestamps: + timestamps.sort() + time_range = { + 'earliest': timestamps[0], + 'latest': timestamps[-1] + } + + # Get recent activity (last 10 memories) + recent_memories = sorted(all_memories, key=lambda x: x.get('timestamp', ''), reverse=True)[:10] + recent_activity = [ + { + 'id': memory.get('id', ''), + 'type': memory.get('type', ''), + 'timestamp': memory.get('timestamp', ''), + 'preview': memory.get('content', memory.get('text', ''))[:100] + } + for memory in recent_memories + ] + + return { + 'total_memories': total_memories, + 'memory_types': memory_types, + 'time_range': time_range, + 'recent_activity': recent_activity + } + + except Exception as e: + logger.error(f"Failed to get memory statistics: {e}") + return {} + + async def search_memories(self, query: str, user_id: str = "default", limit: int = 20) -> List[Dict[str, Any]]: + """Search memories with advanced filtering""" + try: + if not self.is_initialized: + await self.initialize() + + # Search using embedding similarity + embedding_results = await self._search_memories_by_embedding(query, user_id, limit) + + # Search using Firestore text search + firestore_results = await 
self.memory_store.search_memories(query, limit) + + # Combine and rank results + combined_results = self._combine_memories(embedding_results, firestore_results) + + return combined_results[:limit] + + except Exception as e: + logger.error(f"Failed to search memories: {e}") + return [] + + async def delete_memory(self, memory_id: str) -> bool: + """Delete a specific memory""" + try: + # Delete from Firestore + success = await self.memory_store.delete_memory(memory_id) + + # Remove from indexing pipeline + if success: + await self.indexing_pipeline.remove_memory_from_index(memory_id) + + return success + + except Exception as e: + logger.error(f"Failed to delete memory: {e}") + return False From 5a3ae3e18ceb7c008b59f12d3f014d2586fbc673 Mon Sep 17 00:00:00 2001 From: kgand <34491606+kgand@users.noreply.github.com> Date: Sat, 27 Sep 2025 02:45:19 -0400 Subject: [PATCH 4/4] feat(infra): create setup scripts, Makefile, and Docker configuration for easy deployment --- Makefile | 133 ++++++++++++++++ assist/.env.example | 0 assist/docs/README.md | 257 +++++++++++++++++++++++++++++++ assist/infra/setup.ps1 | 109 +++++++++++++ assist/infra/setup.sh | 103 +++++++++++++ assist/server/Dockerfile | 32 ++++ assist/server/docker-compose.yml | 35 +++++ 7 files changed, 669 insertions(+) create mode 100644 Makefile create mode 100644 assist/.env.example create mode 100644 assist/docs/README.md create mode 100644 assist/infra/setup.ps1 create mode 100644 assist/infra/setup.sh create mode 100644 assist/server/Dockerfile create mode 100644 assist/server/docker-compose.yml diff --git a/Makefile b/Makefile new file mode 100644 index 00000000..2a435e59 --- /dev/null +++ b/Makefile @@ -0,0 +1,133 @@ +# Makefile for Messenger AI Assistant + +.PHONY: help dev chrome-build install clean test lint format + +# Default target +help: + @echo "Messenger AI Assistant - Available Commands:" + @echo "" + @echo " make dev - Start development server" + @echo " make chrome-build - Build Chrome 
extension"
	@echo "  make install      - Install all dependencies"
	@echo "  make clean        - Clean build artifacts"
	@echo "  make test         - Run tests"
	@echo "  make lint         - Run linters"
	@echo "  make format       - Format code"
	@echo ""

# Development server (FastAPI with auto-reload)
dev:
	@echo "๐Ÿš€ Starting development server..."
	cd assist/server && uvicorn app:app --host 0.0.0.0 --port 8000 --reload

# Build Chrome extension
chrome-build:
	@echo "๐Ÿ”จ Building Chrome extension..."
	cd assist/chrome-ext && npm install && npm run build

# Install all dependencies (backend + extension)
install:
	@echo "๐Ÿ“ฆ Installing dependencies..."
	cd assist/server && pip install -r requirements.txt
	cd assist/chrome-ext && npm install

# Clean build artifacts
clean:
	@echo "๐Ÿงน Cleaning build artifacts..."
	rm -rf assist/chrome-ext/dist
	rm -rf assist/chrome-ext/node_modules
	rm -rf __pycache__
	rm -rf .pytest_cache
	rm -rf logs/*.log

# Run tests
test:
	@echo "๐Ÿงช Running tests..."
	cd assist/server && python -m pytest tests/ -v

# Run linters
lint:
	@echo "๐Ÿ” Running linters..."
	cd assist/server && python -m flake8 . --max-line-length=100
	cd assist/chrome-ext && npm run lint

# Format code
format:
	@echo "โœจ Formatting code..."
	cd assist/server && python -m black . --line-length=100
	cd assist/chrome-ext && npm run format

# Setup development environment (picks the right script for the host OS)
setup:
	@echo "๐Ÿ”ง Setting up development environment..."
	@if [ -f "assist/infra/setup.sh" ]; then \
		chmod +x assist/infra/setup.sh && \
		./assist/infra/setup.sh; \
	elif [ -f "assist/infra/setup.ps1" ]; then \
		powershell -ExecutionPolicy Bypass -File assist/infra/setup.ps1; \
	else \
		echo "โŒ Setup script not found"; \
	fi

# Production build
build: chrome-build
	@echo "๐Ÿ—๏ธ Building for production..."
	cd assist/server && pip install -r requirements.txt

# Docker build
# FIX: the Dockerfile lives in assist/server, not the repo root, so the
# build context must point there.
docker-build:
	@echo "๐Ÿณ Building Docker image..."
	docker build -t messenger-ai-assistant assist/server

# Docker run (8000 = HTTP/API, 8765 = WebSocket ingest)
docker-run:
	@echo "๐Ÿณ Running Docker container..."
	docker run -p 8000:8000 -p 8765:8765 messenger-ai-assistant

# Check system requirements
check:
	@echo "๐Ÿ” Checking system requirements..."
	@echo "Node.js version: $$(node --version 2>/dev/null || echo 'Not installed')"
	@echo "Python version: $$(python3 --version 2>/dev/null || echo 'Not installed')"
	@echo "Google Cloud CLI: $$(gcloud --version 2>/dev/null | head -1 || echo 'Not installed')"

# Generate documentation
docs:
	@echo "๐Ÿ“š Generating documentation..."
	cd assist/server && python -m pydoc -w app
	cd assist/server && python -m pydoc -w gemini_live
	cd assist/server && python -m pydoc -w revive_api

# Backup data
backup:
	@echo "๐Ÿ’พ Backing up data..."
	mkdir -p backups
	tar -czf backups/backup-$$(date +%Y%m%d-%H%M%S).tar.gz data/ logs/

# Restore data (requires BACKUP_FILE=path/to/backup.tar.gz)
restore:
	@echo "๐Ÿ“ฅ Restoring data..."
	@if [ -z "$(BACKUP_FILE)" ]; then \
		echo "โŒ Please specify BACKUP_FILE=path/to/backup.tar.gz"; \
		exit 1; \
	fi
	tar -xzf $(BACKUP_FILE) -C .

# Monitor logs
logs:
	@echo "๐Ÿ“Š Monitoring logs..."
	tail -f logs/*.log

# Health check against the running server
health:
	@echo "๐Ÿฅ Checking system health..."
	curl -f http://localhost:8000/health || echo "โŒ Server not responding"

# Deploy to production
deploy:
	@echo "๐Ÿš€ Deploying to production..."
+ @echo "โš ๏ธ This is a placeholder - implement your deployment strategy" + @echo "Consider using:" + @echo " - Google Cloud Run for the backend" + @echo " - Chrome Web Store for the extension" + @echo " - Cloud Firestore for data storage" diff --git a/assist/.env.example b/assist/.env.example new file mode 100644 index 00000000..e69de29b diff --git a/assist/docs/README.md b/assist/docs/README.md new file mode 100644 index 00000000..58f9022b --- /dev/null +++ b/assist/docs/README.md @@ -0,0 +1,257 @@ +# Messenger AI Assistant + +A production-ready system that captures Messenger Web A/V from a Chrome Extension, streams it to a Python backend, and uses Google Gemini + ADK + Vertex AI Memory Bank to conduct & summarize conversations, extract action items, learn relationships, and persist memories. + +## Architecture + +``` +Chrome Extension (MV3) โ†’ FastAPI (Python) โ†’ Google (Gemini + ADK + Firestore) +``` + +## Features + +- **Chrome Extension (MV3)**: Side Panel UI, Offscreen document, tabCapture for A/V +- **FastAPI Backend**: WebSocket ingest, Gemini Live integration +- **ADK Agents**: Conversation processing with Memory Bank +- **Memory System**: Firestore storage with embedding-based retrieval +- **Revive API**: Intelligent memory recall and assembly + +## Quick Start + +### Prerequisites + +- Node.js 20+ +- Python 3.11+ +- Google Cloud CLI +- Google Cloud Project with Vertex AI and Firestore enabled + +### Installation + +1. **Clone the repository** + ```bash + git clone + cd shellhacks25 + ``` + +2. **Run setup script** + ```bash + # Linux/macOS + chmod +x assist/infra/setup.sh + ./assist/infra/setup.sh + + # Windows + powershell -ExecutionPolicy Bypass -File assist/infra/setup.ps1 + ``` + +3. **Configure environment** + ```bash + cp assist/.env.example .env + # Edit .env with your Google Cloud project details + ``` + +4. **Start development server** + ```bash + make dev + ``` + +5. 
**Load Chrome extension** + - Open Chrome and go to `chrome://extensions/` + - Enable "Developer mode" + - Click "Load unpacked" and select `assist/chrome-ext/dist` + +## Usage + +### Chrome Extension + +1. Navigate to Messenger Web (messenger.com) +2. Click the extension icon to open the side panel +3. Toggle "I consent to recording my conversations" +4. Click "Start Capture" (requires user gesture) +5. The extension will begin recording and streaming to the backend + +### Backend API + +- **Health Check**: `GET /health` +- **WebSocket Ingest**: `WS /ingest` +- **Revive Memories**: `POST /revive` +- **Memory Statistics**: `GET /memories/{user_id}/statistics` +- **Search Memories**: `GET /memories/{user_id}/search` + +### Example API Usage + +```bash +# Check health +curl http://localhost:8000/health + +# Revive memories +curl -X POST http://localhost:8000/revive \ + -H "Content-Type: application/json" \ + -d '{"cue": "What did we discuss about the project?", "limit": 10}' + +# Get memory statistics +curl http://localhost:8000/memories/default/statistics +``` + +## Development + +### Available Commands + +```bash +make dev # Start development server +make chrome-build # Build Chrome extension +make install # Install all dependencies +make clean # Clean build artifacts +make test # Run tests +make lint # Run linters +make format # Format code +``` + +### Project Structure + +``` +assist/ +โ”œโ”€โ”€ chrome-ext/ # Chrome Extension (MV3) +โ”‚ โ”œโ”€โ”€ manifest.json +โ”‚ โ”œโ”€โ”€ sw.js +โ”‚ โ”œโ”€โ”€ ui/ +โ”‚ โ”œโ”€โ”€ offscreen.html +โ”‚ โ””โ”€โ”€ package.json +โ”œโ”€โ”€ server/ # FastAPI Backend +โ”‚ โ”œโ”€โ”€ app.py +โ”‚ โ”œโ”€โ”€ ws_ingest.py +โ”‚ โ”œโ”€โ”€ gemini_live.py +โ”‚ โ”œโ”€โ”€ adk_agents.py +โ”‚ โ”œโ”€โ”€ revive_api.py +โ”‚ โ”œโ”€โ”€ memory/ +โ”‚ โ””โ”€โ”€ requirements.txt +โ”œโ”€โ”€ docs/ # Documentation +โ”œโ”€โ”€ infra/ # Setup scripts +โ””โ”€โ”€ Makefile +``` + +## Configuration + +### Environment Variables + +Create a `.env` file with the following 
variables: + +```env +# Google Cloud Configuration +GOOGLE_PROJECT_ID=your-project-id +GOOGLE_APPLICATION_CREDENTIALS=path/to/service-account-key.json +VERTEX_LOCATION=us-central1 + +# Gemini API Configuration +GEMINI_API_KEY=your-gemini-api-key +GEMINI_MODEL_LIVE=gemini-2.0-flash-live-001 +GEMINI_MODEL_REASONING=gemini-2.5-pro + +# Firebase Configuration +FIREBASE_PROJECT_ID=your-firebase-project-id +``` + +### Google Cloud Setup + +1. **Create a Google Cloud Project** +2. **Enable required APIs**: + - Vertex AI API + - Firestore API + - Generative Language API +3. **Create a service account** with required permissions +4. **Download service account key** and set `GOOGLE_APPLICATION_CREDENTIALS` +5. **Initialize Firestore** in your project + +## API Reference + +### WebSocket Endpoint + +**Endpoint**: `WS /ingest` + +Receives WebM audio/video chunks from Chrome extension. + +**Message Format**: +```json +{ + "type": "audio_chunk", + "data": "base64_encoded_webm_data", + "timestamp": "2024-01-01T00:00:00Z", + "connection_id": "conn_123" +} +``` + +### Revive API + +**Endpoint**: `POST /revive` + +Retrieve and assemble memories based on a text cue. + +**Request**: +```json +{ + "cue": "What did we discuss about the project?", + "limit": 10, + "user_id": "default" +} +``` + +**Response**: +```json +{ + "cue": "What did we discuss about the project?", + "memories": [...], + "recap": "Based on your conversations...", + "count": 5 +} +``` + +## Troubleshooting + +### Common Issues + +1. **"Backend not connected"** + - Ensure the FastAPI server is running on port 8000 + - Check that WebSocket endpoint is accessible + +2. **"Please navigate to Messenger Web first"** + - Ensure you're on messenger.com + - Refresh the page and try again + +3. **"Memory store not initialized"** + - Check Google Cloud credentials + - Verify Firestore is enabled + - Check environment variables + +4. 
**Chrome extension not loading** + - Ensure you're using Chrome (not other browsers) + - Check that the extension is built (`make chrome-build`) + - Verify manifest.json is valid + +### Debug Mode + +Enable debug logging by setting `LOG_LEVEL=DEBUG` in your `.env` file. + +### Logs + +Check logs in the `logs/` directory: +- `app.log` - Application logs +- `websocket.log` - WebSocket connection logs +- `memory.log` - Memory operations logs + +## Contributing + +1. Fork the repository +2. Create a feature branch +3. Make your changes +4. Run tests and linters +5. Submit a pull request + +## License + +This project is licensed under the MIT License. + +## Support + +For issues and questions: +1. Check the troubleshooting section +2. Review the logs +3. Create an issue with detailed information diff --git a/assist/infra/setup.ps1 b/assist/infra/setup.ps1 new file mode 100644 index 00000000..653fb06a --- /dev/null +++ b/assist/infra/setup.ps1 @@ -0,0 +1,109 @@ +# Setup script for Messenger AI Assistant (Windows PowerShell) + +Write-Host "๐Ÿš€ Setting up Messenger AI Assistant..." -ForegroundColor Green + +# Check if running on Windows +if ($PSVersionTable.PSVersion.Major -lt 5) { + Write-Host "โŒ PowerShell 5+ is required" -ForegroundColor Red + exit 1 +} + +Write-Host "โœ… PowerShell version: $($PSVersionTable.PSVersion)" -ForegroundColor Green + +# Check Node.js version +try { + $nodeVersion = node --version + $nodeMajorVersion = [int]($nodeVersion -replace 'v', '' -split '\.')[0] + if ($nodeMajorVersion -lt 20) { + Write-Host "โŒ Node.js version 20+ is required. Current version: $nodeVersion" -ForegroundColor Red + exit 1 + } + Write-Host "โœ… Node.js version: $nodeVersion" -ForegroundColor Green +} catch { + Write-Host "โŒ Node.js is not installed. 
Please install Node.js 20+ from https://nodejs.org/" -ForegroundColor Red
    exit 1
}

# Check Python version
try {
    $pythonVersion = python --version
    # Parse "Python X.Y.Z" into integer major/minor components.
    $pythonMajorMinor = ($pythonVersion -split ' ')[1] -split '\.' | Select-Object -First 2
    $pythonMajor = [int]$pythonMajorMinor[0]
    $pythonMinor = [int]$pythonMajorMinor[1]
    # FIX: the old check cast "major.minor" to [double], so Python 3.9
    # (value 3.9) compared greater than 3.11 and wrongly passed the gate.
    # Compare major and minor as integers instead.
    if ($pythonMajor -lt 3 -or ($pythonMajor -eq 3 -and $pythonMinor -lt 11)) {
        Write-Host "โŒ Python 3.11+ is required. Current version: $pythonVersion" -ForegroundColor Red
        exit 1
    }
    Write-Host "โœ… Python version: $pythonVersion" -ForegroundColor Green
} catch {
    Write-Host "โŒ Python 3 is not installed. Please install Python 3.11+ from https://python.org/" -ForegroundColor Red
    exit 1
}

# Check gcloud CLI
try {
    gcloud --version | Out-Null
    Write-Host "โœ… Google Cloud CLI installed" -ForegroundColor Green
} catch {
    Write-Host "โŒ Google Cloud CLI is not installed. Please install from https://cloud.google.com/sdk/docs/install" -ForegroundColor Red
    exit 1
}

# Check gcloud authentication
try {
    $activeAccounts = gcloud auth list --filter=status:ACTIVE --format="value(account)"
    if (-not $activeAccounts) {
        Write-Host "โŒ Not authenticated with Google Cloud. Please run: gcloud auth login" -ForegroundColor Red
        exit 1
    }
    Write-Host "โœ… Google Cloud authenticated" -ForegroundColor Green
} catch {
    Write-Host "โŒ Google Cloud authentication failed" -ForegroundColor Red
    exit 1
}

# Set up environment variables
if (-not (Test-Path ".env")) {
    Write-Host "๐Ÿ“ Creating .env file..." -ForegroundColor Yellow
    # NOTE(review): the example env file is created at assist/.env.example by
    # this patch series — confirm the script is run from a directory where
    # ".env.example" resolves, or adjust the path.
    Copy-Item ".env.example" ".env"
    Write-Host "โš ๏ธ Please edit .env file with your Google Cloud project details" -ForegroundColor Yellow
}

# Enable required Google Cloud APIs
Write-Host "๐Ÿ”ง Enabling Google Cloud APIs..."
-ForegroundColor Yellow +gcloud services enable aiplatform.googleapis.com +gcloud services enable firestore.googleapis.com +gcloud services enable generativelanguage.googleapis.com + +Write-Host "โœ… Google Cloud APIs enabled" -ForegroundColor Green + +# Install Python dependencies +Write-Host "๐Ÿ“ฆ Installing Python dependencies..." -ForegroundColor Yellow +Set-Location "assist/server" +pip install -r requirements.txt + +Write-Host "โœ… Python dependencies installed" -ForegroundColor Green + +# Build Chrome extension +Write-Host "๐Ÿ”จ Building Chrome extension..." -ForegroundColor Yellow +Set-Location "../chrome-ext" +npm install +npm run build + +Write-Host "โœ… Chrome extension built" -ForegroundColor Green + +# Create directories +Write-Host "๐Ÿ“ Creating necessary directories..." -ForegroundColor Yellow +New-Item -ItemType Directory -Path "logs" -Force | Out-Null +New-Item -ItemType Directory -Path "data" -Force | Out-Null + +Write-Host "โœ… Directories created" -ForegroundColor Green + +Write-Host "" +Write-Host "๐ŸŽ‰ Setup complete!" -ForegroundColor Green +Write-Host "" +Write-Host "Next steps:" -ForegroundColor Yellow +Write-Host "1. Edit .env file with your Google Cloud project details" -ForegroundColor White +Write-Host "2. Run 'make dev' to start the development server" -ForegroundColor White +Write-Host "3. Load the Chrome extension from chrome-ext/dist directory" -ForegroundColor White +Write-Host "" +Write-Host "For more information, see docs/README.md" -ForegroundColor Cyan diff --git a/assist/infra/setup.sh b/assist/infra/setup.sh new file mode 100644 index 00000000..11123984 --- /dev/null +++ b/assist/infra/setup.sh @@ -0,0 +1,103 @@ +#!/bin/bash +# Setup script for Messenger AI Assistant (Linux/macOS) + +set -e + +echo "๐Ÿš€ Setting up Messenger AI Assistant..." + +# Check if running on supported OS +if [[ "$OSTYPE" != "linux-gnu"* && "$OSTYPE" != "darwin"* ]]; then + echo "โŒ This script is for Linux/macOS only. Use setup.ps1 for Windows." 
+ exit 1 +fi + +# Check Node.js version +if ! command -v node &> /dev/null; then + echo "โŒ Node.js is not installed. Please install Node.js 20+ from https://nodejs.org/" + exit 1 +fi + +NODE_VERSION=$(node --version | cut -d'v' -f2 | cut -d'.' -f1) +if [ "$NODE_VERSION" -lt 20 ]; then + echo "โŒ Node.js version 20+ is required. Current version: $(node --version)" + exit 1 +fi + +echo "โœ… Node.js version: $(node --version)" + +# Check Python version +if ! command -v python3 &> /dev/null; then + echo "โŒ Python 3 is not installed. Please install Python 3.11+ from https://python.org/" + exit 1 +fi + +PYTHON_VERSION=$(python3 --version | cut -d' ' -f2 | cut -d'.' -f1,2) +if [[ "$PYTHON_VERSION" < "3.11" ]]; then + echo "โŒ Python 3.11+ is required. Current version: $(python3 --version)" + exit 1 +fi + +echo "โœ… Python version: $(python3 --version)" + +# Check gcloud CLI +if ! command -v gcloud &> /dev/null; then + echo "โŒ Google Cloud CLI is not installed. Please install from https://cloud.google.com/sdk/docs/install" + exit 1 +fi + +echo "โœ… Google Cloud CLI installed" + +# Check gcloud authentication +if ! gcloud auth list --filter=status:ACTIVE --format="value(account)" | grep -q .; then + echo "โŒ Not authenticated with Google Cloud. Please run: gcloud auth login" + exit 1 +fi + +echo "โœ… Google Cloud authenticated" + +# Set up environment variables +if [ ! -f ".env" ]; then + echo "๐Ÿ“ Creating .env file..." + cp .env.example .env + echo "โš ๏ธ Please edit .env file with your Google Cloud project details" +fi + +# Enable required Google Cloud APIs +echo "๐Ÿ”ง Enabling Google Cloud APIs..." +gcloud services enable aiplatform.googleapis.com +gcloud services enable firestore.googleapis.com +gcloud services enable generativelanguage.googleapis.com + +echo "โœ… Google Cloud APIs enabled" + +# Install Python dependencies +echo "๐Ÿ“ฆ Installing Python dependencies..." 
+cd assist/server +pip install -r requirements.txt + +echo "โœ… Python dependencies installed" + +# Build Chrome extension +echo "๐Ÿ”จ Building Chrome extension..." +cd ../chrome-ext +npm install +npm run build + +echo "โœ… Chrome extension built" + +# Create directories +echo "๐Ÿ“ Creating necessary directories..." +mkdir -p logs +mkdir -p data + +echo "โœ… Directories created" + +echo "" +echo "๐ŸŽ‰ Setup complete!" +echo "" +echo "Next steps:" +echo "1. Edit .env file with your Google Cloud project details" +echo "2. Run 'make dev' to start the development server" +echo "3. Load the Chrome extension from chrome-ext/dist directory" +echo "" +echo "For more information, see docs/README.md" diff --git a/assist/server/Dockerfile b/assist/server/Dockerfile new file mode 100644 index 00000000..e8cfbe51 --- /dev/null +++ b/assist/server/Dockerfile @@ -0,0 +1,32 @@ +# Dockerfile for Messenger AI Assistant Backend + +FROM python:3.11-slim + +# Set working directory +WORKDIR /app + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + gcc \ + g++ \ + && rm -rf /var/lib/apt/lists/* + +# Copy requirements and install Python dependencies +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy application code +COPY . . + +# Create logs directory +RUN mkdir -p logs + +# Expose ports +EXPOSE 8000 8765 + +# Health check +HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \ + CMD curl -f http://localhost:8000/health || exit 1 + +# Run the application +CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/assist/server/docker-compose.yml b/assist/server/docker-compose.yml new file mode 100644 index 00000000..621fdfd1 --- /dev/null +++ b/assist/server/docker-compose.yml @@ -0,0 +1,35 @@ +version: '3.8' + +services: + messenger-ai-backend: + build: . 
+ ports: + - "8000:8000" + - "8765:8765" + environment: + - GOOGLE_PROJECT_ID=${GOOGLE_PROJECT_ID} + - GOOGLE_APPLICATION_CREDENTIALS=/app/credentials.json + - GEMINI_API_KEY=${GEMINI_API_KEY} + - VERTEX_LOCATION=${VERTEX_LOCATION} + volumes: + - ./credentials.json:/app/credentials.json:ro + - ./logs:/app/logs + restart: unless-stopped + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8000/health"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 40s + + # Optional: Add a reverse proxy for production + nginx: + image: nginx:alpine + ports: + - "80:80" + - "443:443" + volumes: + - ./nginx.conf:/etc/nginx/nginx.conf:ro + depends_on: + - messenger-ai-backend + restart: unless-stopped