diff --git a/.gitignore b/.gitignore
index cad404f..5816b9c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,6 +5,11 @@
 # instagram_cookies.txt should not be tracked by git because it has cookies
 instagram_cookies.txt
 
+# SQLite database
+src/data/
+*.db
+*.db-journal
+
 # Byte-compiled / optimized / compiled Python files
 __pycache__/
 *.py[cod]
diff --git a/Dockerfile b/Dockerfile
index 16493e3..bc4049f 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -19,6 +19,9 @@
 COPY src /bot
 
 WORKDIR /bot
 
+# Create data directory for SQLite database
+RUN mkdir -p /bot/data
+
 # https://stackoverflow.com/questions/58701233/docker-logs-erroneously-appears-empty-until-container-stops
 ENV PYTHONUNBUFFERED=1
diff --git a/README.md b/README.md
index ca3a9eb..f430c61 100644
--- a/README.md
+++ b/README.md
@@ -27,13 +27,21 @@ docker build . -t downloader-bot:latest
 ```
 docker run -d --name downloader-bot --restart always --env-file .env downloader-bot:latest
 ```
+To persist user data (conversation history, rate limits) between restarts, add a volume:
+```
+docker run -d --name downloader-bot --restart always --env-file .env -v bot-data:/bot/data downloader-bot:latest
+```
 or use a built image from **Docker Hub**
 ```
 docker run -d --name downloader-bot --restart always --env-file .env ovchynnikov/load-bot-linux:latest
 ```
+With persistent data:
+```
+docker run -d --name downloader-bot --restart always --env-file .env -v bot-data:/bot/data ovchynnikov/load-bot-linux:latest
+```
 or if you use Instagram cookies
 ```
-docker run -d --name downloader-bot --restart always --env-file .env -v /absolute/path/to/instagram_cookies.txt:/bot/instagram_cookies.txt ovchynnikov/load-bot-linux:latest
+docker run -d --name downloader-bot --restart always --env-file .env -v bot-data:/bot/data -v /absolute/path/to/instagram_cookies.txt:/bot/instagram_cookies.txt ovchynnikov/load-bot-linux:latest
 ```
 or if you want to use the GPU power of an Intel chip and set the USE_GPU_COMPRESSING=True variable
 ```
diff --git a/docker-compose.yml b/docker-compose.yml
index d1270b4..a1de621 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -10,8 +10,13 @@ services:
     restart: unless-stopped
     volumes:
       - ./src:/app:cached # Use bind mount for development
+      - bot-data:/bot/data # Persistent storage for SQLite database
     deploy:
       resources:
         limits:
           cpus: '1'
           memory: 512M
+
+volumes:
+  bot-data:
+    driver: local
diff --git a/src/db_storage.py b/src/db_storage.py
new file mode 100644
index 0000000..2ecfdb4
--- /dev/null
+++ b/src/db_storage.py
@@ -0,0 +1,87 @@
+"""SQLite storage for bot user data persistence."""
+
+import sqlite3
+import json
+import os
+import time
+from logger import debug
+
+
+class BotStorage:
+    """Handles persistent storage of user data in SQLite."""
+
+    def __init__(self, db_path="data/bot.db"):
+        """Initialize database connection and create tables."""
+        os.makedirs(os.path.dirname(db_path), exist_ok=True)
+        self.db_path = db_path
+        self.conn = sqlite3.connect(db_path, check_same_thread=False)
+        self._create_tables()
+        debug("Database initialized at %s", db_path)
+
+    def _create_tables(self):
+        """Create tables if they don't exist."""
+        cursor = self.conn.cursor()
+        cursor.execute("""
+            CREATE TABLE IF NOT EXISTS user_data (
+                user_id INTEGER PRIMARY KEY,
+                conversation_context TEXT,
+                rate_limit_timestamps TEXT,
+                daily_count INTEGER DEFAULT 0,
+                daily_date TEXT,
+                last_seen REAL
+            )
+        """)
+        cursor.execute("CREATE INDEX IF NOT EXISTS idx_user_data_last_seen ON user_data(last_seen)")
+        self.conn.commit()
+
+    def load_user_data(self, user_id):
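+        # Note: values are read positionally from the row, matching the column order in _create_tables.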
"""Load user data from database.""" + cursor = self.conn.cursor() + cursor.execute("SELECT * FROM user_data WHERE user_id = ?", (user_id,)) + row = cursor.fetchone() + if row: + return { + "conversation_context": json.loads(row[1]) if row[1] else [], + "rate_limit_timestamps": json.loads(row[2]) if row[2] else [], + "daily_count": row[3], + "daily_date": row[4], + "last_seen": row[5], + } + return None + + def save_user_data(self, user_id, conversation_context, rate_limit_timestamps, daily_count, daily_date, last_seen): + """Save user data to database.""" + cursor = self.conn.cursor() + cursor.execute( + """ + INSERT OR REPLACE INTO user_data + (user_id, conversation_context, rate_limit_timestamps, daily_count, daily_date, last_seen) + VALUES (?, ?, ?, ?, ?, ?) + """, + ( + user_id, + json.dumps(conversation_context), + json.dumps(rate_limit_timestamps), + daily_count, + daily_date, + last_seen, + ), + ) + self.conn.commit() + + def delete_user_data(self, user_id): + """Delete user data from database.""" + cursor = self.conn.cursor() + cursor.execute("DELETE FROM user_data WHERE user_id = ?", (user_id,)) + self.conn.commit() + + def get_stale_users(self, ttl_seconds): + """Get list of user IDs that haven't been seen within TTL.""" + current_time = time.time() + cursor = self.conn.cursor() + cursor.execute("SELECT user_id FROM user_data WHERE last_seen < ?", (current_time - ttl_seconds,)) + return [row[0] for row in cursor.fetchall()] + + def close(self): + """Close database connection.""" + self.conn.close() diff --git a/src/main.py b/src/main.py index a9d5ae2..d3d070e 100644 --- a/src/main.py +++ b/src/main.py @@ -5,8 +5,13 @@ import json import asyncio import re +import time +import traceback +from datetime import datetime import google.generativeai as genai +from openai import AsyncOpenAI from functools import lru_cache +from collections import defaultdict from dotenv import load_dotenv from telegram import Update, InputMediaPhoto, InputMediaVideo from telegram.error import TimedOut, NetworkError, TelegramError @@ -16,6 +21,7 @@ from general_error_handler import error_handler from permissions import inform_user_not_allowed, is_user_or_chat_not_allowed, supported_sites from cleanup import cleanup +from db_storage import BotStorage from video_utils import ( compress_video, download_media, @@ -35,8 +41,12 @@ # Reply with user data for Healthcheck send_user_info_with_healthcheck = os.getenv("SEND_USER_INFO_WITH_HEALTHCHECK", "False").lower() == "true" USE_LLM = os.getenv("USE_LLM", "False").lower() == "true" +USE_CONVERSATION_CONTEXT = os.getenv("USE_CONVERSATION_CONTEXT", "True").lower() == "true" +LLM_PROVIDER = os.getenv("LLM_PROVIDER", "grok").lower() # gemini or grok GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") -GEMINI_MODEL = os.getenv("GEMINI_MODEL", "gemini-2.5-flash") +GEMINI_MODEL = os.getenv("GEMINI_MODEL", "gemini-flash-latest") +GROK_API_KEY = os.getenv("GROK_API_KEY") +GROK_MODEL = os.getenv("GROK_MODEL", "grok-4-latest") TELEGRAM_WRITE_TIMEOUT = 8000 TELEGRAM_READ_TIMEOUT = 8000 @@ -44,6 +54,36 @@ if GEMINI_API_KEY: genai.configure(api_key=GEMINI_API_KEY) +# Configure Grok API +grok_client = None +if GROK_API_KEY: + grok_client = AsyncOpenAI(api_key=GROK_API_KEY, base_url="https://api.x.ai/v1") + +# Rate limiting for LLM APIs +llm_rate_limit = defaultdict(list) # {user_id: [timestamp1, timestamp2, ...]} +llm_daily_limit = defaultdict(lambda: {"count": 0, "date": ""}) # {user_id: {count, date}} +LLM_RPM_LIMIT = int(os.getenv("LLM_RPM_LIMIT", "50")) # Requests per minute per 
+LLM_RPD_LIMIT = int(os.getenv("LLM_RPD_LIMIT", "500"))  # Requests per day per user
+
+# Conversation context storage: {user_id: [(user_msg, bot_response), ...]}
+conversation_context = defaultdict(list)
+MAX_CONTEXT_MESSAGES = int(os.getenv("MAX_CONTEXT_MESSAGES", "3"))  # Keep last N exchanges
+MAX_CONTEXT_CHARS = int(os.getenv("MAX_CONTEXT_CHARS", "500"))  # Max chars per message in context
+
+# User activity tracking for cleanup
+user_last_seen = defaultdict(float)  # {user_id: timestamp}
+USER_CLEANUP_TTL_DAYS = int(os.getenv("USER_CLEANUP_TTL_DAYS", "3"))  # Days before user data expires
+USER_CLEANUP_INTERVAL_HOURS = int(os.getenv("USER_CLEANUP_INTERVAL_HOURS", "24"))  # Cleanup interval
+
+# Allowed LLM providers
+ALLOWED_PROVIDERS = {"grok", "gemini"}
+
+# Initialize database storage
+db_storage = BotStorage()
+
+# Cleanup task reference
+cleanup_task = None
+
 
 # Cache responses from JSON file
 @lru_cache(maxsize=1)
@@ -200,7 +240,7 @@ async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
     # bot_mentioned = is_bot_mentioned(message_text)
     debug("Bot mentioned check: %s for message: %s", bot_mentioned, message_text)
     debug("USE_LLM setting: %s", USE_LLM)
-    debug("GEMINI_API_KEY configured: %s", bool(GEMINI_API_KEY))
+    debug("LLM_PROVIDER: %s", LLM_PROVIDER)
 
     if bot_mentioned:
         if USE_LLM:
@@ -470,7 +510,7 @@ async def send_pic(update: Update, pic) -> None:
 
 
 async def respond_with_llm_message(update):
-    """Handle LLM responses when bot is mentioned using Google Gemini API."""
+    """Handle LLM responses when bot is mentioned using Gemini or Grok API."""
     debug("LLM response function called")
     message_text = update.message.text
     # Remove bot mention and any punctuation after it
@@ -478,11 +518,93 @@
     debug("Original message: %s", message_text)
     debug("Processed prompt: %s", prompt)
 
-    if not GEMINI_API_KEY:
-        # debug("GEMINI_API_KEY not configured")
-        await update.message.reply_text("Sorry, AI service is not configured.")
+    # Validate LLM provider
+    if LLM_PROVIDER not in ALLOWED_PROVIDERS:
+        bot_response = (
+            f"Вибачте, провайдер '{LLM_PROVIDER}' не підтримується. Доступні: {', '.join(ALLOWED_PROVIDERS)}"
+            if language == "uk"
+            else f"Sorry, provider '{LLM_PROVIDER}' is not supported. Available: {', '.join(ALLOWED_PROVIDERS)}"
+        )
+        await update.message.reply_text(bot_response)
+        return
+
+    # Check if API is configured
+    if LLM_PROVIDER == "grok" and not GROK_API_KEY:
+        bot_response = (
+            "Вибачте, Grok AI сервіс не налаштовано."
+            if language == "uk"
+            else "Sorry, Grok AI service is not configured."
+        )
+        await update.message.reply_text(bot_response)
+        return
+    elif LLM_PROVIDER == "gemini" and not GEMINI_API_KEY:
+        bot_response = (
+            "Вибачте, Gemini AI сервіс не налаштовано."
+            if language == "uk"
+            else "Sorry, Gemini AI service is not configured."
+        )
+        await update.message.reply_text(bot_response)
+        return
+
+    # Rate limiting check
+    user_id = update.effective_user.id
+    current_time = time.time()
+
+    # Update last seen timestamp
+    user_last_seen[user_id] = current_time
+
+    # Load user data from database on first access
+    if user_id not in llm_daily_limit:
+        debug("Loading user data from database for user_id: %s", user_id)
+        user_data = await asyncio.to_thread(db_storage.load_user_data, user_id)
+        if user_data:
+            debug(
+                "Found user data in database: context=%d messages, rate_limit=%d timestamps, daily=%d/%s",
+                len(user_data["conversation_context"]),
+                len(user_data["rate_limit_timestamps"]),
+                user_data["daily_count"],
+                user_data["daily_date"],
+            )
+            conversation_context[user_id] = user_data["conversation_context"]
+            llm_rate_limit[user_id] = user_data["rate_limit_timestamps"]
+            llm_daily_limit[user_id] = {"count": user_data["daily_count"], "date": user_data["daily_date"]}
+            # Only update last_seen if DB value is newer
+            if user_id not in user_last_seen or user_data["last_seen"] > user_last_seen[user_id]:
+                user_last_seen[user_id] = user_data["last_seen"]
+        else:
+            debug("No existing data found in database for user_id: %s", user_id)
+
+    # Clean old timestamps (older than 60 seconds)
+    llm_rate_limit[user_id] = [t for t in llm_rate_limit[user_id] if current_time - t < 60]
+
+    if len(llm_rate_limit[user_id]) >= LLM_RPM_LIMIT:
+        debug("Rate limit hit for user %s", user_id)
+        bot_response = (
+            "Вибачте, забагато запитів. Почекайте хвилину."
+            if language == "uk"
+            else "Sorry, too many requests. Please wait a minute."
+        )
+        await update.message.reply_text(bot_response)
+        return
+
+    # Check daily limit
+    today = datetime.now().strftime("%Y-%m-%d")
+    if llm_daily_limit[user_id]["date"] != today:
+        llm_daily_limit[user_id] = {"count": 0, "date": today}
+
+    if llm_daily_limit[user_id]["count"] >= LLM_RPD_LIMIT:
+        debug("Daily limit hit for user %s", user_id)
+        bot_response = (
+            "Вибачте, денний ліміт запитів вичерпано. Спробуйте завтра."
+            if language == "uk"
+            else "Sorry, daily request limit reached. Try again tomorrow."
+        )
+        await update.message.reply_text(bot_response)
         return
 
+    # Tentatively add current request timestamp (will be removed on failure)
+    llm_rate_limit[user_id].append(current_time)
+
     try:
         # Check if user is asking for image generation and modify prompt
         image_keywords = [
@@ -513,107 +635,289 @@
             bot_response = "Sorry, I can't generate images, but I can describe in detail what you're asking for! For example, I can tell you about a car: its color, shape, design features, etc. What specifically interests you?"
             await update.message.reply_text(bot_response)
+            # Remove tentative timestamp since no API call was made
+            llm_rate_limit[user_id].pop()
             return
 
-        # Initialize the Gemini model
-        debug("Initializing Gemini model: gemini-2.5-flash")
-        plain_text_instruction = "Provide the entire response exclusively as plain text. Do not use any Markdown formatting (no **bold**, *italics*, # headers, or lists). The response must be text only. Provide concise, short answers. Aim for 1-3 sentences."
+        # Prepare prompt with context
         debug("Original prompt: %s", prompt)
 
-        model = genai.GenerativeModel(GEMINI_MODEL, system_instruction=plain_text_instruction)
+        # Build context from previous messages if enabled
+        if USE_CONVERSATION_CONTEXT:
+            context_messages = (
+                conversation_context[user_id][-MAX_CONTEXT_MESSAGES:] if conversation_context[user_id] else []
+            )
+        else:
+            context_messages = []
 
-        # Try different approach - rephrase any potentially problematic prompts
-        debug("Original prompt: %s", prompt)
-        safe_prompt = f"Відповідай українською мовою як дружній асистент. Питання користувача: {prompt}"
-        debug("Modified safe prompt: %s", safe_prompt)
-
-        # Generate response using Gemini with both safety settings and safe prompting
-        debug("Sending request to Gemini API")
-        safety_settings = {
-            genai.types.HarmCategory.HARM_CATEGORY_HARASSMENT: genai.types.HarmBlockThreshold.BLOCK_NONE,
-            genai.types.HarmCategory.HARM_CATEGORY_HATE_SPEECH: genai.types.HarmBlockThreshold.BLOCK_NONE,
-            genai.types.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: genai.types.HarmBlockThreshold.BLOCK_NONE,
-            genai.types.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: genai.types.HarmBlockThreshold.BLOCK_NONE,
-        }
-        contents = [{'role': 'user', 'parts': [safe_prompt]}]
-        response = await asyncio.to_thread(
-            model.generate_content,
-            contents,  # Pass the simplified list here
-            generation_config=genai.types.GenerationConfig(
-                temperature=0.7,
-                top_p=0.9,
-                top_k=30,
-                max_output_tokens=1024,
-            ),
-            safety_settings=safety_settings,
-        )
-        # debug("Successfully received response from Gemini API")
-
-        # Handle response with safety filter checks
-        if hasattr(response, 'candidates') and response.candidates:
-            candidate = response.candidates[0]
-            debug("Response candidate finish_reason: %s", getattr(candidate, 'finish_reason', 'None'))
-            debug("Response candidate safety_ratings: %s", getattr(candidate, 'safety_ratings', 'None'))
-
-            if hasattr(candidate, 'finish_reason') and candidate.finish_reason == 2:
-                debug("Safety filter triggered - finish_reason: 2, trying simpler approach")
-                # Try a much simpler, generic response for blocked content
-                try:
-                    simple_response = await asyncio.to_thread(
-                        model.generate_content,
-                        "Відповідь українською мовою: дай загальну інформацію про: " + prompt,
-                        safety_settings=safety_settings,
-                    )
-                    if simple_response.text:
-                        bot_response = f"Ось загальна інформація: {simple_response.text.strip()}"
-                    else:
-                        bot_response = (
-                            "Вибачте, не можу надати детальну відповідь на це питання."
-                            if language == "uk"
-                            else "Sorry, I can't provide a detailed answer to this question."
-                        )
-                except:  # --- IGNORE ---  # pylint: disable=bare-except
-                    bot_response = (
-                        "Вибачте, не можу надати детальну відповідь на це питання."
-                        if language == "uk"
-                        else "Sorry, I can't provide a detailed answer to this question."
-                    )
-            elif response.text:
-                # Remove Markdown formatting from response
-                bot_response = response.text.strip()
-                # Remove common Markdown syntax
-                bot_response = re.sub(r'\*+', '', bot_response)  # Bold text
-                bot_response = bot_response.replace('*', '')  # Italic text
-                bot_response = bot_response.replace('`', '')  # Code blocks
-                bot_response = bot_response.replace('#', '')  # Headers
+        # Create prompt with context if available
+        if language == "uk":
+            user_label = "Користувач"
+            assistant_label = "Асистент"
+            instruction = "Відповідай українською мовою як дружній асистент. Не вітайся і не прощайся."
+        else:
+            user_label = "User"
+            assistant_label = "Assistant"
+            instruction = "Answer in English as a friendly assistant. Don't greet or say goodbye."
+
+        if context_messages:
+            context_str = "\n".join(
+                [f"{user_label}: {msg}\n{assistant_label}: {resp}" for msg, resp in context_messages]
+            )
+            if language == "uk":
+                safe_prompt = (
+                    f"Попередня розмова:\n{context_str}\n\nПоточне питання користувача: {prompt}\n\n{instruction}"
+                )
             else:
-                bot_response = (
-                    "Вибачте, я не можу згенерувати відповідь."
-                    if language == "uk"
-                    else "Sorry, I couldn't generate a response."
+                safe_prompt = (
+                    f"Previous conversation:\n{context_str}\n\nCurrent user question: {prompt}\n\n{instruction}"
+                )
+        else:
+            if language == "uk":
+                safe_prompt = f"{instruction} Питання користувача: {prompt}"
+            else:
+                safe_prompt = f"{instruction} User question: {prompt}"
+
+        debug("Modified safe prompt with context: %s", safe_prompt[:200])
+
+        # Call appropriate LLM provider
+        if LLM_PROVIDER == "grok":
+            debug("Using Grok API with model: %s", GROK_MODEL)
+            bot_response = await call_grok_api(safe_prompt, update)
+        else:
+            debug("Using Gemini API with model: %s", GEMINI_MODEL)
+            bot_response = await call_gemini_api(safe_prompt, prompt, update)
+
+        # Increment daily limit only after successful API call
+        llm_daily_limit[user_id]["count"] += 1
+
+        # Store conversation in context if enabled
+        if USE_CONVERSATION_CONTEXT:
+            truncated_prompt = prompt[:MAX_CONTEXT_CHARS]
+            truncated_response = bot_response[:MAX_CONTEXT_CHARS]
+            conversation_context[user_id].append((truncated_prompt, truncated_response))
+            # Keep only last MAX_CONTEXT_MESSAGES
+            if len(conversation_context[user_id]) > MAX_CONTEXT_MESSAGES:
+                conversation_context[user_id] = conversation_context[user_id][-MAX_CONTEXT_MESSAGES:]
+
+        # Send reply first, then save to DB (best-effort persistence)
+        await update.message.reply_text(bot_response)
+
+        # Save user data to database (best-effort, don't fail on DB errors)
+        async def save_to_db():
+            try:
+                debug(
+                    "Saving user data to database: user_id=%s, context=%d messages, daily=%d/%s",
+                    user_id,
+                    len(conversation_context[user_id]),
+                    llm_daily_limit[user_id]["count"],
+                    llm_daily_limit[user_id]["date"],
+                )
+                await asyncio.to_thread(
+                    db_storage.save_user_data,
+                    user_id,
+                    conversation_context[user_id],
+                    llm_rate_limit[user_id],
+                    llm_daily_limit[user_id]["count"],
+                    llm_daily_limit[user_id]["date"],
+                    user_last_seen[user_id],
+                )
+            except Exception as db_error:  # pylint: disable=broad-except
+                error("Failed to save user data to database: %s", db_error)
+
+        asyncio.create_task(save_to_db())
+
+    except Exception as e:  # pylint: disable=broad-except
+        # Remove tentative timestamp on failure
+        if llm_rate_limit[user_id] and llm_rate_limit[user_id][-1] == current_time:
+            llm_rate_limit[user_id].pop()
+
+        error_msg = str(e)
+        error("Error in LLM API request: %s (Type: %s)", error_msg, type(e).__name__)
+        error("Full traceback: %s", traceback.format_exc())
+
+        # Check for rate limit (429) error
+        if "429" in error_msg or "quota" in error_msg.lower() or "rate limit" in error_msg.lower():
+            error("Rate limit exceeded (429) - Too many requests to LLM API")
+            bot_response = (
+                "Вибачте, перевищено ліміт запитів до AI. Спробуйте пізніше."
+                if language == "uk"
+                else "Sorry, AI request limit exceeded. Please try again later."
+            )
         else:
             bot_response = (
                 "Вибачте, я не можу згенерувати відповідь."
                 if language == "uk"
-                else "Sorry, I couldn't generate a response."
+ else "Sorry, I encountered an error while processing your request." ) await update.message.reply_text(bot_response) - except (ValueError, RuntimeError) as e: - error("Error in Gemini API request: %s", e) - await update.message.reply_text( - "Вибачте, я не можу згенерувати відповідь." - if language == "uk" - else "Sorry, I encountered an error while processing your request." - ) - except Exception as e: # pylint: disable=broad-except - error("Unexpected error in Gemini API request: %s", e) - await update.message.reply_text( - "Вибачте, я не можу згенерувати відповідь." - if language == "uk" - else "Sorry, I encountered an unexpected error while processing your request." - ) + +async def call_grok_api(safe_prompt: str, update) -> str: + """Call Grok API and return response. Raises exception on failure.""" + plain_text_instruction = "Provide the entire response exclusively as plain text. Do not use any Markdown formatting (no **bold**, *italics*, # headers, or lists). The response must be text only. Provide concise, short answers. Aim for 1-3 sentences." + max_retries = 2 + retry_delay = 60 + + for attempt in range(max_retries): + try: + response = await grok_client.chat.completions.create( + model=GROK_MODEL, + messages=[ + {"role": "system", "content": plain_text_instruction}, + {"role": "user", "content": safe_prompt}, + ], + max_tokens=1024, + temperature=0.7, + ) + return response.choices[0].message.content.strip() + except Exception as retry_error: # pylint: disable=broad-exception-caught + error_msg = str(retry_error) + if ( + "429" in error_msg or "quota" in error_msg.lower() or "rate limit" in error_msg.lower() + ) and attempt < max_retries - 1: + debug( + "Rate limit hit, waiting %s seconds before retry (attempt %s/%s)", + retry_delay, + attempt + 1, + max_retries, + ) + wait_msg = ( + f"Перевищено ліміт запитів. Зачекайте {retry_delay} секунд, я спробую ще раз..." + if language == "uk" + else f"Rate limit exceeded. Waiting {retry_delay} seconds before retrying..." + ) + await update.message.reply_text(wait_msg) + await asyncio.sleep(retry_delay) + else: + raise + + +async def call_gemini_api(safe_prompt: str, prompt: str, update) -> str: + """Call Gemini API and return response. Raises exception on failure.""" + plain_text_instruction = "Provide the entire response exclusively as plain text. Do not use any Markdown formatting (no **bold**, *italics*, # headers, or lists). The response must be text only. Provide concise, short answers. Aim for 1-3 sentences." 
+    model = genai.GenerativeModel(GEMINI_MODEL, system_instruction=plain_text_instruction)
+    safety_settings = {
+        genai.types.HarmCategory.HARM_CATEGORY_HARASSMENT: genai.types.HarmBlockThreshold.BLOCK_NONE,
+        genai.types.HarmCategory.HARM_CATEGORY_HATE_SPEECH: genai.types.HarmBlockThreshold.BLOCK_NONE,
+        genai.types.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: genai.types.HarmBlockThreshold.BLOCK_NONE,
+        genai.types.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: genai.types.HarmBlockThreshold.BLOCK_NONE,
+    }
+    contents = [{'role': 'user', 'parts': [safe_prompt]}]
+
+    max_retries = 2
+    retry_delay = 60
+    response = None
+
+    for attempt in range(max_retries):
+        try:
+            response = await asyncio.to_thread(
+                model.generate_content,
+                contents,
+                generation_config=genai.types.GenerationConfig(
+                    temperature=0.7,
+                    top_p=0.9,
+                    top_k=30,
+                    max_output_tokens=1024,
+                ),
+                safety_settings=safety_settings,
+            )
+            debug("Successfully received response from Gemini API")
+            break
+        except Exception as retry_error:  # pylint: disable=broad-exception-caught
+            error_msg = str(retry_error)
+            if (
+                "429" in error_msg or "quota" in error_msg.lower() or "rate limit" in error_msg.lower()
+            ) and attempt < max_retries - 1:
+                debug(
+                    "Rate limit hit, waiting %s seconds before retry (attempt %s/%s)",
+                    retry_delay,
+                    attempt + 1,
+                    max_retries,
+                )
+                wait_msg = (
+                    f"Перевищено ліміт запитів. Зачекайте {retry_delay} секунд, я спробую ще раз..."
+                    if language == "uk"
+                    else f"Rate limit exceeded. Waiting {retry_delay} seconds before retrying..."
+                )
+                await update.message.reply_text(wait_msg)
+                await asyncio.sleep(retry_delay)
+            else:
+                raise
+
+    # Check if response was set after retries
+    if response is None:
+        raise Exception("Failed to get response after retries")  # pylint: disable=broad-exception-raised
+
+    if hasattr(response, 'candidates') and response.candidates:
+        candidate = response.candidates[0]
+        debug("Response candidate finish_reason: %s", getattr(candidate, 'finish_reason', 'None'))
+        debug("Response candidate safety_ratings: %s", getattr(candidate, 'safety_ratings', 'None'))
+
+        if hasattr(candidate, 'finish_reason') and candidate.finish_reason == 2:
+            debug("Safety filter triggered - finish_reason: 2, trying simpler approach")
+            fallback_instruction = (
+                "Відповідь українською мовою: дай загальну інформацію про: "
+                if language == "uk"
+                else "Answer in English: give general information about: "
+            )
+            simple_response = await asyncio.to_thread(
+                model.generate_content,
+                fallback_instruction + prompt,
+                safety_settings=safety_settings,
+            )
+            if simple_response.text:
+                prefix = "Ось загальна інформація: " if language == "uk" else "Here's general information: "
+                return f"{prefix}{simple_response.text.strip()}"
+            else:
+                error_msg = (
+                    "Вибачте, не можу надати детальну відповідь на це питання."
+                    if language == "uk"
+                    else "Sorry, I can't provide a detailed answer to this question."
+                )
+                raise Exception(error_msg)  # pylint: disable=broad-exception-raised
+        elif response.text:
+            # Remove Markdown formatting
+            bot_response = response.text.strip()
+            bot_response = re.sub(r'\*+', '', bot_response)
+            bot_response = bot_response.replace('*', '').replace('`', '').replace('#', '')
+            return bot_response
+        else:
+            raise Exception("Вибачте, я не можу згенерувати відповідь.")  # pylint: disable=broad-exception-raised
+    else:
+        raise Exception("Вибачте, я не можу згенерувати відповідь.")  # pylint: disable=broad-exception-raised
+
+
+async def cleanup_stale_users():
+    """Remove inactive users from memory and database to prevent unbounded growth."""
+    while True:
+        try:
+            await asyncio.sleep(USER_CLEANUP_INTERVAL_HOURS * 3600)
+            ttl_seconds = USER_CLEANUP_TTL_DAYS * 86400
+
+            # Get stale users from database
+            stale_users = await asyncio.to_thread(db_storage.get_stale_users, ttl_seconds)
+
+            for user_id in stale_users:
+                # Remove from memory
+                if user_id in conversation_context:
+                    del conversation_context[user_id]
+                if user_id in llm_rate_limit:
+                    del llm_rate_limit[user_id]
+                if user_id in llm_daily_limit:
+                    del llm_daily_limit[user_id]
+                if user_id in user_last_seen:
+                    del user_last_seen[user_id]
+                # Remove from database
+                await asyncio.to_thread(db_storage.delete_user_data, user_id)
+
+            if stale_users:
+                info("Cleaned up %d inactive users (TTL: %d days)", len(stale_users), USER_CLEANUP_TTL_DAYS)
+        except Exception as cleanup_error:  # pylint: disable=broad-except
+            error("Error in cleanup_stale_users: %s", cleanup_error)
+            error("Full traceback: %s", traceback.format_exc())
+            await asyncio.sleep(60)  # Wait before retrying
 
 
 def main():
@@ -647,6 +951,30 @@
     application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
     # This handler will receive every error which happens in your bot
     application.add_error_handler(error_handler)
+
+    # Start cleanup task after event loop is running
+    async def post_init(app):  # pylint: disable=unused-argument
+        global cleanup_task  # pylint: disable=global-statement
+        cleanup_task = asyncio.create_task(cleanup_stale_users())
+
+    # Cancel cleanup task and close DB on shutdown
+    async def post_shutdown(app):  # pylint: disable=unused-argument
+        if cleanup_task is not None:
+            cleanup_task.cancel()
+            try:
+                await cleanup_task
+            except asyncio.CancelledError:
+                pass
+        # Close database connection
+        try:
+            db_storage.close()
+            debug("Database connection closed")
+        except Exception as e:  # pylint: disable=broad-except
+            error("Error closing database: %s", e)
+
+    application.post_init = post_init
+    application.post_shutdown = post_shutdown
+
     info("Bot started. Ctrl+C to stop")
     application.run_polling()
diff --git a/src/requirements.txt b/src/requirements.txt
index f630e1a..244b45a 100644
--- a/src/requirements.txt
+++ b/src/requirements.txt
@@ -1,6 +1,7 @@
-python-telegram-bot[ext]==22.6
-python-dotenv==1.2.1
-yt-dlp==2026.2.21
-gallery-dl==1.31.6
-aiohttp==3.13.3
+python-telegram-bot[ext]>=22.6
+python-dotenv>=1.2.2
+yt-dlp>=2026.3.3
+gallery-dl>=1.31.7
+aiohttp>=3.13.3
 google-generativeai>=0.8.6
+openai>=2.24.0
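
For reference, the configuration surface this change adds, collected as a sample `.env` fragment. This is a sketch assembled from the `os.getenv` defaults in the diff, not a file shipped by the PR; the key values are hypothetical placeholders:
```
# Provider selection (USE_LLM must already be True for LLM replies)
LLM_PROVIDER=grok                  # grok or gemini
GROK_API_KEY=xai-...               # hypothetical placeholder
GROK_MODEL=grok-4-latest
GEMINI_API_KEY=AIza...             # hypothetical placeholder
GEMINI_MODEL=gemini-flash-latest

# Per-user rate limits
LLM_RPM_LIMIT=50                   # requests per minute
LLM_RPD_LIMIT=500                  # requests per day

# Conversation context
USE_CONVERSATION_CONTEXT=True
MAX_CONTEXT_MESSAGES=3             # last N exchanges kept per user
MAX_CONTEXT_CHARS=500              # max chars stored per message

# Stale-user cleanup
USER_CLEANUP_TTL_DAYS=3
USER_CLEANUP_INTERVAL_HOURS=24
```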
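The rate-limiting scheme in `respond_with_llm_message` — a sliding 60-second window capped at LLM_RPM_LIMIT plus a per-date daily counter — condensed into a standalone sketch without the Telegram plumbing. `allow_request` is a hypothetical helper name; note the real code defers the daily increment until the API call succeeds and appends the per-minute timestamp tentatively, rolling it back on failure:
```python
import time
from collections import defaultdict
from datetime import datetime

LLM_RPM_LIMIT = 50   # max requests per 60-second sliding window
LLM_RPD_LIMIT = 500  # max requests per calendar day

llm_rate_limit = defaultdict(list)  # user_id -> [timestamps within the window]
llm_daily_limit = defaultdict(lambda: {"count": 0, "date": ""})  # user_id -> daily counter


def allow_request(user_id: int) -> bool:
    """Record the request and return True unless either limit is exhausted."""
    now = time.time()
    # Sliding window: drop timestamps older than 60 seconds, then check the cap.
    llm_rate_limit[user_id] = [t for t in llm_rate_limit[user_id] if now - t < 60]
    if len(llm_rate_limit[user_id]) >= LLM_RPM_LIMIT:
        return False

    # Daily counter: reset when the calendar date changes.
    today = datetime.now().strftime("%Y-%m-%d")
    if llm_daily_limit[user_id]["date"] != today:
        llm_daily_limit[user_id] = {"count": 0, "date": today}
    if llm_daily_limit[user_id]["count"] >= LLM_RPD_LIMIT:
        return False

    llm_rate_limit[user_id].append(now)
    llm_daily_limit[user_id]["count"] += 1
    return True
```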
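And a minimal round-trip check for the new storage layer — a sketch assuming it is run from `src/` so that `db_storage` (and the `logger` module it imports) resolve; the user ID, path, and values are invented for illustration:
```python
import time

from db_storage import BotStorage

storage = BotStorage(db_path="data/smoke.db")  # hypothetical path; the bot uses data/bot.db
now = time.time()

storage.save_user_data(
    user_id=12345,
    conversation_context=[("hello", "hi there")],
    rate_limit_timestamps=[now],
    daily_count=1,
    daily_date="2026-01-01",
    last_seen=now,
)

data = storage.load_user_data(12345)
# JSON round-trips tuples as lists, so the exchange comes back as a list pair.
assert data["conversation_context"] == [["hello", "hi there"]]
assert data["daily_count"] == 1

# A user seen just now is not stale for the default 3-day TTL.
assert 12345 not in storage.get_stale_users(3 * 86400)

storage.delete_user_data(12345)
assert storage.load_user_data(12345) is None
storage.close()
```
The tuple-to-list conversion is worth knowing about: in-memory context holds `(user_msg, bot_response)` tuples, but after a reload from SQLite they come back as lists. The consumer only unpacks them with `for msg, resp in context_messages`, which works for both.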