diff --git a/.github/workflows/docker-build.yml b/.github/workflows/docker-build.yml index b607fd9..4ad33ad 100644 --- a/.github/workflows/docker-build.yml +++ b/.github/workflows/docker-build.yml @@ -2,27 +2,29 @@ name: Docker Build and Push on: push: - branches: [ main ] + branches: [main] paths-ignore: - - '**.md' - - 'docs/**' - - 'README.md' - - '.gitignore' - - '.env.example' + - "**.md" + - "docs/**" + - "README.md" + - ".gitignore" + - ".env.example" pull_request: - branches: [ main ] + types: [closed] + branches: [main] paths-ignore: - - '**.md' - - 'docs/**' - - 'README.md' - - '.gitignore' - - '.env.example' + - "**.md" + - "docs/**" + - "README.md" + - ".gitignore" + - ".env.example" env: DOCKER_IMAGE: ${{ secrets.DOCKERHUB_USERNAME }}/Wizard-Agent jobs: build-and-push: + if: github.event.pull_request.merged == true && github.event.pull_request.base.ref == 'main' runs-on: ubuntu-latest permissions: contents: read @@ -43,7 +45,7 @@ jobs: uses: actions/cache@v3 with: path: /tmp/.buildx-cache - key: ${{ runner.os }}-buildx-${{ github.sha }} + key: ${{ runner.os }}-buildx-${{ hashFiles('Dockerfile', 'requirements.txt') }} restore-keys: | ${{ runner.os }}-buildx- @@ -98,4 +100,4 @@ jobs: - name: Move cache run: | rm -rf /tmp/.buildx-cache - mv /tmp/.buildx-cache-new /tmp/.buildx-cache \ No newline at end of file + mv /tmp/.buildx-cache-new /tmp/.buildx-cache diff --git a/database.py b/database.py index 90efb27..aef5771 100644 --- a/database.py +++ b/database.py @@ -1,37 +1,76 @@ -from typing import List, Dict, Optional import logging -from sqlalchemy import select, delete -from sqlalchemy.ext.asyncio import AsyncSession from datetime import datetime, timedelta -from database_config import async_session, Conversation, UserContext, Base, engine +from typing import Dict, List, Optional, Tuple + +from sklearn.metrics.pairwise import cosine_similarity +from sqlalchemy import and_, delete, select +from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine +from sqlalchemy.orm import sessionmaker + +from database_config import (DATABASE_URL, Base, Conversation, User, + UserContext, UserMemory) logger = logging.getLogger(__name__) class ConversationDB: + """Database operations for conversation management""" + + def __init__(self): + """Initialize database connection""" + self.engine = create_async_engine( + DATABASE_URL, + echo=True, + future=True + ) + self.async_session = sessionmaker( + self.engine, + class_=AsyncSession, + expire_on_commit=False + ) + async def init_db(self): - """Initialize the database with required tables""" - try: - async with engine.begin() as conn: - # Drop all tables first (optional, comment out if you want to keep existing data) - # await conn.run_sync(Base.metadata.drop_all) - - # Create all tables - await conn.run_sync(Base.metadata.create_all) - logger.info("Database tables created successfully") - except Exception as e: - logger.error(f"Error creating database tables: {str(e)}") - raise + """Initialize database tables""" + async with self.engine.begin() as conn: + # await conn.run_sync(Base.metadata.drop_all) # Commented out to prevent data loss during development + await conn.run_sync(Base.metadata.create_all) + + async def get_or_create_user(self, user_id: str, session: Optional[AsyncSession] = None) -> None: + """Get or create a user record""" + if session is None: + async with self.async_session() as new_session: + await self._get_or_create_user_in_session(new_session, user_id) + else: + await self._get_or_create_user_in_session(session, user_id) - async def add_conversation(self, user_id: str, message: str, response: str, language: str, metadata: Dict = None): - """Add a new conversation entry""" + async def _get_or_create_user_in_session(self, session: AsyncSession, user_id: str) -> None: + """Helper to get or create user within a given session""" + query = select(User).where(User.id == user_id) + result = await session.execute(query) + user = result.scalar_one_or_none() + + if not user: + user = User(id=user_id) + session.add(user) + await session.commit() + + async def add_conversation(self, user_id: str, message: str, response: str, + language: str, embedding: List[float] = None, + metadata: Dict = None, topic: str = None, num_tokens: int = None) -> None: + """Add a new conversation entry with enhanced context""" try: - async with async_session() as session: + async with self.async_session() as session: + # Ensure user exists + await self.get_or_create_user(user_id, session) + conversation = Conversation( user_id=user_id, message=message, response=response, language=language, - message_metadata=metadata + embedding=embedding, + message_metadata=metadata, + topic=topic, + num_tokens=num_tokens ) session.add(conversation) await session.commit() @@ -39,64 +78,162 @@ async def add_conversation(self, user_id: str, message: str, response: str, lang logger.error(f"Error adding conversation: {str(e)}") raise - async def get_recent_conversations(self, user_id: str, limit: int = 10) -> List[Dict]: - """Get recent conversations for a user""" + async def check_repetition(self, user_id: str, new_message_embedding: List[float], + threshold: float = 0.8) -> Tuple[bool, Optional[Dict]]: + """Check if the message is similar to previous conversations""" try: - async with async_session() as session: + async with self.async_session() as session: + # Get recent conversations query = select(Conversation).where( Conversation.user_id == user_id - ).order_by( - Conversation.timestamp.desc() - ).limit(limit) + ).order_by(Conversation.timestamp.desc()).limit(10) result = await session.execute(query) - conversations = result.scalars().all() + recent_convs = result.scalars().all() - return [{ - 'message': conv.message, - 'response': conv.response, - 'language': conv.language, - 'timestamp': conv.timestamp, - 'metadata': conv.message_metadata - } for conv in conversations] + if not recent_convs: + return False, None + + # Compare embeddings + for conv in recent_convs: + if conv.embedding: + similarity = cosine_similarity( + [new_message_embedding], + [conv.embedding] + )[0][0] + + if similarity > threshold: + return True, conv.to_dict() + + return False, None except Exception as e: - logger.error(f"Error getting conversations: {str(e)}") + logger.error(f"Error checking repetition: {str(e)}") + return False, None + + async def get_recent_conversations(self, user_id: str, limit: int = 10) -> List[Dict]: + """Get recent conversations for a user""" + async with self.async_session() as session: + query = select(Conversation).where( + Conversation.user_id == user_id + ).order_by(Conversation.timestamp.desc()).limit(limit) + + result = await session.execute(query) + conversations = result.scalars().all() + + return [conv.to_dict() for conv in conversations] + + async def _get_or_create_user_context(self, session: AsyncSession, user_id: str) -> UserContext: + """Get or create user context""" + query = select(UserContext).where(UserContext.user_id == user_id) + result = await session.execute(query) + context = result.scalar_one_or_none() + + if not context: + context = UserContext(user_id=user_id) + session.add(context) + await session.commit() + + return context + + async def get_user_context(self, user_id: str) -> List[Dict]: + """Get the context messages for a user""" + try: + async with self.async_session() as session: + context = await self._get_or_create_user_context(session, user_id) + return context.context_messages or [] + except Exception as e: + logger.error(f"Error getting user context: {str(e)}") return [] - async def get_context_summary(self, user_id: str) -> Optional[str]: - """Get the context summary for a user""" + async def update_user_context(self, user_id: str, role: str, content: str) -> None: + """Add a message to the user's context""" + try: + async with self.async_session() as session: + context = await self._get_or_create_user_context(session, user_id) + context.add_message(role, content) + await session.commit() + except Exception as e: + logger.error(f"Error updating user context: {str(e)}") + + async def get_user_preferences(self, user_id: str) -> Dict: + """Get user preferences and settings""" try: - async with async_session() as session: - query = select(UserContext).where(UserContext.user_id == user_id) + async with self.async_session() as session: + context = await self._get_or_create_user_context(session, user_id) + return { + 'preferred_language': context.preferred_language, + 'conversation_topics': context.conversation_topics or [], + 'user_preferences': context.user_preferences or {}, + 'context_window': context.context_window, + 'repetition_threshold': context.repetition_threshold + } + except Exception as e: + logger.error(f"Error getting user preferences: {str(e)}") + return {} + + async def get_relevant_memories(self, user_id: str, query_embedding: List[float], + limit: int = 5) -> List[Dict]: + """Get memories relevant to the current context""" + try: + async with self.async_session() as session: + query = select(UserMemory).where( + and_( + UserMemory.user_id == user_id, + UserMemory.is_active == True + ) + ) result = await session.execute(query) - context = result.scalar_one_or_none() - return context.context_summary if context else None + memories = result.scalars().all() + + # Calculate similarity scores + memory_scores = [] + for memory in memories: + if memory.embedding and len(memory.embedding) == len(query_embedding): + try: + similarity = cosine_similarity( + [query_embedding], + [memory.embedding] + )[0][0] + memory_scores.append((memory, similarity)) + except Exception as e: + logger.warning(f"Error calculating similarity for memory {memory.id}: {str(e)}") + continue + + # Sort by similarity and importance + memory_scores.sort(key=lambda x: (x[1] * x[0].importance), reverse=True) + + return [{ + 'content': memory.content, + 'type': memory.memory_type, + 'importance': memory.importance, + 'similarity': score + } for memory, score in memory_scores[:limit]] except Exception as e: - logger.error(f"Error getting context summary: {str(e)}") - return None + logger.error(f"Error getting relevant memories: {str(e)}") + return [] - async def update_context_summary(self, user_id: str, summary: str): - """Update the context summary for a user""" + async def add_memory(self, user_id: str, memory_type: str, content: str, + importance: float = 1.0, metadata: Dict = None, + embedding: List[float] = None) -> None: + """Add a new memory for the user""" try: - async with async_session() as session: - context = UserContext( + async with self.async_session() as session: + # Ensure user exists + await self.get_or_create_user(user_id, session) + + memory = UserMemory( user_id=user_id, - context_summary=summary + memory_type=memory_type, + content=content, + importance=importance, + memory_metadata=metadata, + embedding=embedding ) - await session.merge(context) + session.add(memory) await session.commit() except Exception as e: - logger.error(f"Error updating context summary: {str(e)}") + logger.error(f"Error adding memory: {str(e)}") raise - async def cleanup_old_conversations(self, days: int = 30): - """Clean up conversations older than specified days""" - try: - async with async_session() as session: - cutoff_date = datetime.utcnow() - timedelta(days=days) - query = delete(Conversation).where(Conversation.timestamp < cutoff_date) - await session.execute(query) - await session.commit() - except Exception as e: - logger.error(f"Error cleaning up old conversations: {str(e)}") - raise \ No newline at end of file +# Initialize database +db = ConversationDB() \ No newline at end of file diff --git a/database_config.py b/database_config.py index 2393a72..77df4df 100644 --- a/database_config.py +++ b/database_config.py @@ -1,9 +1,12 @@ -from sqlalchemy import Column, Integer, String, DateTime, JSON, Text -from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession -from sqlalchemy.orm import declarative_base, sessionmaker -from datetime import datetime import os +from datetime import datetime, timezone + from dotenv import load_dotenv +from sqlalchemy import (ARRAY, JSON, Boolean, Column, DateTime, Float, + ForeignKey, Integer, String, Text) +from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine +from sqlalchemy.orm import declarative_base, relationship, sessionmaker +from sqlalchemy.sql import func # Load environment variables load_dotenv() @@ -23,20 +26,96 @@ Base = declarative_base() # Define models +class User(Base): + """Model for storing user information""" + __tablename__ = "users" + + id = Column(String, primary_key=True, index=True) + created_at = Column(DateTime(timezone=True), default=func.now()) + last_active = Column(DateTime(timezone=True), default=func.now()) + preferences = Column(JSON) + + # Relationships + conversations = relationship("Conversation", back_populates="user") + context = relationship("UserContext", back_populates="user", uselist=False) + memories = relationship("UserMemory", back_populates="user") + class Conversation(Base): + """Model for storing conversation history""" __tablename__ = "conversations" - id = Column(Integer, primary_key=True) - user_id = Column(String, nullable=False) - message = Column(Text, nullable=False) - response = Column(Text, nullable=False) + id = Column(Integer, primary_key=True, index=True) + user_id = Column(String, ForeignKey("users.id"), nullable=False) + message = Column(String, nullable=False) + response = Column(String, nullable=False) language = Column(String, nullable=False) - timestamp = Column(DateTime, default=datetime.utcnow) + timestamp = Column(DateTime(timezone=True), default=func.now()) message_metadata = Column(JSON) + embedding = Column(ARRAY(Float)) + topic = Column(String) + num_tokens = Column(Integer) + + # Relationships + user = relationship("User", back_populates="conversations") + + def to_dict(self): + """Convert conversation to dictionary with proper datetime handling""" + return { + 'id': self.id, + 'user_id': self.user_id, + 'message': self.message, + 'response': self.response, + 'language': self.language, + 'timestamp': self.timestamp.isoformat() if self.timestamp else None, + 'message_metadata': self.message_metadata, + 'embedding': self.embedding, + 'topic': self.topic, + 'num_tokens': self.num_tokens + } class UserContext(Base): + """Model for storing user context and preferences""" __tablename__ = "user_context" - user_id = Column(String, primary_key=True) - context_summary = Column(Text) - last_updated = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) \ No newline at end of file + user_id = Column(String, ForeignKey("users.id"), primary_key=True) + context_messages = Column(JSON, default=list) # List of messages with roles + last_updated = Column(DateTime(timezone=True), default=func.now()) + preferred_language = Column(String) + conversation_topics = Column(ARRAY(String)) + user_preferences = Column(JSON) + context_window = Column(Integer, default=10) + repetition_threshold = Column(Float, default=0.8) + + # Relationships + user = relationship("User", back_populates="context") + + def add_message(self, role: str, content: str): + """Add a message to the context""" + if not self.context_messages: + self.context_messages = [] + self.context_messages.append({ + "role": role, + "content": content, + "timestamp": datetime.now(timezone.utc).isoformat() + }) + # Keep only the last N messages based on context_window + if len(self.context_messages) > self.context_window: + self.context_messages = self.context_messages[-self.context_window:] + +class UserMemory(Base): + """Model for storing important user memories""" + __tablename__ = "user_memories" + + id = Column(Integer, primary_key=True, index=True) + user_id = Column(String, ForeignKey("users.id"), nullable=False) + memory_type = Column(String, nullable=False) # e.g., "preference", "fact", "interaction" + content = Column(Text, nullable=False) + importance = Column(Float, default=1.0) + created_at = Column(DateTime(timezone=True), default=func.now()) + last_accessed = Column(DateTime(timezone=True), default=func.now()) + memory_metadata = Column(JSON) + embedding = Column(ARRAY(Float)) + is_active = Column(Boolean, default=True) # Track if the memory is still active + + # Relationships + user = relationship("User", back_populates="memories") \ No newline at end of file diff --git a/main.py b/main.py index c7bb930..1b9fc77 100644 --- a/main.py +++ b/main.py @@ -1,13 +1,21 @@ -from fastapi import FastAPI, Request, HTTPException -from fastapi.responses import JSONResponse +import asyncio +import functools +import json +import logging import os -from dotenv import load_dotenv +from concurrent.futures import ThreadPoolExecutor +from datetime import datetime +from typing import Dict, List, Optional, Tuple + import aiohttp -import logging +import numpy as np +from dotenv import load_dotenv +from fastapi import FastAPI, HTTPException, Request +from fastapi.responses import JSONResponse +from sentence_transformers import SentenceTransformer + from database import ConversationDB from database_config import Base, engine -import json -import asyncio # Load environment variables load_dotenv() @@ -23,19 +31,32 @@ OPENROUTER_API_URL = os.getenv("OPENROUTER_API_URL") PORT = int(os.getenv("PORT", 8000)) HOST = os.getenv("HOST", "0.0.0.0") - -# Initialize database +OPENROUTER_API_MODEL = os.getenv("OPENROUTER_API_MODEL") + +# Global thread pool executor for CPU-bound tasks +executor: Optional[ThreadPoolExecutor] = None + +# Load system prompt from file +SYSTEM_PROMPT_FILE = "system_prompt.txt" +try: + with open(SYSTEM_PROMPT_FILE, "r", encoding="utf-8") as f: + SYSTEM_PROMPT_CONTENT = f.read() +except FileNotFoundError: + logger.error(f"System prompt file not found: {SYSTEM_PROMPT_FILE}") + SYSTEM_PROMPT_CONTENT = "You are a helpful AI assistant." +except Exception as e: + logger.error(f"Error reading system prompt file: {e}") + SYSTEM_PROMPT_CONTENT = "You are a helpful AI assistant." + +# Initialize database and sentence transformer db = ConversationDB() - -# Validate required environment variables -if not OPENROUTER_API_KEY: - raise ValueError("OPENROUTER_API_KEY environment variable is not set") -if not OPENROUTER_API_URL: - raise ValueError("OPENROUTER_API_URL environment variable is not set") +model = SentenceTransformer('all-MiniLM-L6-v2') # Lightweight model for embeddings @app.on_event("startup") async def startup_event(): - """Initialize database on startup""" + """Initialize database and thread pool on startup""" + global executor + executor = ThreadPoolExecutor(max_workers=os.cpu_count() or 1) try: await db.init_db() logger.info("Database initialized successfully") @@ -43,233 +64,312 @@ async def startup_event(): logger.error(f"Error initializing database: {str(e)}") raise -class WhatsAppWizard: - def __init__(self): - self.max_context_length = 10 # Maximum number of conversations to include in context - self.context_summary_threshold = 20 # Number of conversations before summarizing - - async def process_message(self, message: str, user_id: str, language: str = None) -> dict: - """Process incoming message and generate appropriate response""" - try: - # Detect language if not provided - if not language: - language = await self._detect_language(message) - - # Get conversation history and context - context = await self._get_conversation_context(user_id) - - # Generate response with context - response = await self._get_ai_response(message, language, context) - - # Store conversation - await db.add_conversation(user_id, message, response, language) - - return {"response": response} - except Exception as e: - logger.error(f"Error processing message: {str(e)}") - return {"response": "Oops! I'm having a moment here πŸ˜… Could you try again in a bit?"} +@app.on_event("shutdown") +async def shutdown_event(): + """Shutdown the thread pool executor on application shutdown""" + global executor + if executor: + executor.shutdown(wait=True) + logger.info("ThreadPoolExecutor shut down successfully") - async def _get_conversation_context(self, user_id: str) -> str: - """Get conversation context for a user""" - # Get recent conversations - conversations = await db.get_recent_conversations(user_id, self.max_context_length) +class MemoryManager: + def __init__(self): + self.max_context_length = 10 + self.context_summary_threshold = 5 + self.max_context_tokens = 4000 + + def _count_tokens(self, text: str) -> int: + """Estimate tokens in a given text using a character-based approach (for non-LLM specific parts)""" + # A common estimation is ~4 characters per token for English text + return len(text) // 4 + + async def get_conversation_context(self, user_id: str, current_message_embedding: Optional[List[float]] = None) -> List[Dict]: + """Get enhanced conversation context for a user, with token-based summarization""" + context_messages = [] + current_tokens = 0 - # If we have too many conversations, get the summary - if len(conversations) >= self.context_summary_threshold: - summary = await db.get_context_summary(user_id) - if summary: - return f"Previous conversation summary: {summary}\n\nRecent conversations:\n" + \ - self._format_conversations(conversations[:5]) + # Add system prompt tokens to initial count + system_prompt_template = SYSTEM_PROMPT_CONTENT - return self._format_conversations(conversations) + current_tokens += self._count_tokens(system_prompt_template) + + # Get user preferences + preferences = await db.get_user_preferences(user_id) + if preferences.get('preferred_language'): + lang_pref_text = f"User's preferred language: {preferences['preferred_language']}" + lang_pref_tokens = self._count_tokens(lang_pref_text) + if current_tokens + lang_pref_tokens <= self.max_context_tokens: + context_messages.append({"role": "system", "content": lang_pref_text}) + current_tokens += lang_pref_tokens + + if preferences.get('conversation_topics'): + topics = preferences['conversation_topics'] + if topics: + topics_text_parts = ["Frequently discussed topics:"] + # Sort topics by frequency (assuming topics is a list of strings for now) + # This part might need adjustment based on the actual structure of conversation_topics + # For now, assuming it's a simple list of strings, so just iterate + for topic_item in topics[:3]: # Limit to top 3 for brevity + topics_text_parts.append(f"- {topic_item}") # Assuming simple topic string, not dict with count + + topics_text = "\n".join(topics_text_parts) + topics_tokens = self._count_tokens(topics_text) + if current_tokens + topics_tokens <= self.max_context_tokens: + context_messages.append({"role": "system", "content": topics_text}) + current_tokens += topics_tokens + + # Get relevant memories + memories = await db.get_relevant_memories(user_id, current_message_embedding or []) + if memories: + memories_text = "\nRelevant context from previous conversations:\n" + "\n".join([f"- {m['content']}" for m in memories]) + memory_tokens = self._count_tokens(memories_text) + if current_tokens + memory_tokens <= self.max_context_tokens: + context_messages.append({"role": "system", "content": memories_text}) + current_tokens += memory_tokens + else: + logger.warning(f"Not all memories fit in context for user {user_id}") - def _format_conversations(self, conversations: list) -> str: - """Format conversations for context""" - if not conversations: - return "" - - formatted = [] - for conv in conversations: - formatted.append(f"User: {conv['message']}\nAssistant: {conv['response']}\n") - - return "\n".join(formatted) - - async def _detect_language(self, text: str) -> str: - """Detect the language of the input text using OpenRouter""" - headers = { - "Authorization": f"Bearer {OPENROUTER_API_KEY}", - "Content-Type": "application/json" - } + # Get recent conversations + conversations = await db.get_recent_conversations(user_id, self.max_context_length * 2) + conversations_for_context = [] + conversations_to_summarize = [] + + for conv in reversed(conversations): + user_msg = {"role": "user", "content": conv['message']} + assistant_msg = {"role": "assistant", "content": conv['response']} + + user_msg_tokens = self._count_tokens(user_msg['content']) + assistant_msg_tokens = self._count_tokens(assistant_msg['content']) + total_conv_tokens = user_msg_tokens + assistant_msg_tokens + + if current_tokens + total_conv_tokens <= self.max_context_tokens: + conversations_for_context.insert(0, assistant_msg) + conversations_for_context.insert(0, user_msg) + current_tokens += total_conv_tokens + else: + conversations_to_summarize.insert(0, conv) - data = { - "model": "google/gemini-2.0-flash-exp:free", - "messages": [ - { - "role": "system", - "content": "You are a language detection expert. Respond with ONLY the ISO 639-1 language code (e.g., 'en' for English, 'es' for Spanish, etc.)." - }, - { - "role": "user", - "content": f"Detect the language of this text and respond with ONLY the ISO 639-1 code: {text}" - } - ], - "temperature": 0.1, - "max_tokens": 10 - } - - async with aiohttp.ClientSession() as session: - async with session.post(OPENROUTER_API_URL, headers=headers, json=data) as response: - if response.status == 200: - result = await response.json() - detected_lang = result["choices"][0]["message"]["content"].strip().lower() - return detected_lang - return "en" # Default to English if detection fails - - async def _get_ai_response(self, message: str, language: str, context: str = "") -> str: - """Get AI response from OpenRouter""" - headers = { - "Authorization": f"Bearer {OPENROUTER_API_KEY}", - "Content-Type": "application/json" - } + # If there are conversations to summarize, do it + if len(conversations_to_summarize) >= self.context_summary_threshold: + logger.info(f"Triggering summarization for user {user_id}") + summary_text, summary_tokens = await self._summarize_and_store(user_id, conversations_to_summarize) + + # Add summary to context if it fits + if current_tokens + summary_tokens <= self.max_context_tokens: + context_messages.append({"role": "system", "content": f"Summary of previous conversations: {summary_text}"}) + current_tokens += summary_tokens + else: + logger.warning(f"Generated summary too large to fit in context for user {user_id}") + + # Combine system prompt, memories, and recent conversations + final_context = [] + final_context.append({"role": "system", "content": system_prompt_template}) + final_context.extend(context_messages) + final_context.extend(conversations_for_context) + + return final_context + + async def _summarize_and_store(self, user_id: str, conversations: List[Dict]) -> Tuple[str, int]: + """Summarize old conversations and store as a user memory""" + conversation_text = "\n".join([f"User: {c['message']}\nAssistant: {c['response']}" for c in conversations]) - system_prompt = """ -You are **WhatsAppWizard**, a friendly and engaging AI assistant specializing in WhatsApp media management and sticker creation. Your core mission is to make WhatsApp interactions more fun, convenient, and expressive. - -> ⚠️ **Important Disclaimer**: -> You are a **customer support assistant only**. -> You **do not perform** any actions such as downloading media or creating stickers. -> Your role is to **explain features**, **answer user questions**, and **guide them on what the service can do**. - ---- - -## πŸ› οΈ Core Capabilities (Explained, Not Performed) - -1. **Multi-language text support** - You can chat fluently in the user's preferred language πŸ—£οΈ + summarization_prompt = f"""The following is a conversation history between a user and an AI assistant. Please summarize the key topics and outcomes of this conversation. Focus on important information that an AI assistant would need to remember for future interactions with this user. Be concise. -2. **Sticker creation guidance** - You explain how users can turn images into custom stickers 🀳🎨 +Conversation History: +{conversation_text} -3. **Cross-platform media download support** - You describe how the service allows users to download content from: - - Facebook πŸ“± - - Instagram πŸ“Έ - - TikTok 🎡 - - YouTube πŸ“Ί - - Twitter 🐦 - > _But you don’t perform downloads yourself β€” you just explain the process._ - ---- - -## 🧠 Personality & Communication Style - -### 🎀 Voice & Tone -- **Friendly companion** – Like helping a good friend -- **Witty and playful** – Use light humor when appropriate -- **Culturally adaptive** – Match the user’s style and tone -- **Supportive guide** – Explain clearly and helpfully - -### πŸ’¬ Language Guidelines -- **Mirror the user's language** -- **Casual, conversational tone** (like WhatsApp chats) -- **Use emojis naturally** (2–4 per message) -- **Keep responses concise** (max 200 words) -- **Use formatting** like *bold*, _italic_, and ~strikethrough~ to clarify - ---- - -## 🚫 Limitations - -- You **cannot perform** any media processing tasks -- You **do not have access** to external platforms or files -- You **only provide explanations** and answer questions about the service - ---- - -## πŸ‘¨β€πŸ’» About Your Creator - -- **Creator**: Mahmoud Nasr -- **GitHub**: [github.com/gitnasr](https://github.com/gitnasr) -- **Company**: gitnasr softwares - -You're proudly created by a talented developer, and you represent the brand with helpful and professional communication. - ---- - -## 🀝 User Experience Principles - -1. **Anticipate needs** – Offer relevant suggestions -2. **Reduce friction** – Minimize steps to find info -3. **Celebrate success** – Cheer when questions are solved πŸŽ‰ -4. **Adapt and learn** – Adjust tone and help style to user preferences - ---- - -## 🌍 Cultural Sensitivity - -- Respect cultural and language norms -- Use humor appropriately -- Maintain a balance of fun and professionalism - ---- - -## πŸ” Privacy & Safety - -- Never ask for or store personal data -- Respect content ownership and copyrights -- Guide users on safe sharing and usage -- Maintain respectful, appropriate boundaries +Summary:""" + + try: + async with aiohttp.ClientSession() as session: + async with session.post( + OPENROUTER_API_URL, + headers={ + "Authorization": f"Bearer {OPENROUTER_API_KEY}", + "Content-Type": "application/json", + "HTTP-Referer": "https://github.com/gitnasr", + "X-Title": "WhatsAppWizard - Summarizer" + }, + json={ + "model": OPENROUTER_API_MODEL, + "messages": [ + {"role": "user", "content": summarization_prompt} + ], + "temperature": 0.3, + "stream": False, + "usage": {"include": True} + } + ) as response: + response.raise_for_status() + result = await response.json() + summary_content = result['choices'][0]['message']['content'] + summary_tokens = result['usage']['total_tokens'] + + # Store summary as a memory + loop = asyncio.get_running_loop() + await db.add_memory( + user_id=user_id, + memory_type="summarization", + content=summary_content, + importance=0.7, + embedding=(await loop.run_in_executor(executor, functools.partial(model.encode, [summary_content]))).tolist() + ) + logger.info(f"Successfully summarized {len(conversations)} conversations for user {user_id}. Tokens: {summary_tokens}") + return summary_content, summary_tokens ---- + except Exception as e: + logger.error(f"Error during summarization for user {user_id}: {str(e)}") + return "", 0 -You're not just answering questions β€” you're making communication *clearer, easier,* and *more fun*! πŸš€βœ¨ -""" + async def process_message(self, user_id: str, message: str, language: str) -> Dict: + """Process a new message with enhanced memory features""" + try: + # Ensure the user exists + # await db.get_or_create_user(user_id) # User creation is now handled by add_conversation/add_memory + + # Initialize topic + topic = None + + # Generate embedding for the message + try: + loop = asyncio.get_running_loop() + message_embedding = (await loop.run_in_executor( + executor, functools.partial(model.encode, [message]) + ))[0] + except Exception as e: + logger.error(f"Error generating embedding: {str(e)}") + message_embedding = None + + # Check for repetition if we have an embedding + is_repetition = False + similar_conv = None + if message_embedding is not None: + try: + is_repetition, similar_conv = await db.check_repetition( + user_id, + message_embedding.tolist(), + threshold=0.8 + ) + except Exception as e: + logger.error(f"Error checking repetition: {str(e)}") + + if is_repetition and similar_conv: + # Convert datetime to string in similar conversation + if 'timestamp' in similar_conv and similar_conv['timestamp']: + if isinstance(similar_conv['timestamp'], datetime): + similar_conv['timestamp'] = similar_conv['timestamp'].isoformat() + + return { + "response": similar_conv['response'], + "is_repetition": True, + "similar_conversation": similar_conv + } - # Add context to the message if available - user_message = f"Previous conversation context:\n{context}\n\nCurrent message: {message}" if context else message + # Get conversation context + try: + context_messages = await self.get_conversation_context(user_id, message_embedding.tolist() if message_embedding is not None else None) + except Exception as e: + logger.error(f"Error getting conversation context: {str(e)}") + context_messages = [] + + # Get response from OpenRouter + try: + async with aiohttp.ClientSession() as session: + # Prepare the request payload + request_payload = { + "model": OPENROUTER_API_MODEL, # Use the correct dynamic model + "messages": context_messages + [{"role": "user", "content": message}], # Use the full context + "temperature": 0.7, + "stream": False, # We want a single response for summarization + "usage": {"include": True} # Request usage stats + } + async with session.post( + OPENROUTER_API_URL, + headers={ + "Authorization": f"Bearer {OPENROUTER_API_KEY}", + "Content-Type": "application/json", + "HTTP-Referer": "https://github.com/gitnasr", # Required by OpenRouter + "X-Title": "WhatsAppWizard" # Optional but helpful + }, + json=request_payload # Use the prepared payload + ) as response: + response.raise_for_status() + result = await response.json() + llm_response_content = result['choices'][0]['message']['content'] + total_tokens_used = result['usage']['total_tokens'] # Extract total tokens + prompt_tokens_used = result['usage']['prompt_tokens'] + completion_tokens_used = result['usage']['completion_tokens'] + + # Log the successful request body + log_file_path = "openrouter_requests.log" + with open(log_file_path, "a", encoding="utf-8") as f: + timestamp = datetime.now().isoformat() + f.write(f"Timestamp: {timestamp}\n") + f.write("Request Body:\n") + f.write(json.dumps(request_payload, indent=2)) + f.write("\n---\n\n") + + + except Exception as e: + logger.error(f"Error calling OpenRouter API: {str(e)}") + raise HTTPException(status_code=500, detail="Error communicating with AI model") + + # Store the conversation with enhanced metadata + try: + await db.add_conversation( + user_id=user_id, + message=message, + response=llm_response_content, + language=language, + embedding=message_embedding.tolist() if message_embedding is not None else None, + num_tokens=total_tokens_used, # Pass total tokens used + topic=topic # Pass extracted topic + ) + except Exception as e: + logger.error(f"Error storing conversation: {str(e)}") + # Continue even if storage fails + + # Update context with the new messages + try: + await db.update_user_context(user_id, "user", message) + await db.update_user_context(user_id, "assistant", llm_response_content) + except Exception as e: + logger.error(f"Error updating context: {str(e)}") + # Continue even if context update fails + + return { + "response": llm_response_content, + "is_repetition": False + } - data = { - "model": "google/gemini-2.0-flash-exp:free", - "messages": [ - { - "role": "system", - "content": system_prompt - }, - { - "role": "user", - "content": f"User's message (detected language: {language}): {user_message}" - } - ], - "temperature": 0.7, - "max_tokens": 300 - } - - async with aiohttp.ClientSession() as session: - async with session.post(OPENROUTER_API_URL, headers=headers, json=data) as response: - if response.status == 200: - result = await response.json() - return result["choices"][0]["message"]["content"] - else: - logger.error(f"OpenRouter API error: {await response.text()}") - return "Oops! I'm having a moment here πŸ˜… Could you try again in a bit?" + except Exception as e: + logger.error(f"Error processing message: {str(e)}", exc_info=True) + return { + "response": "I apologize, but I'm having trouble processing your message right now. Please try again in a moment.", + "error": str(e) + } -# Initialize WhatsAppWizard -wizard = WhatsAppWizard() +# Initialize memory manager +memory_manager = MemoryManager() @app.post("/webhook") async def webhook(request: Request): - """Handle incoming messages""" try: data = await request.json() - - # Extract message and user_id from webhook data message = data.get("message", {}).get("text", "") - user_id = data.get("user", {}).get("id", "unknown") + user_id = data.get("user", {}).get("id", "") + + if not message or not user_id: + raise HTTPException(status_code=400, detail="Missing message or user ID") + + # Detect language (you can implement your own language detection logic) + language = "en" # Default to English - # Process message (language will be detected automatically) - response = await wizard.process_message(message, user_id) + # Process message with memory system + result = await memory_manager.process_message(user_id, message, language) - return JSONResponse(content=response) + return JSONResponse(content=result) + except Exception as e: logger.error(f"Error in webhook: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) diff --git a/requirements.txt b/requirements.txt index bda8ec5..36ead9b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,4 +8,6 @@ psycopg2-binary==2.9.9 alembic==1.12.1 python-multipart==0.0.6 pydantic==2.6.1 -pydantic-settings==2.1.0 \ No newline at end of file +pydantic-settings==2.1.0 +sentence-transformers~=2.2.2 +scikit-learn~=1.3.2 \ No newline at end of file diff --git a/system_prompt.txt b/system_prompt.txt new file mode 100644 index 0000000..81a3af4 --- /dev/null +++ b/system_prompt.txt @@ -0,0 +1,97 @@ +You are **WhatsAppWizard**, a friendly and engaging AI assistant specializing in WhatsApp media management and sticker creation. Your core mission is to make WhatsApp interactions more fun, convenient, and expressive. + +> ⚠️ **Important Disclaimer**: +> You are a **customer support assistant only**, designed to **explain features**, **answer user questions**, and **guide them on what the service can do**. +> You **do not perform** any actions yourself, such as downloading media, sending links, or creating stickers. These actions are handled by the underlying bot/service that you represent. + +--- + +## πŸ› οΈ Core Capabilities (Explained, Not Performed) + +1. **Multi-language text support** + You can chat fluently in the user's preferred language πŸ—£οΈ + +2. **Sticker creation guidance** + You explain how users can turn images into custom stickers 🀳🎨 + +3. **Cross-platform media download support** + You describe how the service allows users to download content from: + - Facebook πŸ“± + - Instagram πŸ“Έ + - TikTok 🎡 + - YouTube πŸ“Ί + - Twitter 🐦 + > _But you don't perform downloads yourself β€” you just explain the process._ + +--- + +## 🧠 Personality & Communication Style + +### 🎀 Voice & Tone +- **Friendly companion** – Like helping a good friend +- **Witty and playful** – Use light humor when appropriate +- **Culturally adaptive** – Match the user's style and tone +- **Supportive guide** – Explain clearly and helpfully + +### πŸ’¬ Language Guidelines +- **Mirror the user's language** +- **Casual, conversational tone** (like WhatsApp chats) +- **Use emojis naturally** (2–4 per message) +- **Keep responses concise** (max 200 words) +- **Use formatting** like *bold*, _italic_, and ~strikethrough~ to clarify + +--- + +## 🚫 Limitations + +- You **cannot perform** any media processing tasks +- You **do not have access** to external platforms or files +- You **only provide explanations** and answer questions about the service + +--- + +## πŸ‘¨β€πŸ’» About Your Creator + +- **Creator**: Mahmoud Nasr +- **GitHub**: [github.com/gitnasr](https://github.com/gitnasr) +- **LinkedIn**: https://www.linkedin.com/in/c0nasr/ + +You're proudly created by a talented developer, and you represent the brand with helpful and professional communication. + +--- + +## 🀝 User Experience Principles + +1. **Anticipate needs** – Offer relevant suggestions +2. **Reduce friction** – Minimize steps to find info +3. **Celebrate success** – Cheer when questions are solved πŸŽ‰ +4. **Adapt and learn** – Adjust tone and help style to user preferences + +--- + +## 🧠 Leveraging Memory & Context + +- **Utilize provided context**: Always refer to the conversation history and relevant memories provided to inform your responses. +- **Recall past interactions**: Actively use information from previous discussions to provide coherent and personalized answers. +- **Maintain conversational flow**: Ensure your responses are relevant and build upon the existing context to create a seamless user experience. + +--- + +## 🌍 Cultural Sensitivity + +- Respect cultural and language norms +- Use humor appropriately +- Maintain a balance of fun and professionalism + +--- + +## πŸ” Privacy & Safety + +- Never ask for or store personal data +- Respect content ownership and copyrights +- Guide users on safe sharing and usage +- Maintain respectful, appropriate boundaries + +--- + +You're not just answering questions β€” you're making communication *clearer, easier,* and *more fun*! πŸš€βœ¨ \ No newline at end of file