diff --git a/.env.example b/.env.example index dfd70908..cabf2908 100644 --- a/.env.example +++ b/.env.example @@ -140,6 +140,32 @@ QUICK_ACTIONS_TIMEOUT=120 # Git operations timeout in seconds GIT_OPERATIONS_TIMEOUT=30 +# === PER-CHAT ROUTING === +# Route specific Telegram chats to dedicated Claude sessions and working directories. +# This allows, for example, a personal DM to work in your home directory while a +# shared group chat works in a project-specific folder โ€” each with its own session. + +# Telegram chat ID of your personal DM with the bot +# Find your ID by messaging @userinfobot +PERSONAL_CHAT_ID= + +# Working directory for the personal DM session +PERSONAL_CHAT_DIRECTORY= + +# Telegram chat ID of a group chat (negative number, e.g. -1001234567890) +GROUP_CHAT_ID= + +# Working directory for the group chat session +GROUP_CHAT_DIRECTORY= + +# Prefix word required to trigger Claude in group chats. +# Messages NOT starting with this prefix are silently buffered for context +# but do not produce a response. Supports both "claude " and "/claude ". +# Buffered history is stored in Telegram ``chat_data`` and only survives bot +# restarts when a python-telegram-bot Persistence backend is configured. +# Default: claude +GROUP_TRIGGER_PREFIX=claude + # === PROJECT THREAD MODE === # Enable strict routing by Telegram project topics ENABLE_PROJECT_THREADS=false diff --git a/src/bot/features/chat_routing.py b/src/bot/features/chat_routing.py new file mode 100644 index 00000000..b95f79f3 --- /dev/null +++ b/src/bot/features/chat_routing.py @@ -0,0 +1,111 @@ +"""Per-chat routing: working directories and group conversation helpers. + +This module enables two related features: + +**Per-chat working directories** +Map specific Telegram chat IDs to dedicated Claude sessions and working +directories via ``PERSONAL_CHAT_ID`` / ``PERSONAL_CHAT_DIRECTORY`` and +``GROUP_CHAT_ID`` / ``GROUP_CHAT_DIRECTORY`` in your ``.env``. Messages from +an unrecognised chat fall back to ``APPROVED_DIRECTORY``. + +**Group trigger prefix + conversation history** +In group chats Claude is silent by default. Only messages that begin with +the configured prefix (default: ``claude``) โ€” or the equivalent slash command +(``/claude``) โ€” trigger a response. All other messages are stored in a +per-chat rolling buffer so that when Claude *is* triggered it receives the +recent conversation as context, allowing it to answer questions about what +was discussed without every participant having to @-mention the bot. +""" + +from pathlib import Path + +from src.config.settings import Settings + +# Maximum number of messages kept in the per-chat history buffer. +MAX_BUFFER_SIZE = 30 + +# How many buffered messages are prepended as context when Claude is triggered. +HISTORY_CONTEXT_SIZE = 20 + + +def append_to_buffer( + buffer: list[dict[str, str]], sender_name: str, text: str +) -> None: + """Append a message and trim the buffer to :data:`MAX_BUFFER_SIZE`.""" + buffer.append({"sender": sender_name, "text": text}) + if len(buffer) > MAX_BUFFER_SIZE: + del buffer[: len(buffer) - MAX_BUFFER_SIZE] + + +def format_history(messages: list[dict[str, str]]) -> str: + """Return messages formatted as ``Sender: text`` lines.""" + return "\n".join(f"{msg['sender']}: {msg['text']}" for msg in messages) + + +def is_group_triggered(message_text: str, trigger_prefix: str) -> bool: + """Return whether a group message should trigger Claude.""" + lower_text = message_text.lower() + lower_prefix = trigger_prefix.lower() + slash_prefix = f"/{lower_prefix}" + slash_variants = (f"{slash_prefix} ", f"{slash_prefix}@") + return ( + lower_text == lower_prefix + or lower_text.startswith(f"{lower_prefix} ") + or lower_text == slash_prefix + or lower_text.startswith(slash_variants) + ) + + +def strip_group_trigger_prefix(message_text: str, trigger_prefix: str) -> str: + """Remove the plain/slash trigger prefix, including ``@botname`` variants.""" + lower_text = message_text.lower() + lower_prefix = trigger_prefix.lower() + slash_prefix = f"/{lower_prefix}" + if lower_text == lower_prefix: + return "" + if lower_text.startswith(f"{lower_prefix} "): + return message_text[len(trigger_prefix) :].lstrip() + if lower_text == slash_prefix: + return "" + if lower_text.startswith(f"{slash_prefix} "): + return message_text[len(slash_prefix) :].lstrip() + if lower_text.startswith(f"{slash_prefix}@"): + parts = message_text.split(maxsplit=1) + return parts[1] if len(parts) > 1 else "" + return message_text + + +def build_group_prompt( + history: list[dict[str, str]], message_text: str, trigger_prefix: str +) -> str: + """Build the Claude prompt for a triggered group message.""" + stripped = strip_group_trigger_prefix(message_text, trigger_prefix) + context_messages = history[-HISTORY_CONTEXT_SIZE:] + if not context_messages: + return stripped + + history_str = format_history(context_messages) + return f"[Recent group conversation:\n{history_str}\n]\n\n{stripped}" + + +def get_working_directory(chat_id: int, settings: Settings) -> Path: + """Return the working directory to use for *chat_id*. + + Priority: + 1. ``personal_chat_directory`` if ``chat_id == personal_chat_id`` + 2. ``group_chat_directory`` if ``chat_id == group_chat_id`` + 3. ``approved_directory`` (the global fallback) + """ + if ( + settings.personal_chat_id is not None + and chat_id == settings.personal_chat_id + and settings.personal_chat_directory is not None + ): + return settings.personal_chat_directory + if ( + settings.group_chat_id is not None + and chat_id == settings.group_chat_id + and settings.group_chat_directory is not None + ): + return settings.group_chat_directory + return settings.approved_directory diff --git a/src/bot/orchestrator.py b/src/bot/orchestrator.py index 1124d006..7ce8082e 100644 --- a/src/bot/orchestrator.py +++ b/src/bot/orchestrator.py @@ -10,11 +10,12 @@ import time from dataclasses import dataclass, field from pathlib import Path -from typing import Any, Callable, Dict, List, Optional +from typing import Any, Callable, Dict, List, Optional, cast import structlog from telegram import ( BotCommand, + Chat, InlineKeyboardButton, InlineKeyboardMarkup, InputMediaPhoto, @@ -32,6 +33,12 @@ from ..claude.sdk_integration import StreamUpdate from ..config.settings import Settings from ..projects import PrivateTopicsUnavailableError +from .features.chat_routing import ( + append_to_buffer, + build_group_prompt, + get_working_directory, + is_group_triggered, +) from .utils.draft_streamer import DraftStreamer, generate_draft_id from .utils.html_format import escape_html from .utils.image_extractor import ( @@ -267,6 +274,10 @@ def _is_within(path: Path, root: Path) -> bool: except ValueError: return False + def _get_working_directory(self, chat_id: int) -> Path: + """Return working directory based on per-chat routing config.""" + return get_working_directory(chat_id, self.settings) + @staticmethod def _extract_message_thread_id(update: Update) -> Optional[int]: """Extract topic/thread id from update message for forum/direct topics.""" @@ -520,8 +531,11 @@ async def agentic_start( return except Exception: sync_line = "\n\n๐Ÿงต Topic sync failed. Run /sync_threads to retry." - current_dir = context.user_data.get( - "current_directory", self.settings.approved_directory + chat_id = update.effective_chat.id if update.effective_chat else None + current_dir = ( + self._get_working_directory(chat_id) + if chat_id + else self.settings.approved_directory ) dir_display = f"{current_dir}/" @@ -539,9 +553,9 @@ async def agentic_new( self, update: Update, context: ContextTypes.DEFAULT_TYPE ) -> None: """Reset session, one-line confirmation.""" - context.user_data["claude_session_id"] = None - context.user_data["session_started"] = True - context.user_data["force_new_session"] = True + context.chat_data["claude_session_id"] = None + context.chat_data["session_started"] = True + context.chat_data["force_new_session"] = True await update.message.reply_text("Session reset. What's next?") @@ -549,12 +563,11 @@ async def agentic_status( self, update: Update, context: ContextTypes.DEFAULT_TYPE ) -> None: """Compact one-line status, no buttons.""" - current_dir = context.user_data.get( - "current_directory", self.settings.approved_directory - ) + chat_id = update.effective_chat.id + current_dir = self._get_working_directory(chat_id) dir_display = str(current_dir) - session_id = context.user_data.get("claude_session_id") + session_id = context.chat_data.get("claude_session_id") session_status = "active" if session_id else "none" # Cost info @@ -912,6 +925,24 @@ async def agentic_text( user_id = update.effective_user.id message_text = update.message.text + # Group chat: store message in history buffer; only respond if prefixed + chat_type = update.effective_chat.type if update.effective_chat else None + if chat_type in (Chat.GROUP, Chat.SUPERGROUP): + buffer = cast( + list[dict[str, str]], context.chat_data.setdefault("_msg_buffer", []) + ) + sender_name = ( + update.effective_user.first_name if update.effective_user else None + ) + append_to_buffer(buffer, sender_name or "Unknown", message_text) + prefix = self.settings.group_trigger_prefix + if not is_group_triggered(message_text, prefix): + # Not triggered โ€” store only, no response + return + + history = buffer[:-1] # All messages except the one just added + message_text = build_group_prompt(history, message_text, prefix) + logger.info( "Agentic text message", user_id=user_id, @@ -957,14 +988,13 @@ async def agentic_text( ) return - current_dir = context.user_data.get( - "current_directory", self.settings.approved_directory - ) - session_id = context.user_data.get("claude_session_id") + chat_id = update.effective_chat.id + current_dir = self._get_working_directory(chat_id) + session_id = context.chat_data.get("claude_session_id") # Check if /new was used โ€” skip auto-resume for this first message. # Flag is only cleared after a successful run so retries keep the intent. - force_new = bool(context.user_data.get("force_new_session")) + force_new = bool(context.chat_data.get("force_new_session")) # --- Verbose progress tracking via stream callback --- tool_log: List[Dict[str, Any]] = [] @@ -1011,9 +1041,9 @@ async def agentic_text( # New session created successfully โ€” clear the one-shot flag if force_new: - context.user_data["force_new_session"] = False + context.chat_data["force_new_session"] = False - context.user_data["claude_session_id"] = claude_response.session_id + context.chat_data["claude_session_id"] = claude_response.session_id # Track directory changes from .handlers.message import _update_working_directory_from_claude_response @@ -1227,14 +1257,13 @@ async def agentic_document( ) return - current_dir = context.user_data.get( - "current_directory", self.settings.approved_directory - ) - session_id = context.user_data.get("claude_session_id") + chat_id = update.effective_chat.id + current_dir = self._get_working_directory(chat_id) + session_id = context.chat_data.get("claude_session_id") # Check if /new was used โ€” skip auto-resume for this first message. # Flag is only cleared after a successful run so retries keep the intent. - force_new = bool(context.user_data.get("force_new_session")) + force_new = bool(context.chat_data.get("force_new_session")) verbose_level = self._get_verbose_level(context) tool_log: List[Dict[str, Any]] = [] @@ -1260,9 +1289,9 @@ async def agentic_document( ) if force_new: - context.user_data["force_new_session"] = False + context.chat_data["force_new_session"] = False - context.user_data["claude_session_id"] = claude_response.session_id + context.chat_data["claude_session_id"] = claude_response.session_id from .handlers.message import _update_working_directory_from_claude_response @@ -1429,11 +1458,10 @@ async def _handle_agentic_media_message( ) return - current_dir = context.user_data.get( - "current_directory", self.settings.approved_directory - ) - session_id = context.user_data.get("claude_session_id") - force_new = bool(context.user_data.get("force_new_session")) + chat_id = update.effective_chat.id + current_dir = self._get_working_directory(chat_id) + session_id = context.chat_data.get("claude_session_id") + force_new = bool(context.chat_data.get("force_new_session")) verbose_level = self._get_verbose_level(context) tool_log: List[Dict[str, Any]] = [] @@ -1461,9 +1489,9 @@ async def _handle_agentic_media_message( heartbeat.cancel() if force_new: - context.user_data["force_new_session"] = False + context.chat_data["force_new_session"] = False - context.user_data["claude_session_id"] = claude_response.session_id + context.chat_data["claude_session_id"] = claude_response.session_id from .handlers.message import _update_working_directory_from_claude_response diff --git a/src/config/settings.py b/src/config/settings.py index 77c34ea4..4ec5e8c4 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -261,6 +261,24 @@ class Settings(BaseSettings): notification_chat_ids: Optional[List[int]] = Field( None, description="Default Telegram chat IDs for proactive notifications" ) + + # Per-chat routing + personal_chat_id: Optional[int] = Field( + None, description="Telegram personal DM chat ID for routing" + ) + personal_chat_directory: Optional[Path] = Field( + None, description="Working directory for personal DM chat" + ) + group_chat_id: Optional[int] = Field( + None, description="Telegram group chat ID for routing" + ) + group_chat_directory: Optional[Path] = Field( + None, description="Working directory for group chat" + ) + group_trigger_prefix: str = Field( + "claude", description="Prefix required to trigger Claude in group chats" + ) + enable_project_threads: bool = Field( False, description="Enable strict routing by Telegram forum project threads", @@ -414,6 +432,39 @@ def validate_project_threads_chat_id(cls, v: Any) -> Optional[int]: return v return v # type: ignore[no-any-return] + @field_validator("personal_chat_id", "group_chat_id", mode="before") + @classmethod + def validate_optional_chat_id(cls, v: Any) -> Optional[int]: + """Allow empty chat ID by treating blank values as None.""" + if v is None: + return None + if isinstance(v, str): + value = v.strip() + if not value: + return None + return int(value) + if isinstance(v, int): + return v + return v # type: ignore[no-any-return] + + @field_validator("personal_chat_directory", "group_chat_directory", mode="before") + @classmethod + def validate_optional_directory(cls, v: Any) -> Optional[Path]: + """Validate optional routing directories โ€” allow None/empty.""" + if not v: + return None + if isinstance(v, str): + value = v.strip() + if not value: + return None + v = Path(value) + path = v.resolve() + if not path.exists(): + raise ValueError(f"Routing directory does not exist: {path}") + if not path.is_dir(): + raise ValueError(f"Routing directory is not a directory: {path}") + return path # type: ignore[no-any-return] + @field_validator("log_level") @classmethod def validate_log_level(cls, v: Any) -> str: diff --git a/tests/unit/test_bot/test_chat_routing.py b/tests/unit/test_bot/test_chat_routing.py new file mode 100644 index 00000000..6ec606bd --- /dev/null +++ b/tests/unit/test_bot/test_chat_routing.py @@ -0,0 +1,170 @@ +"""Unit tests for chat routing helpers.""" + +from src.bot.features.chat_routing import ( + HISTORY_CONTEXT_SIZE, + MAX_BUFFER_SIZE, + append_to_buffer, + build_group_prompt, + format_history, + get_working_directory, + is_group_triggered, + strip_group_trigger_prefix, +) +from src.config import create_test_config + + +def test_append_to_buffer_adds_message() -> None: + """Messages are appended in sender/text form.""" + buffer: list[dict[str, str]] = [] + + append_to_buffer(buffer, "Alice", "hello") + + assert buffer == [{"sender": "Alice", "text": "hello"}] + + +def test_append_to_buffer_trims_oldest_messages() -> None: + """The buffer keeps only the most recent MAX_BUFFER_SIZE messages.""" + buffer: list[dict[str, str]] = [] + + for idx in range(MAX_BUFFER_SIZE + 5): + append_to_buffer(buffer, f"User {idx}", f"msg {idx}") + + assert len(buffer) == MAX_BUFFER_SIZE + assert buffer[0] == {"sender": "User 5", "text": "msg 5"} + assert buffer[-1] == { + "sender": f"User {MAX_BUFFER_SIZE + 4}", + "text": f"msg {MAX_BUFFER_SIZE + 4}", + } + + +def test_format_history_handles_empty_messages() -> None: + """Formatting an empty history returns an empty string.""" + assert format_history([]) == "" + + +def test_format_history_formats_multiple_messages() -> None: + """History lines are rendered as Sender: text.""" + messages = [ + {"sender": "Alice", "text": "Hello"}, + {"sender": "Bob", "text": "World"}, + ] + + assert format_history(messages) == "Alice: Hello\nBob: World" + + +def test_get_working_directory_prefers_personal_chat(tmp_path) -> None: + """Personal-chat mapping wins when the chat ID matches.""" + personal_dir = tmp_path / "personal" + personal_dir.mkdir() + group_dir = tmp_path / "group" + group_dir.mkdir() + settings = create_test_config( + approved_directory=str(tmp_path), + personal_chat_id=123, + personal_chat_directory=str(personal_dir), + group_chat_id=-100, + group_chat_directory=str(group_dir), + ) + + assert get_working_directory(123, settings) == personal_dir.resolve() + + +def test_get_working_directory_uses_group_chat_directory(tmp_path) -> None: + """Group-chat mapping is used when the group chat matches.""" + group_dir = tmp_path / "group" + group_dir.mkdir() + settings = create_test_config( + approved_directory=str(tmp_path), + group_chat_id=-100, + group_chat_directory=str(group_dir), + ) + + assert get_working_directory(-100, settings) == group_dir.resolve() + + +def test_get_working_directory_falls_back_to_approved_directory(tmp_path) -> None: + """Unknown chats use the global approved directory.""" + settings = create_test_config(approved_directory=str(tmp_path)) + + assert get_working_directory(999, settings) == tmp_path.resolve() + + +def test_is_group_triggered_matches_plain_prefix() -> None: + """The plain prefix triggers with and without trailing text.""" + assert is_group_triggered("claude", "claude") is True + assert is_group_triggered("claude summarize this", "claude") is True + + +def test_is_group_triggered_matches_slash_prefix_variants() -> None: + """Slash commands trigger in both plain and @botname forms.""" + assert is_group_triggered("/claude", "claude") is True + assert is_group_triggered("/claude summarize this", "claude") is True + assert is_group_triggered("/claude@test_bot", "claude") is True + assert is_group_triggered("/claude@test_bot summarize this", "claude") is True + + +def test_is_group_triggered_rejects_non_matching_messages() -> None: + """Messages without the configured prefix do not trigger.""" + assert is_group_triggered("please ask claude", "claude") is False + assert is_group_triggered("/other@test_bot summarize this", "claude") is False + + +def test_strip_group_trigger_prefix_handles_plain_prefix() -> None: + """Plain-prefix messages are stripped down to their payload.""" + assert strip_group_trigger_prefix("claude", "claude") == "" + assert ( + strip_group_trigger_prefix("claude summarize this", "claude") + == "summarize this" + ) + + +def test_strip_group_trigger_prefix_handles_slash_prefix() -> None: + """Slash commands strip both the slash and any @botname suffix.""" + assert strip_group_trigger_prefix("/claude", "claude") == "" + assert ( + strip_group_trigger_prefix("/claude summarize this", "claude") + == "summarize this" + ) + assert strip_group_trigger_prefix("/claude@test_bot", "claude") == "" + assert ( + strip_group_trigger_prefix("/claude@test_bot summarize this", "claude") + == "summarize this" + ) + + +def test_build_group_prompt_returns_stripped_text_without_history() -> None: + """Triggered messages without history do not get a history wrapper.""" + assert ( + build_group_prompt([], "claude summarize this", "claude") == "summarize this" + ) + + +def test_build_group_prompt_injects_recent_history() -> None: + """History is prepended ahead of the stripped group prompt.""" + history = [ + {"sender": "Alice", "text": "First"}, + {"sender": "Bob", "text": "Second"}, + ] + + prompt = build_group_prompt(history, "claude summarize this", "claude") + + assert prompt == ( + "[Recent group conversation:\nAlice: First\nBob: Second\n]\n\n" + "summarize this" + ) + + +def test_build_group_prompt_limits_history_to_context_window() -> None: + """Only the last HISTORY_CONTEXT_SIZE entries are injected.""" + history = [ + {"sender": f"User {idx}", "text": f"msg {idx}"} + for idx in range(HISTORY_CONTEXT_SIZE + 3) + ] + + prompt = build_group_prompt(history, "claude summarize this", "claude") + + assert "User 0: msg 0" not in prompt + assert "User 1: msg 1" not in prompt + assert "User 2: msg 2" not in prompt + assert f"User 3: msg 3" in prompt + assert prompt.endswith("]\n\nsummarize this")