Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,32 @@ QUICK_ACTIONS_TIMEOUT=120
# Git operations timeout in seconds
GIT_OPERATIONS_TIMEOUT=30

# === PER-CHAT ROUTING ===
# Route specific Telegram chats to dedicated Claude sessions and working directories.
# This allows, for example, a personal DM to work in your home directory while a
# shared group chat works in a project-specific folder — each with its own session.

# Telegram chat ID of your personal DM with the bot
# Find your ID by messaging @userinfobot
PERSONAL_CHAT_ID=

# Working directory for the personal DM session
PERSONAL_CHAT_DIRECTORY=

# Telegram chat ID of a group chat (negative number, e.g. -1001234567890)
GROUP_CHAT_ID=

# Working directory for the group chat session
GROUP_CHAT_DIRECTORY=

# Prefix word required to trigger Claude in group chats.
# Messages NOT starting with this prefix are silently buffered for context
# but do not produce a response. Supports both "claude <msg>" and "/claude <msg>".
# Buffered history is stored in Telegram ``chat_data`` and only survives bot
# restarts when a python-telegram-bot Persistence backend is configured.
# Default: claude
GROUP_TRIGGER_PREFIX=claude

# === PROJECT THREAD MODE ===
# Enable strict routing by Telegram project topics
ENABLE_PROJECT_THREADS=false
Expand Down
111 changes: 111 additions & 0 deletions src/bot/features/chat_routing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
"""Per-chat routing: working directories and group conversation helpers.

This module enables two related features:

**Per-chat working directories**
Map specific Telegram chat IDs to dedicated Claude sessions and working
directories via ``PERSONAL_CHAT_ID`` / ``PERSONAL_CHAT_DIRECTORY`` and
``GROUP_CHAT_ID`` / ``GROUP_CHAT_DIRECTORY`` in your ``.env``. Messages from
an unrecognised chat fall back to ``APPROVED_DIRECTORY``.

**Group trigger prefix + conversation history**
In group chats Claude is silent by default. Only messages that begin with
the configured prefix (default: ``claude``) — or the equivalent slash command
(``/claude``) — trigger a response. All other messages are stored in a
per-chat rolling buffer so that when Claude *is* triggered it receives the
recent conversation as context, allowing it to answer questions about what
was discussed without every participant having to @-mention the bot.
"""

from pathlib import Path

from src.config.settings import Settings

# Maximum number of messages kept in the per-chat history buffer.
MAX_BUFFER_SIZE = 30

# How many buffered messages are prepended as context when Claude is triggered.
HISTORY_CONTEXT_SIZE = 20


def append_to_buffer(
buffer: list[dict[str, str]], sender_name: str, text: str
) -> None:
"""Append a message and trim the buffer to :data:`MAX_BUFFER_SIZE`."""
buffer.append({"sender": sender_name, "text": text})
if len(buffer) > MAX_BUFFER_SIZE:
del buffer[: len(buffer) - MAX_BUFFER_SIZE]


def format_history(messages: list[dict[str, str]]) -> str:
"""Return messages formatted as ``Sender: text`` lines."""
return "\n".join(f"{msg['sender']}: {msg['text']}" for msg in messages)


def is_group_triggered(message_text: str, trigger_prefix: str) -> bool:
"""Return whether a group message should trigger Claude."""
lower_text = message_text.lower()
lower_prefix = trigger_prefix.lower()
slash_prefix = f"/{lower_prefix}"
slash_variants = (f"{slash_prefix} ", f"{slash_prefix}@")
return (
lower_text == lower_prefix
or lower_text.startswith(f"{lower_prefix} ")
or lower_text == slash_prefix
or lower_text.startswith(slash_variants)
)


def strip_group_trigger_prefix(message_text: str, trigger_prefix: str) -> str:
"""Remove the plain/slash trigger prefix, including ``@botname`` variants."""
lower_text = message_text.lower()
lower_prefix = trigger_prefix.lower()
slash_prefix = f"/{lower_prefix}"
if lower_text == lower_prefix:
return ""
if lower_text.startswith(f"{lower_prefix} "):
return message_text[len(trigger_prefix) :].lstrip()
if lower_text == slash_prefix:
return ""
if lower_text.startswith(f"{slash_prefix} "):
return message_text[len(slash_prefix) :].lstrip()
if lower_text.startswith(f"{slash_prefix}@"):
parts = message_text.split(maxsplit=1)
return parts[1] if len(parts) > 1 else ""
return message_text


