Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 16 additions & 14 deletions .github/workflows/docker-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,29 @@ name: Docker Build and Push

on:
push:
branches: [ main ]
branches: [main]
paths-ignore:
- '**.md'
- 'docs/**'
- 'README.md'
- '.gitignore'
- '.env.example'
- "**.md"
- "docs/**"
- "README.md"
- ".gitignore"
- ".env.example"
pull_request:
branches: [ main ]
types: [closed]
branches: [main]
paths-ignore:
- '**.md'
- 'docs/**'
- 'README.md'
- '.gitignore'
- '.env.example'
- "**.md"
- "docs/**"
- "README.md"
- ".gitignore"
- ".env.example"

env:
DOCKER_IMAGE: ${{ secrets.DOCKERHUB_USERNAME }}/Wizard-Agent

jobs:
build-and-push:
if: github.event.pull_request.merged == true && github.event.pull_request.base.ref == 'main'
runs-on: ubuntu-latest
permissions:
contents: read
Expand All @@ -43,7 +45,7 @@ jobs:
uses: actions/cache@v3
with:
path: /tmp/.buildx-cache
key: ${{ runner.os }}-buildx-${{ github.sha }}
key: ${{ runner.os }}-buildx-${{ hashFiles('Dockerfile', 'requirements.txt') }}
restore-keys: |
${{ runner.os }}-buildx-

Expand Down Expand Up @@ -98,4 +100,4 @@ jobs:
- name: Move cache
run: |
rm -rf /tmp/.buildx-cache
mv /tmp/.buildx-cache-new /tmp/.buildx-cache
mv /tmp/.buildx-cache-new /tmp/.buildx-cache
259 changes: 198 additions & 61 deletions database.py
Original file line number Diff line number Diff line change
@@ -1,102 +1,239 @@
from typing import List, Dict, Optional
import logging
from sqlalchemy import select, delete
from sqlalchemy.ext.asyncio import AsyncSession
from datetime import datetime, timedelta
from database_config import async_session, Conversation, UserContext, Base, engine
from typing import Dict, List, Optional, Tuple

from sklearn.metrics.pairwise import cosine_similarity
from sqlalchemy import and_, delete, select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

from database_config import (DATABASE_URL, Base, Conversation, User,
UserContext, UserMemory)

logger = logging.getLogger(__name__)

class ConversationDB:
"""Database operations for conversation management"""

def __init__(self):
"""Initialize database connection"""
self.engine = create_async_engine(
DATABASE_URL,
echo=True,
future=True
)
self.async_session = sessionmaker(
self.engine,
class_=AsyncSession,
expire_on_commit=False
)

async def init_db(self):
"""Initialize the database with required tables"""
try:
async with engine.begin() as conn:
# Drop all tables first (optional, comment out if you want to keep existing data)
# await conn.run_sync(Base.metadata.drop_all)
# Create all tables
await conn.run_sync(Base.metadata.create_all)
logger.info("Database tables created successfully")
except Exception as e:
logger.error(f"Error creating database tables: {str(e)}")
raise
"""Initialize database tables"""
async with self.engine.begin() as conn:
# await conn.run_sync(Base.metadata.drop_all) # Commented out to prevent data loss during development
await conn.run_sync(Base.metadata.create_all)

async def get_or_create_user(self, user_id: str, session: Optional[AsyncSession] = None) -> None:
"""Get or create a user record"""
if session is None:
async with self.async_session() as new_session:
await self._get_or_create_user_in_session(new_session, user_id)
else:
await self._get_or_create_user_in_session(session, user_id)

async def add_conversation(self, user_id: str, message: str, response: str, language: str, metadata: Dict = None):
"""Add a new conversation entry"""
async def _get_or_create_user_in_session(self, session: AsyncSession, user_id: str) -> None:
"""Helper to get or create user within a given session"""
query = select(User).where(User.id == user_id)
result = await session.execute(query)
user = result.scalar_one_or_none()

if not user:
user = User(id=user_id)
session.add(user)
await session.commit()

async def add_conversation(self, user_id: str, message: str, response: str,
language: str, embedding: List[float] = None,
metadata: Dict = None, topic: str = None, num_tokens: int = None) -> None:
"""Add a new conversation entry with enhanced context"""
try:
async with async_session() as session:
async with self.async_session() as session:
# Ensure user exists
await self.get_or_create_user(user_id, session)

conversation = Conversation(
user_id=user_id,
message=message,
response=response,
language=language,
message_metadata=metadata
embedding=embedding,
message_metadata=metadata,
topic=topic,
num_tokens=num_tokens
)
session.add(conversation)
await session.commit()
except Exception as e:
logger.error(f"Error adding conversation: {str(e)}")
raise

async def get_recent_conversations(self, user_id: str, limit: int = 10) -> List[Dict]:
"""Get recent conversations for a user"""
async def check_repetition(self, user_id: str, new_message_embedding: List[float],
threshold: float = 0.8) -> Tuple[bool, Optional[Dict]]:
"""Check if the message is similar to previous conversations"""
try:
async with async_session() as session:
async with self.async_session() as session:
# Get recent conversations
query = select(Conversation).where(
Conversation.user_id == user_id
).order_by(
Conversation.timestamp.desc()
).limit(limit)
).order_by(Conversation.timestamp.desc()).limit(10)

