diff --git a/bot/vikingbot/agent/loop.py b/bot/vikingbot/agent/loop.py index 87d29d39..c34ab6e8 100644 --- a/bot/vikingbot/agent/loop.py +++ b/bot/vikingbot/agent/loop.py @@ -18,7 +18,7 @@ from vikingbot.bus.events import InboundMessage, OutboundEventType, OutboundMessage from vikingbot.bus.queue import MessageBus from vikingbot.config import load_config -from vikingbot.config.schema import Config, SessionKey +from vikingbot.config.schema import BotMode, Config, SessionKey from vikingbot.hooks import HookContext from vikingbot.hooks.manager import hook_manager from vikingbot.providers.base import LLMProvider @@ -200,6 +200,7 @@ async def run(self) -> None: OutboundMessage( session_key=msg.session_key, content=f"Sorry, I encountered an error: {str(e)}", + metadata=msg.metadata, ) ) except asyncio.TimeoutError: @@ -425,7 +426,6 @@ async def check_long_running(): preview = msg.content[:80] + "..." if len(msg.content) > 80 else msg.content logger.info(f"Processing message from {msg.session_key}:{msg.sender_id}: {preview}") - # Get or create session session_key = msg.session_key # For CLI/direct sessions, skip heartbeat by default skip_heartbeat = session_key.type == "cli" @@ -439,6 +439,11 @@ async def check_long_running(): cmd = msg.content.strip().lower() if cmd == "/new": # Clone session for async consolidation, then immediately clear original + if not self._check_cmd_auth(msg): + return OutboundMessage( + session_key=msg.session_key, content="🐈 Sorry, you are not authorized to use this command.", + metadata=msg.metadata + ) session_clone = session.clone() session.clear() await self.sessions.save(session) @@ -447,13 +452,31 @@ async def check_long_running(): return OutboundMessage( session_key=msg.session_key, content="🐈 New session started. Memory consolidated.", metadata=msg.metadata ) + if cmd == "/remember": + if not self._check_cmd_auth(msg): + return OutboundMessage( + session_key=msg.session_key, content="🐈 Sorry, you are not authorized to use this command.", + metadata=msg.metadata + ) + session_clone = session.clone() + await self._consolidate_viking_memory(session_clone) + return OutboundMessage( + session_key=msg.session_key, content="This conversation has been submitted to memory storage.", metadata=msg.metadata + ) if cmd == "/help": return OutboundMessage( session_key=msg.session_key, - content="🐈 vikingbot commands:\n/new — Start a new conversation\n/help — Show available commands", + content="🐈 vikingbot commands:\n/new — Start a new conversation\n/remember — Submit current session to memories and start new session\n/help — Show available commands", metadata=msg.metadata ) + # Debug mode handling + if self.config.mode == BotMode.DEBUG: + # In debug mode, only record message to session, no processing or reply + session.add_message("user", msg.content, sender_id=msg.sender_id) + await self.sessions.save(session) + return None + # Consolidate memory before processing if session is too large if len(session.messages) > self.memory_window: # Clone session for async consolidation, then immediately trim original @@ -567,15 +590,18 @@ async def _consolidate_memory(self, session, archive_all: bool = False) -> None: return # use openviking tools to extract memory - await hook_manager.execute_hooks( - context=HookContext( - event_type="message.compact", - session_id=session.key.safe_name(), - workspace_id=self.sandbox_manager.to_workspace_id(session.key), - session_key=session.key, - ), - session=session, - ) + config = self.config + if config.mode == BotMode.READONLY: + if not config.channels_config or not config.channels_config.get_all_channels(): + return + allow_from = [config.ov_server.admin_user_id] + for channel_config in config.channels_config.get_all_channels(): + if channel_config and channel_config.type.value == session.key.type: + if hasattr(channel_config, "allow_from"): + allow_from.extend(channel_config.allow_from) + messages = [msg for msg in session.messages if msg.get("sender_id") in allow_from] + session.messages = messages + await self._consolidate_viking_memory(session) if self.sandbox_manager: memory_workspace = self.sandbox_manager.get_workspace_path(session.key) @@ -656,6 +682,26 @@ async def _consolidate_memory(self, session, archive_all: bool = False) -> None: except Exception as e: logger.exception(f"Memory consolidation failed: {e}") + async def _consolidate_viking_memory(self, session) -> None: + """Consolidate old messages into MEMORY.md + HISTORY.md. Works on a cloned session.""" + try: + if not session.messages: + logger.info(f"No messages to commit openviking for session {session.key.safe_name()} (allow_from filter applied)") + return + + # use openviking tools to extract memory + await hook_manager.execute_hooks( + context=HookContext( + event_type="message.compact", + session_id=session.key.safe_name(), + workspace_id=self.sandbox_manager.to_workspace_id(session.key), + session_key=session.key, + ), + session=session, + ) + except Exception as e: + logger.exception(f"Memory consolidation failed: {e}") + async def _safe_consolidate_memory(self, session, archive_all: bool = False) -> None: """Safe wrapper for _consolidate_memory that ensures all exceptions are caught.""" try: @@ -663,6 +709,31 @@ async def _safe_consolidate_memory(self, session, archive_all: bool = False) -> except Exception as e: logger.exception(f"Background memory consolidation task failed: {e}") + def _check_cmd_auth(self, msg: InboundMessage) -> bool: + """Check if the session key is authorized for command execution. + + Returns: + True if authorized, False otherwise. + Args: + session_key: Session key to check. + """ + if self.config.mode == BotMode.NORMAL: + return True + allow_from = [] + if self.config.ov_server and self.config.ov_server.admin_user_id: + allow_from.append(self.config.ov_server.admin_user_id) + for channel in self.config.channels_config.get_all_channels(): + if channel.channel_key() == msg.session_key.channel_key(): + if channel.allow_from: + allow_from.extend(channel.allow_from) + break + + # If channel not found or sender not in allow_from list, ignore message + if msg.sender_id not in allow_from: + logger.debug(f"Sender {msg.sender_id} not allowed in channel {msg.session_key.channel_key()}") + return False + return True + async def process_direct( self, content: str, diff --git a/bot/vikingbot/agent/memory.py b/bot/vikingbot/agent/memory.py index 8e47adcd..3a486464 100644 --- a/bot/vikingbot/agent/memory.py +++ b/bot/vikingbot/agent/memory.py @@ -51,21 +51,17 @@ async def get_viking_memory_context(self, current_message: str, workspace_id: st try: client = await VikingClient.create(agent_id=workspace_id) admin_user_id = load_config().ov_server.admin_user_id - start = time.time() result = await client.search_memory(current_message, user_id=admin_user_id, limit=3) - cost = round(time.time() - start, 2) if not result: - logger.info(f"[USER_MEMORY]: search failed. cost {cost}") return "" user_memory = self._parse_viking_memory(result["user_memory"]) agent_memory = self._parse_viking_memory(result["agent_memory"]) - logger.info(f"[USER_MEMORY]: search success. res: {user_memory[:100]}. cost {cost}") return ( f"### user memories:\n{user_memory}\n" f"### agent memories:\n{agent_memory}" ) except Exception as e: - logger.error(f"[USER_MEMORY]: search failed. {e}") + logger.error(f"[READ_USER_MEMORY]: search error. {e}") return "" async def get_viking_user_profile(self, workspace_id: str, user_id: str) -> str: diff --git a/bot/vikingbot/channels/feishu.py b/bot/vikingbot/channels/feishu.py index 1a92fcec..03e39a25 100644 --- a/bot/vikingbot/channels/feishu.py +++ b/bot/vikingbot/channels/feishu.py @@ -12,6 +12,7 @@ import httpx from loguru import logger +from vikingbot.config import load_config from vikingbot.utils import get_data_path # Optional HTML processing libraries @@ -30,7 +31,7 @@ from vikingbot.bus.events import OutboundMessage from vikingbot.bus.queue import MessageBus from vikingbot.channels.base import BaseChannel -from vikingbot.config.schema import FeishuChannelConfig +from vikingbot.config.schema import FeishuChannelConfig, BotMode try: import lark_oapi as lark @@ -747,25 +748,19 @@ async def _on_message(self, data: "P2ImMessageReceiveV1") -> None: # 检查是否@了机器人 is_mentioned = False mention_pattern = re.compile(r"@_user_\d+") - bot_open_id = self.config.open_id - bot_app_id = self.config.app_id + bot_name = self.config.bot_name # 优先从message的mentions字段提取@信息(text和post类型都适用) - if hasattr(message, 'mentions') and message.mentions and bot_open_id: + if hasattr(message, 'mentions') and message.mentions and bot_name: for mention in message.mentions: - if hasattr(mention, 'id') and hasattr(mention.id, 'open_id'): - at_id = mention.id.open_id - if at_id == bot_open_id: + if hasattr(mention, 'name'): + at_name = mention.name + if at_name == self.config.bot_name: is_mentioned = True break continue - # 兼容其他可能的ID格式 - at_id = getattr(mention, 'id', '') or getattr(mention, 'user_id', '') - if at_id == f"app_{bot_app_id}" or at_id == bot_app_id: - is_mentioned = True - break - # 话题群@检查逻辑 + config = load_config() should_process = True if chat_type == "group": chat_mode = await self._get_chat_mode(chat_id) @@ -780,7 +775,7 @@ async def _on_message(self, data: "P2ImMessageReceiveV1") -> None: should_process = False else: # 模式2:False,仅话题首条消息不需要@,后续回复需要@ - if not is_topic_starter and not is_mentioned: + if not is_topic_starter and not is_mentioned and config.mode != BotMode.DEBUG: logger.info(f"Skipping thread message: not topic starter and not mentioned") should_process = False @@ -789,7 +784,8 @@ async def _on_message(self, data: "P2ImMessageReceiveV1") -> None: return # 确认需要处理后再添加"已读"表情 - await self._add_reaction(message_id, "MeMeMe") + if config and config.mode != BotMode.DEBUG: + await self._add_reaction(message_id, "MeMeMe") # 替换所有@占位符 content = mention_pattern.sub(f"@{sender_id}", content) diff --git a/bot/vikingbot/config/schema.py b/bot/vikingbot/config/schema.py index 90a2b10c..9443be61 100644 --- a/bot/vikingbot/config/schema.py +++ b/bot/vikingbot/config/schema.py @@ -47,6 +47,13 @@ class AgentMemoryMode(str, Enum): PER_CHANNEL = "per-channel" +class BotMode(str, Enum): + """Bot running mode enumeration.""" + NORMAL = "normal" + READONLY = "readonly" + DEBUG = "debug" + + class BaseChannelConfig(BaseModel): """Base channel configuration.""" @@ -104,7 +111,7 @@ class FeishuChannelConfig(BaseChannelConfig): type: ChannelType = ChannelType.FEISHU app_id: str = "" - open_id: str = "" + bot_name: str = "" app_secret: str = "" encrypt_key: str = "" verification_token: str = "" @@ -615,7 +622,12 @@ class Config(BaseSettings): ) storage_workspace: Optional[str] = None # From ov.conf root level storage.workspace use_local_memory: bool = False - read_only: bool = False + mode: BotMode = BotMode.NORMAL + + @property + def read_only(self) -> bool: + """Backward compatibility for read_only property.""" + return self.mode == BotMode.READONLY @property def channels_config(self) -> ChannelsConfig: diff --git a/bot/vikingbot/hooks/builtins/openviking_hooks.py b/bot/vikingbot/hooks/builtins/openviking_hooks.py index 50c1ecb6..2cbd51e3 100644 --- a/bot/vikingbot/hooks/builtins/openviking_hooks.py +++ b/bot/vikingbot/hooks/builtins/openviking_hooks.py @@ -38,42 +38,14 @@ async def _get_client(self, workspace_id: str) -> VikingClient: # Use global singleton client return await get_global_client() - def _filter_messages_by_sender(self, messages: list[dict], allow_from: list[str]) -> list[dict]: - """筛选出 sender_id 在 allow_from 列表中的消息""" - if not allow_from: - return [] - return [msg for msg in messages if msg.get("sender_id") in allow_from] - - def _get_channel_allow_from(self, session_key: SessionKey): - """根据 session_id 获取对应频道的 allow_from 配置""" - config = load_config() - if not config.read_only: - return True, [] - allow_from = [config.ov_server.admin_user_id] - if not session_key or not config.channels: - return False, allow_from - # 查找对应类型的 channel config - for channel_config in config.channels_config.get_all_channels(): - if channel_config and channel_config.type.value == session_key.type: - if hasattr(channel_config, "allow_from"): - allow_from.extend(channel_config.allow_from) - return False, allow_from async def execute(self, context: HookContext, **kwargs) -> Any: vikingbot_session: Session = kwargs.get("session", {}) session_id = context.session_key.safe_name() try: - is_shared, allow_from = self._get_channel_allow_from(context.session_key) - filtered_messages = vikingbot_session.messages - if not is_shared: - filtered_messages = self._filter_messages_by_sender(vikingbot_session.messages, allow_from) - if not filtered_messages: - logger.info(f"No messages to commit openviking for session {session_id} (allow_from filter applied)") - return {"success": True, "message": "No messages matched allow_from filter"} - client = await self._get_client(context.workspace_id) - result = await client.commit(session_id, filtered_messages, load_config().ov_server.admin_user_id) + result = await client.commit(session_id, vikingbot_session.messages, load_config().ov_server.admin_user_id) return result except Exception as e: logger.exception(f"Failed to add message to OpenViking: {e}") diff --git a/bot/vikingbot/openviking_mount/ov_server.py b/bot/vikingbot/openviking_mount/ov_server.py index d3f6383c..583cd5bb 100644 --- a/bot/vikingbot/openviking_mount/ov_server.py +++ b/bot/vikingbot/openviking_mount/ov_server.py @@ -440,11 +440,11 @@ async def main_test(): # res = client.list_resources() # res = await client.search("头有点疼", target_uri="viking://user/memories/") # res = await client.get_viking_memory_context("123", current_message="头疼", history=[]) - # res = await client.search_memory("你好", "user_1") + res = await client.search_memory("你好", "user_1") # res = await client.list_resources("viking://resources/") # res = await client.read_content("viking://user/memories/profile.md", level="read") # res = await client.add_resource("https://github.com/volcengine/OpenViking", "ov代码") - res = await client.grep("viking://resources/", "viking", True) + # res = await client.grep("viking://resources/", "viking", True) # res = await client.commit( # session_id="99999", # messages=[{"role": "user", "content": "你好"}], @@ -466,7 +466,8 @@ async def account_test(): # res = await client.admin_remove_user("default", "") # res = await client.admin_remove_user("default", "admin") # res = await client.admin_list_accounts() - res = await client.admin_create_account("eval", "default") + # res = await client.admin_create_account("eval", "default") + res = await client.admin_register_user("default", "test_root", "root") print(res)