def build_group_prompt(
history: list[dict[str, str]], message_text: str, trigger_prefix: str
) -> str:
"""Build the Claude prompt for a triggered group message."""
stripped = strip_group_trigger_prefix(message_text, trigger_prefix)
context_messages = history[-HISTORY_CONTEXT_SIZE:]
if not context_messages:
return stripped

history_str = format_history(context_messages)
return f"[Recent group conversation:\n{history_str}\n]\n\n{stripped}"


def get_working_directory(chat_id: int, settings: Settings) -> Path:
"""Return the working directory to use for *chat_id*.

Priority:
1. ``personal_chat_directory`` if ``chat_id == personal_chat_id``
2. ``group_chat_directory`` if ``chat_id == group_chat_id``
3. ``approved_directory`` (the global fallback)
"""
if (
settings.personal_chat_id is not None
and chat_id == settings.personal_chat_id
and settings.personal_chat_directory is not None
):
return settings.personal_chat_directory
if (
settings.group_chat_id is not None
and chat_id == settings.group_chat_id
and settings.group_chat_directory is not None
):
return settings.group_chat_directory
return settings.approved_directory
90 changes: 59 additions & 31 deletions src/bot/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional
from typing import Any, Callable, Dict, List, Optional, cast

import structlog
from telegram import (
BotCommand,
Chat,
InlineKeyboardButton,
InlineKeyboardMarkup,
InputMediaPhoto,
Expand All @@ -32,6 +33,12 @@
from ..claude.sdk_integration import StreamUpdate
from ..config.settings import Settings
from ..projects import PrivateTopicsUnavailableError
from .features.chat_routing import (
append_to_buffer,
build_group_prompt,
get_working_directory,
is_group_triggered,
)
from .utils.draft_streamer import DraftStreamer, generate_draft_id
from .utils.html_format import escape_html
from .utils.image_extractor import (
Expand Down Expand Up @@ -267,6 +274,10 @@ def _is_within(path: Path, root: Path) -> bool:
except ValueError:
return False

def _get_working_directory(self, chat_id: int) -> Path:
"""Return working directory based on per-chat routing config."""
return get_working_directory(chat_id, self.settings)

@staticmethod
def _extract_message_thread_id(update: Update) -> Optional[int]:
"""Extract topic/thread id from update message for forum/direct topics."""
Expand Down Expand Up @@ -520,8 +531,11 @@ async def agentic_start(
return
except Exception:
sync_line = "\n\n🧵 Topic sync failed. Run /sync_threads to retry."
current_dir = context.user_data.get(
"current_directory", self.settings.approved_directory
chat_id = update.effective_chat.id if update.effective_chat else None
current_dir = (
self._get_working_directory(chat_id)
if chat_id
else self.settings.approved_directory
)
dir_display = f"<code>{current_dir}/</code>"

Expand All @@ -539,22 +553,21 @@ async def agentic_new(
self, update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
"""Reset session, one-line confirmation."""
context.user_data["claude_session_id"] = None
context.user_data["session_started"] = True
context.user_data["force_new_session"] = True
context.chat_data["claude_session_id"] = None
context.chat_data["session_started"] = True
context.chat_data["force_new_session"] = True

await update.message.reply_text("Session reset. What's next?")

async def agentic_status(
self, update: Update, context: ContextTypes.DEFAULT_TYPE
) -> None:
"""Compact one-line status, no buttons."""
current_dir = context.user_data.get(
"current_directory", self.settings.approved_directory
)
chat_id = update.effective_chat.id
current_dir = self._get_working_directory(chat_id)
dir_display = str(current_dir)

session_id = context.user_data.get("claude_session_id")
session_id = context.chat_data.get("claude_session_id")
session_status = "active" if session_id else "none"

# Cost info
Expand Down Expand Up @@ -912,6 +925,24 @@ async def agentic_text(
user_id = update.effective_user.id
message_text = update.message.text

# Group chat: store message in history buffer; only respond if prefixed
chat_type = update.effective_chat.type if update.effective_chat else None
if chat_type in (Chat.GROUP, Chat.SUPERGROUP):
buffer = cast(
list[dict[str, str]], context.chat_data.setdefault("_msg_buffer", [])
)
sender_name = (
update.effective_user.first_name if update.effective_user else None
)
append_to_buffer(buffer, sender_name or "Unknown", message_text)
prefix = self.settings.group_trigger_prefix
if not is_group_triggered(message_text, prefix):
# Not triggered — store only, no response
return

history = buffer[:-1] # All messages except the one just added
message_text = build_group_prompt(history, message_text, prefix)

logger.info(
"Agentic text message",
user_id=user_id,
Expand Down Expand Up @@ -957,14 +988,13 @@ async def agentic_text(
)
return

current_dir = context.user_data.get(
"current_directory", self.settings.approved_directory
)
session_id = context.user_data.get("claude_session_id")
chat_id = update.effective_chat.id
current_dir = self._get_working_directory(chat_id)
session_id = context.chat_data.get("claude_session_id")

# Check if /new was used — skip auto-resume for this first message.
# Flag is only cleared after a successful run so retries keep the intent.
force_new = bool(context.user_data.get("force_new_session"))
force_new = bool(context.chat_data.get("force_new_session"))

# --- Verbose progress tracking via stream callback ---
tool_log: List[Dict[str, Any]] = []
Expand Down Expand Up @@ -1011,9 +1041,9 @@ async def agentic_text(

# New session created successfully — clear the one-shot flag
if force_new:
context.user_data["force_new_session"] = False
context.chat_data["force_new_session"] = False

context.user_data["claude_session_id"] = claude_response.session_id
context.chat_data["claude_session_id"] = claude_response.session_id

# Track directory changes
from .handlers.message import _update_working_directory_from_claude_response
Expand Down Expand Up @@ -1227,14 +1257,13 @@ async def agentic_document(
)
return

current_dir = context.user_data.get(
"current_directory", self.settings.approved_directory
)
session_id = context.user_data.get("claude_session_id")
chat_id = update.effective_chat.id
current_dir = self._get_working_directory(chat_id)
session_id = context.chat_data.get("claude_session_id")

# Check if /new was used — skip auto-resume for this first message.
# Flag is only cleared after a successful run so retries keep the intent.
force_new = bool(context.user_data.get("force_new_session"))
force_new = bool(context.chat_data.get("force_new_session"))

verbose_level = self._get_verbose_level(context)
tool_log: List[Dict[str, Any]] = []
Expand All @@ -1260,9 +1289,9 @@ async def agentic_document(
)

if force_new:
context.user_data["force_new_session"] = False
context.chat_data["force_new_session"] = False

context.user_data["claude_session_id"] = claude_response.session_id
context.chat_data["claude_session_id"] = claude_response.session_id

from .handlers.message import _update_working_directory_from_claude_response

Expand Down Expand Up @@ -1429,11 +1458,10 @@ async def _handle_agentic_media_message(
)
return

current_dir = context.user_data.get(
"current_directory", self.settings.approved_directory
)
session_id = context.user_data.get("claude_session_id")
force_new = bool(context.user_data.get("force_new_session"))
chat_id = update.effective_chat.id
current_dir = self._get_working_directory(chat_id)
session_id = context.chat_data.get("claude_session_id")
force_new = bool(context.chat_data.get("force_new_session"))

verbose_level = self._get_verbose_level(context)
tool_log: List[Dict[str, Any]] = []
Expand Down Expand Up @@ -1461,9 +1489,9 @@ async def _handle_agentic_media_message(
heartbeat.cancel()

if force_new:
context.user_data["force_new_session"] = False
context.chat_data["force_new_session"] = False

context.user_data["claude_session_id"] = claude_response.session_id
context.chat_data["claude_session_id"] = claude_response.session_id

from .handlers.message import _update_working_directory_from_claude_response

Expand Down
Loading