result = await session.execute(query)
conversations = result.scalars().all()
recent_convs = result.scalars().all()

return [{
'message': conv.message,
'response': conv.response,
'language': conv.language,
'timestamp': conv.timestamp,
'metadata': conv.message_metadata
} for conv in conversations]
if not recent_convs:
return False, None

# Compare embeddings
for conv in recent_convs:
if conv.embedding:
similarity = cosine_similarity(
[new_message_embedding],
[conv.embedding]
)[0][0]

if similarity > threshold:
return True, conv.to_dict()

return False, None
except Exception as e:
logger.error(f"Error getting conversations: {str(e)}")
logger.error(f"Error checking repetition: {str(e)}")
return False, None

async def get_recent_conversations(self, user_id: str, limit: int = 10) -> List[Dict]:
"""Get recent conversations for a user"""
async with self.async_session() as session:
query = select(Conversation).where(
Conversation.user_id == user_id
).order_by(Conversation.timestamp.desc()).limit(limit)

result = await session.execute(query)
conversations = result.scalars().all()

return [conv.to_dict() for conv in conversations]

async def _get_or_create_user_context(self, session: AsyncSession, user_id: str) -> UserContext:
"""Get or create user context"""
query = select(UserContext).where(UserContext.user_id == user_id)
result = await session.execute(query)
context = result.scalar_one_or_none()

if not context:
context = UserContext(user_id=user_id)
session.add(context)
await session.commit()

return context

async def get_user_context(self, user_id: str) -> List[Dict]:
"""Get the context messages for a user"""
try:
async with self.async_session() as session:
context = await self._get_or_create_user_context(session, user_id)
return context.context_messages or []
except Exception as e:
logger.error(f"Error getting user context: {str(e)}")
return []

async def get_context_summary(self, user_id: str) -> Optional[str]:
"""Get the context summary for a user"""
async def update_user_context(self, user_id: str, role: str, content: str) -> None:
"""Add a message to the user's context"""
try:
async with self.async_session() as session:
context = await self._get_or_create_user_context(session, user_id)
context.add_message(role, content)
await session.commit()
except Exception as e:
logger.error(f"Error updating user context: {str(e)}")

async def get_user_preferences(self, user_id: str) -> Dict:
"""Get user preferences and settings"""
try:
async with async_session() as session:
query = select(UserContext).where(UserContext.user_id == user_id)
async with self.async_session() as session:
context = await self._get_or_create_user_context(session, user_id)
return {
'preferred_language': context.preferred_language,
'conversation_topics': context.conversation_topics or [],
'user_preferences': context.user_preferences or {},
'context_window': context.context_window,
'repetition_threshold': context.repetition_threshold
}
except Exception as e:
logger.error(f"Error getting user preferences: {str(e)}")
return {}

async def get_relevant_memories(self, user_id: str, query_embedding: List[float],
limit: int = 5) -> List[Dict]:
"""Get memories relevant to the current context"""
try:
async with self.async_session() as session:
query = select(UserMemory).where(
and_(
UserMemory.user_id == user_id,
UserMemory.is_active == True
)
)
result = await session.execute(query)
context = result.scalar_one_or_none()
return context.context_summary if context else None
memories = result.scalars().all()

# Calculate similarity scores
memory_scores = []
for memory in memories:
if memory.embedding and len(memory.embedding) == len(query_embedding):
try:
similarity = cosine_similarity(
[query_embedding],
[memory.embedding]
)[0][0]
memory_scores.append((memory, similarity))
except Exception as e:
logger.warning(f"Error calculating similarity for memory {memory.id}: {str(e)}")
continue

# Sort by similarity and importance
memory_scores.sort(key=lambda x: (x[1] * x[0].importance), reverse=True)

return [{
'content': memory.content,
'type': memory.memory_type,
'importance': memory.importance,
'similarity': score
} for memory, score in memory_scores[:limit]]
except Exception as e:
logger.error(f"Error getting context summary: {str(e)}")
return None
logger.error(f"Error getting relevant memories: {str(e)}")
return []

async def update_context_summary(self, user_id: str, summary: str):
"""Update the context summary for a user"""
async def add_memory(self, user_id: str, memory_type: str, content: str,
importance: float = 1.0, metadata: Dict = None,
embedding: List[float] = None) -> None:
"""Add a new memory for the user"""
try:
async with async_session() as session:
context = UserContext(
async with self.async_session() as session:
# Ensure user exists
await self.get_or_create_user(user_id, session)

memory = UserMemory(
user_id=user_id,
context_summary=summary
memory_type=memory_type,
content=content,
importance=importance,
memory_metadata=metadata,
embedding=embedding
)
await session.merge(context)
session.add(memory)
await session.commit()
except Exception as e:
logger.error(f"Error updating context summary: {str(e)}")
logger.error(f"Error adding memory: {str(e)}")
raise

async def cleanup_old_conversations(self, days: int = 30):
"""Clean up conversations older than specified days"""
try:
async with async_session() as session:
cutoff_date = datetime.utcnow() - timedelta(days=days)
query = delete(Conversation).where(Conversation.timestamp < cutoff_date)
await session.execute(query)
await session.commit()
except Exception as e:
logger.error(f"Error cleaning up old conversations: {str(e)}")
raise
# Initialize database
db = ConversationDB()
Loading