def feedback_memory(feedback_content: str, history: list | None = None):
    """Send user feedback about a previous answer to the /feedback endpoint.

    Args:
        feedback_content: Natural-language correction from the user.
        history: Chat messages ({"role", "content"} dicts) leading up to the
            feedback; forwarded as-is (the server tolerates None).

    Returns:
        Parsed JSON body of the server response.
    """
    url = f"{BASE_URL}/feedback"
    payload = {
        "user_id": USER_ID,
        "writable_cube_ids": [MEM_CUBE_ID],
        "history": history,
        "feedback_content": feedback_content,
        "async_mode": "sync",
        # APIFeedbackRequest.corrected_answer is a bool; send a real boolean
        # instead of the string "false" (which only works via pydantic coercion).
        "corrected_answer": False,
    }

    print("[*] Sending feedback ...")
    resp = requests.post(url, headers=HEADERS, data=json.dumps(payload), timeout=30)
    print(resp.status_code, resp.text)
    return resp.json()
self._validate_dependencies("naive_mem_cube", "mem_reader", "mem_scheduler") + self._validate_dependencies( + "naive_mem_cube", "mem_reader", "mem_scheduler", "feedback_server" + ) def handle_add_memories(self, add_req: APIADDRequest) -> MemoryResponse: """ @@ -56,6 +58,39 @@ def handle_add_memories(self, add_req: APIADDRequest) -> MemoryResponse: cube_view = self._build_cube_view(add_req) + if add_req.is_feedback: + chat_history = add_req.chat_history + messages = add_req.messages + if chat_history is None: + chat_history = [] + if messages is None: + messages = [] + concatenate_chat = chat_history + messages + + last_user_index = max(i for i, d in enumerate(concatenate_chat) if d["role"] == "user") + feedback_content = concatenate_chat[last_user_index]["content"] + feedback_history = concatenate_chat[:last_user_index] + + feedback_req = APIFeedbackRequest( + user_id=add_req.user_id, + session_id=add_req.session_id, + task_id=add_req.task_id, + history=feedback_history, + feedback_content=feedback_content, + writable_cube_ids=add_req.writable_cube_ids, + async_mode=add_req.async_mode, + ) + process_record = cube_view.feedback_memories(feedback_req) + + self.logger.info( + f"[FeedbackHandler] Final feedback results count={len(process_record)}" + ) + + return MemoryResponse( + message="Memory feedback successfully", + data=[process_record], + ) + results = cube_view.add_memories(add_req) self.logger.info(f"[AddHandler] Final add results count={len(results)}") @@ -88,6 +123,7 @@ def _build_cube_view(self, add_req: APIADDRequest) -> MemCubeView: mem_reader=self.mem_reader, mem_scheduler=self.mem_scheduler, logger=self.logger, + feedback_server=self.feedback_server, searcher=None, ) else: @@ -98,6 +134,7 @@ def _build_cube_view(self, add_req: APIADDRequest) -> MemCubeView: mem_reader=self.mem_reader, mem_scheduler=self.mem_scheduler, logger=self.logger, + feedback_server=self.feedback_server, searcher=None, ) for cube_id in cube_ids diff --git 
a/src/memos/api/handlers/base_handler.py b/src/memos/api/handlers/base_handler.py index 9df3310ec..3c0314235 100644 --- a/src/memos/api/handlers/base_handler.py +++ b/src/memos/api/handlers/base_handler.py @@ -37,6 +37,7 @@ def __init__( internet_retriever: Any | None = None, memory_manager: Any | None = None, mos_server: Any | None = None, + feedback_server: Any | None = None, **kwargs, ): """ @@ -68,6 +69,7 @@ def __init__( self.internet_retriever = internet_retriever self.memory_manager = memory_manager self.mos_server = mos_server + self.feedback_server = feedback_server # Store any additional dependencies for key, value in kwargs.items(): @@ -166,6 +168,11 @@ def deepsearch_agent(self): """Get deepsearch agent instance.""" return self.deps.deepsearch_agent + @property + def feedback_server(self): + """Get feedback server instance.""" + return self.deps.feedback_server + def _validate_dependencies(self, *required_deps: str) -> None: """ Validate that required dependencies are available. 
diff --git a/src/memos/api/handlers/component_init.py b/src/memos/api/handlers/component_init.py index 574f2ae17..632c2ed4c 100644 --- a/src/memos/api/handlers/component_init.py +++ b/src/memos/api/handlers/component_init.py @@ -29,6 +29,7 @@ from memos.llms.factory import LLMFactory from memos.log import get_logger from memos.mem_cube.navie import NaiveMemCube +from memos.mem_feedback.simple_feedback import SimpleMemFeedback from memos.mem_os.product_server import MOSServer from memos.mem_reader.factory import MemReaderFactory from memos.mem_scheduler.orm_modules.base_model import BaseDBManager @@ -295,6 +296,16 @@ def init_server() -> dict[str, Any]: ) logger.debug("Searcher created") + # Initialize feedback server + feedback_server = SimpleMemFeedback( + llm=llm, + embedder=embedder, + graph_store=graph_db, + memory_manager=memory_manager, + mem_reader=mem_reader, + searcher=searcher, + ) + # Initialize Scheduler scheduler_config_dict = APIConfig.get_scheduler_config() scheduler_config = SchedulerConfigFactory( @@ -308,7 +319,9 @@ def init_server() -> dict[str, Any]: mem_reader=mem_reader, redis_client=redis_client, ) - mem_scheduler.init_mem_cube(mem_cube=naive_mem_cube, searcher=searcher) + mem_scheduler.init_mem_cube( + mem_cube=naive_mem_cube, searcher=searcher, feedback_server=feedback_server + ) logger.debug("Scheduler initialized") # Initialize SchedulerAPIModule @@ -356,6 +369,7 @@ def init_server() -> dict[str, Any]: "text_mem": text_mem, "pref_mem": pref_mem, "online_bot": online_bot, + "feedback_server": feedback_server, "redis_client": redis_client, "deepsearch_agent": deepsearch_agent, } diff --git a/src/memos/api/handlers/feedback_handler.py b/src/memos/api/handlers/feedback_handler.py new file mode 100644 index 000000000..cf5c536ea --- /dev/null +++ b/src/memos/api/handlers/feedback_handler.py @@ -0,0 +1,93 @@ +""" +Feeback handler for memory add/update functionality. 
class FeedbackHandler(BaseHandler):
    """
    Handler for memory feedback operations.

    Builds a single- or multi-cube view over the target cubes and routes the
    feedback request to the feedback server through that view.
    """

    def __init__(self, dependencies: HandlerDependencies):
        """
        Initialize feedback handler.

        Args:
            dependencies: HandlerDependencies instance
        """
        super().__init__(dependencies)
        # BUG FIX: validate the dependencies this handler actually uses.
        # The original validated "mem_reader" and "searcher" (both of which
        # are passed as None to the views below) but never validated
        # "feedback_server", the one dependency feedback cannot run without.
        # AddHandler validates "feedback_server" the same way.
        self._validate_dependencies("mem_scheduler", "feedback_server")

    def handle_feedback_memories(self, feedback_req: APIFeedbackRequest) -> MemoryResponse:
        """
        Main handler for feedback memories endpoint.

        Args:
            feedback_req: feedback request containing content and parameters

        Returns:
            MemoryResponse with formatted results
        """
        cube_view = self._build_cube_view(feedback_req)

        process_record = cube_view.feedback_memories(feedback_req)

        self.logger.info(f"[FeedbackHandler] Final feedback results count={len(process_record)}")

        return MemoryResponse(
            message="Memory feedback successfully",
            data=[process_record],
        )

    def _resolve_cube_ids(self, feedback_req: APIFeedbackRequest) -> list[str]:
        """
        Normalize target cube ids from feedback_req.

        Deduplicates while preserving order; falls back to the user's own
        cube (user_id) when no writable cubes are supplied.
        """
        if feedback_req.writable_cube_ids:
            return list(dict.fromkeys(feedback_req.writable_cube_ids))

        return [feedback_req.user_id]

    def _build_single_view(self, cube_id: str) -> SingleCubeView:
        """Build a SingleCubeView wired for feedback-only use (no reader/searcher)."""
        return SingleCubeView(
            cube_id=cube_id,
            naive_mem_cube=None,
            mem_reader=None,
            mem_scheduler=self.mem_scheduler,
            logger=self.logger,
            searcher=None,
            feedback_server=self.feedback_server,
        )

    def _build_cube_view(self, feedback_req: APIFeedbackRequest) -> MemCubeView:
        """Return a SingleCubeView for one cube, or a CompositeCubeView for many."""
        cube_ids = self._resolve_cube_ids(feedback_req)

        if len(cube_ids) == 1:
            return self._build_single_view(cube_ids[0])

        return CompositeCubeView(
            cube_views=[self._build_single_view(cube_id) for cube_id in cube_ids],
            logger=self.logger,
        )
class APIFeedbackRequest(BaseRequest):
    """Request model for processing feedback info."""

    user_id: str = Field(..., description="User ID")
    session_id: str | None = Field(
        "default_session", description="Session ID for soft-filtering memories"
    )
    task_id: str | None = Field(None, description="Task ID for monitoring async tasks")
    history: list[MessageDict] | None = Field(..., description="Chat history")
    retrieved_memory_ids: list[str] | None = Field(
        None, description="Retrieved memory ids at last turn"
    )
    feedback_content: str | None = Field(..., description="Feedback content to process")
    feedback_time: str | None = Field(None, description="Feedback time")
    # ==== Multi-cube writing ====
    writable_cube_ids: list[str] | None = Field(
        None, description="List of cube IDs user can write for multi-cube add"
    )
    async_mode: Literal["sync", "async"] = Field(
        "async", description="feedback mode: sync or async"
    )
    corrected_answer: bool = Field(False, description="Whether need return corrected answer")
    # ==== Backward compatibility ====
    mem_cube_id: str | None = Field(
        None,
        description=(
            "(Deprecated) Single cube ID to write to. "
            "Prefer `writable_cube_ids` for multi-cube feedback."
        ),
    )
class MemFeedbackConfig(BaseMemoryConfig):
    """Memory feedback configuration class.

    Bundles the LLM, embedder, graph store and mem-reader configs that
    SimpleMemFeedback / MemFeedback need.
    """

    # NOTE(review): the original declared these as `Field(..., default_factory=...)`,
    # combining the "required" marker with a default factory. With a factory the
    # field always has a value, and pydantic rejects specifying both default and
    # default_factory, so the Ellipsis is dropped here.
    extractor_llm: LLMConfigFactory = Field(
        default_factory=LLMConfigFactory,
        description="LLM configuration for the memory extractor",
    )
    embedder: EmbedderConfigFactory = Field(
        default_factory=EmbedderConfigFactory,
        description="Embedder configuration for the memory embedding",
    )
    reranker: RerankerConfigFactory | None = Field(
        None,
        description="Reranker configuration (optional).",
    )
    graph_db: GraphDBConfigFactory = Field(
        default_factory=GraphDBConfigFactory,
        description="Graph database configuration for the tree-memory storage",
    )
    reorganize: bool | None = Field(
        False,
        description="Whether to run background memory reorganization after feedback writes.",
    )

    memory_size: dict[str, Any] | None = Field(
        default=None,
        description=(
            "Maximum item counts per memory bucket, e.g.: "
            '{"WorkingMemory": 20, "LongTermMemory": 10000, "UserMemory": 10000}'
        ),
    )

    mem_reader: MemReaderConfigFactory = Field(
        default_factory=MemReaderConfigFactory,
        description="MemReader configuration for the Feedback",
    )
import concurrent.futures
import difflib
import json

from datetime import datetime
from typing import TYPE_CHECKING

from tenacity import retry, stop_after_attempt, wait_exponential

from memos import log
from memos.configs.memory import MemFeedbackConfig
from memos.context.context import ContextThreadPoolExecutor
from memos.embedders.factory import EmbedderFactory, OllamaEmbedder
from memos.graph_dbs.factory import GraphStoreFactory, PolarDBGraphDB
from memos.llms.factory import AzureLLM, LLMFactory, OllamaLLM, OpenAILLM
from memos.mem_feedback.base import BaseMemFeedback
from memos.mem_reader.factory import MemReaderFactory
from memos.mem_reader.simple_struct import detect_lang
from memos.memories.textual.item import TextualMemoryItem, TreeNodeTextualMemoryMetadata
from memos.memories.textual.tree_text_memory.organize.manager import (
    MemoryManager,
    extract_working_binding_ids,
)
from memos.templates.mem_feedback_prompts import (
    FEEDBACK_ANSWER_PROMPT,
    FEEDBACK_ANSWER_PROMPT_ZH,
    FEEDBACK_JUDGEMENT_PROMPT,
    FEEDBACK_JUDGEMENT_PROMPT_ZH,
    UPDATE_FORMER_MEMORIES,
    UPDATE_FORMER_MEMORIES_ZH,
)
from memos.types import MessageDict


if TYPE_CHECKING:
    from memos.memories.textual.tree_text_memory.retrieve.searcher import Searcher


# Language-keyed prompt templates for the three LLM stages of feedback
# processing: validity judgement, memory comparison, and corrected-answer
# generation.
FEEDBACK_PROMPT_DICT = {
    "judge": {"en": FEEDBACK_JUDGEMENT_PROMPT, "zh": FEEDBACK_JUDGEMENT_PROMPT_ZH},
    "compare": {"en": UPDATE_FORMER_MEMORIES, "zh": UPDATE_FORMER_MEMORIES_ZH},
    "generation": {"en": FEEDBACK_ANSWER_PROMPT, "zh": FEEDBACK_ANSWER_PROMPT_ZH},
}

logger = log.get_logger(__name__)
def _pure_add(self, user_name: str, feedback_content: str, feedback_time: str, info: dict):
    """
    Directly add new memory.

    Extracts memories from the feedback text alone (no chat context) and
    persists them through the memory manager with retry.
    """
    conversation = [[{"role": "user", "content": feedback_content, "chat_time": feedback_time}]]
    extracted = self.mem_reader.get_memory(conversation, type="chat", info=info)
    flattened = [memory for scene in extracted for memory in scene]

    new_ids = self._retry_db_operation(
        lambda: self.memory_manager.add(flattened, user_name=user_name)
    )
    logger.info(
        f"[Feedback Core: _pure_add] Added {len(new_ids)} memories for user {user_name}."
    )

    added_records = [
        {"id": mem_id, "text": mem.memory}
        for mem_id, mem in zip(new_ids, flattened, strict=False)
    ]
    return {"record": {"add": added_records, "update": []}}
def _single_add_operation(
    self,
    old_memory_item: TextualMemoryItem | None,
    new_memory_item: TextualMemoryItem,
    user_id: str,
    user_name: str,
    async_mode: str,
) -> dict:
    """
    Individual addition operations.

    When an old item is given, clone it and graft the new item's content
    (key/tags/text/embedding/user_id) onto the clone; otherwise clone the new
    item directly. Either way the clone is stored as a brand-new node.
    """
    if old_memory_item is not None:
        candidate = old_memory_item.model_copy(deep=True)
        candidate.metadata.key = new_memory_item.metadata.key
        candidate.metadata.tags = new_memory_item.metadata.tags
        candidate.memory = new_memory_item.memory
        candidate.metadata.embedding = new_memory_item.metadata.embedding
        candidate.metadata.user_id = new_memory_item.metadata.user_id
    else:
        candidate = new_memory_item.model_copy(deep=True)

    # Both branches stamp fresh timestamps and carry the new background over.
    now = datetime.now().isoformat()
    candidate.metadata.created_at = now
    candidate.metadata.updated_at = now
    candidate.metadata.background = new_memory_item.metadata.background

    # Clear the id so the store assigns a fresh one on insert.
    candidate.id = ""
    added_ids = self._retry_db_operation(
        lambda: self.memory_manager.add([candidate], user_name=user_name, mode=async_mode)
    )

    logger.info(f"[Memory Feedback ADD] {added_ids[0]}")
    return {"id": added_ids[0], "text": candidate.memory}
def _del_working_binding(self, user_name, mem_items: list[TextualMemoryItem]) -> None:
    """Delete working memory bindings.

    Collects the working-binding node ids referenced by ``mem_items`` and
    hard-deletes them from the graph store; individual failures are logged,
    never raised.
    """
    bindings_to_delete = extract_working_binding_ids(mem_items)

    logger.info(
        f"[Memory Feedback UPDATE] Extracted {len(bindings_to_delete)} working_binding ids to cleanup: {list(bindings_to_delete)}"
    )

    # BUG FIX: the original built `list({bindings_to_delete})`, i.e. a set
    # literal containing a set, which raises TypeError (sets are unhashable).
    # We just need the ids as a list.
    delete_ids = list(bindings_to_delete) if bindings_to_delete else []

    for mid in delete_ids:
        try:
            # debug print("del", mid) removed
            self.graph_store.delete_node(mid, user_name=user_name)
        except Exception as e:
            logger.warning(
                f"[Feedback Core:_del_working_binding] TreeTextMemory.delete_hard: failed to delete {mid}: {e}"
            )

    # Log the summary once, not once per deleted node as the original did.
    if delete_ids:
        logger.info(
            f"[Feedback Core:_del_working_binding] Delete raw/working mem_ids: {delete_ids} for user_name: {user_name}"
        )
def _feedback_memory(
    self, user_id: str, user_name: str, feedback_memories: list[TextualMemoryItem], **kwargs
) -> dict:
    """
    Apply extracted feedback memories against the user's existing memories.

    For each feedback memory an LLM decides, per existing memory, whether to
    ADD a new node or UPDATE (archive + re-add) an old one; the operations run
    concurrently and the add/update records are merged into one dict.
    """
    async_mode = kwargs.get("async_mode")
    retrieved_memory_ids = kwargs.get("retrieved_memory_ids") or []
    chat_history = kwargs.get("chat_history", [])
    feedback_content = kwargs.get("feedback_content", "")

    chat_history_lis = [f"""{msg["role"]}: {msg["content"]}""" for msg in chat_history[-4:]]
    fact_history = "\n".join(chat_history_lis) + f"\nuser feedback: \n{feedback_content}"

    retrieved_memories = [
        self.graph_store.get_node(_id, user_name=user_name) for _id in retrieved_memory_ids
    ]
    # Robustness: get_node may return None for stale/unknown ids -- drop them
    # before touching item["metadata"].
    retrieved_memories = [item for item in retrieved_memories if item]

    filtered_ids = [
        item["id"] for item in retrieved_memories if "mode:fast" in item["metadata"]["tags"]
    ]
    if filtered_ids:
        logger.warning(
            f"[Feedback Core: _feedback_memory] Since the tags mode is fast, no modifications are made to the following memory {filtered_ids}."
        )

    current_memories = [
        TextualMemoryItem(**item)
        for item in retrieved_memories
        if "mode:fast" not in item["metadata"]["tags"]
    ]

    def _add_or_update(
        memory_item: TextualMemoryItem,
        current_memories: list[TextualMemoryItem],
        fact_history: str,
    ):
        """Decide ADD/UPDATE operations for one feedback memory and run them."""
        if current_memories == []:
            current_memories = self._retrieve(
                memory_item.memory, info={"user_id": user_id}, user_name=user_name
            )

        if current_memories:
            lang = detect_lang("".join(memory_item.memory))
            template = FEEDBACK_PROMPT_DICT["compare"][lang]
            current_memories_str = "\n".join(
                [f"{item.id}: {item.memory}" for item in current_memories]
            )
            prompt = template.format(
                current_memories=current_memories_str,
                new_facts=memory_item.memory,
                chat_history=fact_history,
            )

            # Robustness: _get_llm_response returns None on failure; the
            # original called .get on it directly and could raise.
            operations = (self._get_llm_response(prompt) or {}).get("operations", [])
            operations = self._id_dehallucination(operations, current_memories)
        else:
            operations = [{"operation": "ADD"}]

        logger.info(f"[Feedback memory operations]: {operations!s}")

        if not operations:
            return {"record": {"add": [], "update": []}}

        add_results = []
        update_results = []
        id_to_item = {item.id: item for item in current_memories}
        with ContextThreadPoolExecutor(max_workers=10) as executor:
            future_to_op = {}
            for op in operations:
                event_type = op.get("operation", "").lower()

                if event_type == "add":
                    future = executor.submit(
                        self._single_add_operation,
                        None,
                        memory_item,
                        user_id,
                        user_name,
                        async_mode,
                    )
                    future_to_op[future] = ("add", op)
                elif event_type == "update":
                    future = executor.submit(
                        self._single_update_operation,
                        id_to_item[op["id"]],
                        memory_item,
                        user_id,
                        user_name,
                        async_mode,
                    )
                    future_to_op[future] = ("update", op)

            for future in concurrent.futures.as_completed(future_to_op):
                result_type, original_op = future_to_op[future]
                try:
                    result = future.result()
                    if result_type == "add" and result:
                        add_results.append(result)
                    elif result_type == "update" and result:
                        update_results.append(result)
                except Exception as e:
                    logger.error(
                        f"[Feedback Core: _add_or_update] Operation failed for {original_op}: {e}",
                        exc_info=True,
                    )

        if update_results:
            # BUG FIX: the original called
            # `self._del_working_binding(updated_ids, user_name)` -- arguments
            # swapped (signature is (user_name, mem_items)) AND raw id strings
            # passed where extract_working_binding_ids expects memory items.
            # Pass user_name first and the archived memory items themselves.
            archived_items = [
                id_to_item[item["archived_id"]]
                for item in update_results
                if item.get("archived_id") in id_to_item
            ]
            self._del_working_binding(user_name, archived_items)

        return {"record": {"add": add_results, "update": update_results}}

    with ContextThreadPoolExecutor(max_workers=3) as ex:
        futures = {
            ex.submit(_add_or_update, mem, current_memories, fact_history): i
            for i, mem in enumerate(feedback_memories)
        }
        results = [None] * len(futures)
        for fut in concurrent.futures.as_completed(futures):
            i = futures[fut]
            try:
                node = fut.result()
                if node:
                    results[i] = node
            except Exception as e:
                logger.error(
                    f"[Feedback Core: _feedback_memory] Error processing memory index {i}: {e}",
                    exc_info=True,
                )
    mem_res = [r for r in results if r]

    return {
        "record": {
            "add": [element for item in mem_res for element in item["record"]["add"]],
            "update": [element for item in mem_res for element in item["record"]["update"]],
        }
    }
def _get_llm_response(self, prompt: str, dsl: bool = True) -> dict | str | None:
    """
    Call the LLM with a single user message.

    Args:
        prompt: Fully formatted prompt text.
        dsl: When True, parse the reply as JSON (stripping markdown code
            fences) and return a dict; when False, return the raw text.

    Returns:
        Parsed dict (dsl=True), raw text (dsl=False), or None on any failure.
    """
    messages = [{"role": "user", "content": prompt}]
    try:
        response_text = self.llm.generate(messages, temperature=0.3, timeout=60)
        if not dsl:
            return response_text
        # BUG FIX: the original did `.replace("json", "")` on the whole reply,
        # which corrupts any payload whose *content* contains the word "json".
        # Strip markdown code fences at the edges only.
        cleaned = response_text.strip()
        if cleaned.startswith("```"):
            cleaned = cleaned[3:].lstrip()
            if cleaned[:4].lower() == "json":
                cleaned = cleaned[4:]
        if cleaned.rstrip().endswith("```"):
            cleaned = cleaned.rstrip()[:-3]
        response_json = json.loads(cleaned)
    except Exception as e:
        logger.error(f"[Feedback Core LLM] Exception during chat generation: {e}")
        response_json = None
    return response_json
def _generate_answer(
    self, chat_history: list[MessageDict], feedback_content: str, corrected_answer: bool
) -> str:
    """
    Answer generation to facilitate concurrent submission.

    Produces a corrected answer from the chat history and the user's feedback.
    Returns "" when no corrected answer was requested or the feedback is blank.
    """
    if not corrected_answer or not feedback_content.strip():
        return ""

    lang = detect_lang(feedback_content)
    template = FEEDBACK_PROMPT_DICT["generation"][lang]

    turns = [f"{turn['role']}: {turn['content']}" for turn in chat_history]
    history_text = "\n".join(turns) or "none"

    prompt = template.format(chat_history=history_text, question=feedback_content)
    return self._get_llm_response(prompt, dsl=False)
valid_feedback: + logger.warning( + f"[Feedback Core: process_feedback_core] No valid judgements for user {user_name}: {raw_judge}." + ) + return {"record": {"add": [], "update": []}} + + feedback_memories = [] + + corrected_infos = [item["corrected_info"] for item in valid_feedback] + embed_bs = 5 + feedback_memories_embeddings = [] + for i in range(0, len(corrected_infos), embed_bs): + batch = corrected_infos[i : i + embed_bs] + try: + feedback_memories_embeddings.extend(self.embedder.embed(batch)) + except Exception as e: + logger.error( + f"[Feedback Core: process_feedback_core] Embedding batch failed: {e}", + exc_info=True, + ) + + for item, embedding in zip( + valid_feedback, feedback_memories_embeddings, strict=False + ): + value = item["corrected_info"] + key = item["key"] + tags = item["tags"] + feedback_memories.append( + TextualMemoryItem( + memory=value, + metadata=TreeNodeTextualMemoryMetadata( + user_id=info.get("user_id", ""), + session_id=info.get("session_id", ""), + memory_type="LongTermMemory", + status="activated", + tags=tags, + key=key, + embedding=embedding, + usage=[], + sources=[{"type": "chat"}], + user_name=user_name, + background="[Feedback update background]: " + + str(chat_history) + + "\nUser feedback: " + + str(feedback_content), + confidence=0.99, + type="fine", + ), + ) + ) + + mem_record = self._feedback_memory( + user_id, + user_name, + feedback_memories, + chat_history=chat_history, + feedback_content=feedback_content, + **kwargs, + ) + logger.info( + f"[Feedback Core: process_feedback_core] Processed {len(feedback_memories)} feedback memories for user {user_name}." 
def process_feedback(
    self,
    user_id: str,
    user_name: str,
    chat_history: "list[MessageDict]",
    feedback_content: str,
    **kwargs,
):
    """
    Run answer generation and memory feedback concurrently.

    Args:
        user_id: Id of the end user the feedback belongs to.
        user_name: Cube id used as the memory namespace.
        chat_history: List of chat messages preceding the feedback.
        feedback_content: Feedback content from the user.
        **kwargs: Additional arguments, e.g. ``corrected_answer``,
            ``task_id``, ``feedback_time``, ``session_id``, ``async_mode``.

    Returns:
        Dict with ``answer`` (possibly empty) and ``record`` of memory
        operations (``{"add": [...], "update": [...]}``).
    """
    corrected_answer = kwargs.get("corrected_answer", False)
    empty_result = {"answer": "", "record": {"add": [], "update": []}}

    with ContextThreadPoolExecutor(max_workers=2) as ex:
        answer_future = ex.submit(
            self._generate_answer,
            chat_history,
            feedback_content,
            corrected_answer=corrected_answer,
        )
        core_future = ex.submit(
            self.process_feedback_core,
            user_id,
            user_name,
            chat_history,
            feedback_content,
            **kwargs,
        )
        done, pending = concurrent.futures.wait([answer_future, core_future], timeout=30)
        for fut in pending:
            fut.cancel()
        if pending:
            # BUG FIX: the original called ``future.result()`` with no
            # timeout after the wait, so the TimeoutError handler was
            # unreachable and a still-running future blocked indefinitely.
            # NOTE(review): already-running threads cannot be cancelled, so
            # the executor's __exit__ may still wait for them — confirm
            # whether shutdown(wait=False) is acceptable here.
            logger.error(f"[MemFeedback process] Timeout in sync mode for {user_name}")
            return empty_result
        try:
            answer = answer_future.result()
            record = core_future.result()
            task_id = kwargs.get("task_id", "default")

            logger.info(
                f"[MemFeedback process] Feedback Completed : user {user_name} | task_id {task_id} | record {record}."
            )

            return {"answer": answer, "record": record["record"]}
        except Exception as e:
            logger.error(
                f"[MemFeedback process] Error in concurrent tasks for {user_name}: {e}",
                exc_info=True,
            )
            return empty_result
# Helper for DB operations with retry.
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def _retry_db_operation(self, operation):
    """Run a zero-argument DB callable under tenacity retry.

    Retries up to 3 times with exponential back-off (4s–10s). The failure
    is logged, then re-raised so tenacity can schedule the next attempt
    (or propagate after the final one).
    """
    try:
        return operation()
    except Exception as exc:
        logger.error(
            f"[MemFeedback: _retry_db_operation] DB operation failed: {exc}", exc_info=True
        )
        raise


class SimpleMemFeedback(MemFeedback):
    """MemFeedback variant assembled from pre-built collaborators.

    All components are injected ready-made; no configuration parsing or
    resource construction happens here.
    """

    def __init__(
        self,
        llm: OpenAILLM | OllamaLLM | AzureLLM,
        embedder: OllamaEmbedder,
        graph_store: PolarDBGraphDB,
        memory_manager: MemoryManager,
        mem_reader: SimpleStructMemReader,
        searcher: Searcher,
    ):
        # NOTE(review): the base-class __init__ is deliberately not called;
        # confirm MemFeedback.__init__ performs no required setup beyond
        # binding these collaborators.
        self.llm = llm
        self.embedder = embedder
        self.graph_store = graph_store
        self.memory_manager = memory_manager
        self.mem_reader = mem_reader
        self.searcher = searcher
def _mem_feedback_message_consumer(self, messages: "list[ScheduleMessageItem]") -> None:
    """Consume a MEM_FEEDBACK message: run the feedback pipeline for the
    message's user/cube and, when rabbitmq logging is configured, emit a
    web log event describing the touched memories."""
    try:
        message = messages[0]
        mem_cube = self.current_mem_cube

        user_id = message.user_id
        mem_cube_id = message.mem_cube_id

        # Defensive .get: producers serialize APIFeedbackRequest.model_dump(),
        # but a missing key should not abort the whole consumer.
        feedback_data = json.loads(message.content)

        feedback_result = self.feedback_server.process_feedback(
            user_id=user_id,
            user_name=mem_cube_id,
            session_id=feedback_data.get("session_id"),
            chat_history=feedback_data.get("history") or [],
            retrieved_memory_ids=feedback_data.get("retrieved_memory_ids"),
            feedback_content=feedback_data.get("feedback_content", ""),
            feedback_time=feedback_data.get("feedback_time"),
            task_id=feedback_data.get("task_id"),
        )

        logger.info(
            f"Successfully feedback memories for user_id={user_id}, mem_cube_id={mem_cube_id}"
        )

        should_send_log = (
            self.rabbitmq_config is not None
            and hasattr(self.rabbitmq_config, "exchange_type")
            and self.rabbitmq_config.exchange_type == "direct"
        )
        if feedback_result and should_send_log:
            # BUG FIX: process_feedback returns {"answer": ..., "record":
            # {"add": [...], "update": [...]}}; iterating the dict itself
            # yielded its string keys, and the original also mixed
            # ``mem_item.memory`` with ``mem_item["id"]``. Flatten the
            # record and access both fields defensively.
            if isinstance(feedback_result, dict):
                record = feedback_result.get("record", {}) or {}
                touched = list(record.get("add", [])) + list(record.get("update", []))
            else:
                touched = list(feedback_result)

            log_content = []
            for mem_item in touched:
                if isinstance(mem_item, dict):
                    log_content.append(
                        {"content": mem_item.get("memory"), "id": mem_item.get("id")}
                    )
                else:
                    # presumably a TextualMemoryItem — TODO confirm shape
                    log_content.append(
                        {
                            "content": getattr(mem_item, "memory", None),
                            "id": getattr(mem_item, "id", None),
                        }
                    )
            event = self.create_event_log(
                label="feedbackMemory",
                from_memory_type=USER_INPUT_TYPE,
                to_memory_type=LONG_TERM_MEMORY_TYPE,
                user_id=user_id,
                mem_cube_id=mem_cube_id,
                mem_cube=mem_cube,
                memcube_log_content=log_content,
                metadata=[],
                memory_len=len(log_content),
                memcube_name=self._map_memcube_name(mem_cube_id),
            )
            event.task_id = message.task_id
            self._submit_web_logs([event])

    except Exception as e:
        logger.error(f"Error processing feedbackMemory message: {e}", exc_info=True)
def feedback_memories(self, feedback_req: "APIFeedbackRequest") -> "list[dict[str, Any]]":
    """Fan a feedback request out to every cube view and collect results.

    Returns:
        A list of per-cube results: list-valued results are flattened in,
        dict-valued results (SingleCubeView sync mode) are appended whole.
    """
    all_results: "list[dict[str, Any]]" = []

    for view in self.cube_views:
        # BUG FIX: the log previously said "fan-out add" (copy-paste from
        # add_memories).
        self.logger.info(f"[CompositeCubeView] fan-out feedback to cube={view.cube_id}")
        results = view.feedback_memories(feedback_req)
        if isinstance(results, dict):
            # BUG FIX: extending with a dict would iterate its *keys*;
            # keep the dict result as a single element instead.
            all_results.append(results)
        elif results:
            all_results.extend(results)

    return all_results
def feedback_memories(self, feedback_req: APIFeedbackRequest) -> dict[str, Any] | list:
    """Process a feedback request against this single cube.

    Async mode: serialize the request and enqueue a MEM_FEEDBACK message on
    the scheduler; returns an empty list. Sync mode: forward directly to
    the feedback server and return its result dict.

    Note: the return type depends on the mode (list when async, dict when
    sync) — the previous ``dict[str, Any]`` annotation was incomplete.
    """
    target_session_id = feedback_req.session_id or "default_session"
    if feedback_req.async_mode == "async":
        try:
            feedback_req_str = json.dumps(feedback_req.model_dump())
            message_item_feedback = ScheduleMessageItem(
                user_id=feedback_req.user_id,
                task_id=feedback_req.task_id,
                session_id=target_session_id,
                mem_cube_id=self.cube_id,
                mem_cube=self.naive_mem_cube,
                label=MEM_FEEDBACK_LABEL,
                content=feedback_req_str,
                # NOTE(review): datetime.utcnow() is naive and deprecated
                # since Python 3.12 — confirm consumers expect naive UTC.
                timestamp=datetime.utcnow(),
            )
            self.mem_scheduler.memos_message_queue.submit_messages(
                messages=[message_item_feedback]
            )
            self.logger.info(f"[SingleCubeView] cube={self.cube_id} Submitted FEEDBACK async")
        except Exception as e:
            self.logger.error(
                f"[SingleCubeView] cube={self.cube_id} Failed to submit FEEDBACK: {e}",
                exc_info=True,
            )
        return []
    else:
        # NOTE(review): self.feedback_server defaults to None on
        # SingleCubeView — an unconfigured server raises AttributeError
        # here; verify construction sites always pass one.
        feedback_result = self.feedback_server.process_feedback(
            user_id=feedback_req.user_id,
            user_name=self.cube_id,
            session_id=feedback_req.session_id,
            chat_history=feedback_req.history,
            retrieved_memory_ids=feedback_req.retrieved_memory_ids,
            feedback_content=feedback_req.feedback_content,
            feedback_time=feedback_req.feedback_time,
            async_mode=feedback_req.async_mode,
            corrected_answer=feedback_req.corrected_answer,
            task_id=feedback_req.task_id,
        )
        self.logger.info(f"Feedback memories result: {feedback_result}")
        return feedback_result
def feedback_memories(self, feedback_req: APIFeedbackRequest) -> dict[str, Any] | list[dict[str, Any]]:
    """
    Process feedback_req: apply the user's feedback to memories in one or
    more cubes.

    Returns:
        Implementation-defined feedback result: SingleCubeView returns a
        result dict in sync mode (an empty list in async mode), while
        CompositeCubeView returns a list of per-cube results.
    """
    # NOTE(review): the previous docstring promised add/search-style memory
    # dicts (memory/memory_id/memory_type/cube_id), which no implementation
    # returns — reconcile at call sites before tightening the annotation.
    ...
*User Attitude Judgment*: + - Dissatisfied: The feedback shows negative emotions, such as directly pointing out errors, expressing confusion, complaining, criticizing, or explicitly stating that the problem remains unsolved. + - Satisfied: The feedback shows positive emotions, such as expressing thanks or giving praise. + - Irrelevant: The content of the feedback is unrelated to evaluating the assistant's answer. + +3. *Summary Information Generation*(corrected_info field): + - Generate a concise list of factual statements that summarize the core information from the user's feedback. + - When the feedback provides corrections, focus only on the corrected information. + - When the feedback provides supplements, integrate all valid information (both old and new). + - It is very important to keep any relevant time information and express time information as concrete, unambiguous date(s) or period(s) (e.g., "March 2023", "2024-07", or "May–June 2022"). + - For 'satisfied' attitude, this list may contain confirming statements or be empty if no new facts are provided. + - Focus on statement of objective facts. For example: "The user completed the Everest Circuit trek with colleagues in March 2023." + +Output Format: +[ + {{ + "validity": "", + "user_attitude": "", + "corrected_info": "", + "key": "", + "tags": "" + }} +] + +Example1: +Dialogue History: +user: I can't eat spicy food these days. Can you recommend some suitable restaurants for me? +assistant: Sure, I recommend the Fish Restaurant near you. Their signature dishes include various types of steamed seafood and sashimi of sea fish. +feedback time: 2023-1-18T14:25:00.856481 + +User Feedback: +Oh,No!I'm allergic to seafood!And I don't like eating raw fish. 
+ +Output: +[ + {{ + "validity": "true", + "user_attitude": "dissatisfied", + "corrected_info": "User is allergic to seafood and does not like eating raw fish.", + "key": "dietary restrictions", + "tags": ["allergic", "seafood", "raw fish", "food preference"] + }} +] + +Example2: +Dialogue History: +user: When did I bought on November 25, 2025? +assistant: A red coat +feedback time: 2025-11-28T20:45:00.875249 + +User Feedback: +No, I also bought a blue shirt. + +Output: +[ + {{ + "validity": "true", + "user_attitude": "dissatisfied", + "corrected_info": "User bought a red coat and a blue shirt on November 25, 2025", + "key": "shopping record", + "tags": ["purchase", "clothing", "shopping"] + }} +] + +Example3: +Dialogue History: +user: What's my favorite food? +assistant: Pizza and sushi +feedback time: 2024-07-15T10:30:00.000000 + +User Feedback: +Wrong! I hate sushi. I like burgers. + +Output: +[ + {{ + "validity": "true", + "user_attitude": "dissatisfied", + "corrected_info": "User likes pizza and burgers, but hates sushi.", + "key": "food preferences", + "tags": ["food preferences", "pizza", "burgers", "sushi"] + }} +] + +Dialogue History: +{chat_history} + +feedback time: {feedback_time} + +User Feedback: +{user_feedback} + +Output:""" + +FEEDBACK_JUDGEMENT_PROMPT_ZH = """您是一个回答质量分析专家。请严格按照以下步骤和标准分析提供的"用户与助手聊天历史"和"用户反馈",并将最终评估结果填入指定的JSON格式中。 + +分析步骤和标准: +1. *有效性判断*:(validity字段) + - 有效(true):用户反馈的内容与聊天历史中的主题、任务或助手的最后回复相关。例如:提出后续问题、进行纠正、提供补充或评估最后回复。 + - 无效(false):用户反馈与对话历史完全无关,与之前内容没有任何语义、主题或词汇联系。 + +2. *用户态度判断*:(user_attitude字段) + - 不满意:反馈显示负面情绪,如直接指出错误、表达困惑、抱怨、批评,或明确表示问题未解决。 + - 满意:反馈显示正面情绪,如表达感谢或给予赞扬。 + - 无关:反馈内容与评估助手回答无关。 + +3. 
*摘要信息生成*(corrected_info字段): + - 从用户反馈中总结核心信息,生成简洁的事实陈述列表。 + - 当反馈提供纠正时,仅关注纠正后的信息。 + - 当反馈提供补充时,整合所有有效信息(包括旧信息和新信息)。 + - 非常重要:保留相关时间信息,并以具体、明确的日期或时间段表达(例如:"2023年3月"、"2024年7月"或"2022年5月至6月")。 + - 对于"满意"态度,此列表可能包含确认性陈述,如果没有提供新事实则为空。 + - 专注于客观事实陈述。例如:"用户于2023年3月与同事完成了珠峰环线徒步。" + +输出格式: +[ + {{ + "validity": "<字符串,'true' 或 'false'>", + "user_attitude": "<字符串,'dissatisfied' 或 'satisfied' 或 'irrelevant'>", + "corrected_info": "<字符串,用中文书写的事实信息记录>", + "key": "<字符串,简洁的中文记忆标题,用于快速识别该条目的核心内容(2-5个汉字)>", + "tags": "<列表,中文关键词列表(每个标签1-3个汉字),用于分类和检索>" + }} +] + +示例1: +对话历史: +用户:这些天我不能吃辣。能给我推荐一些合适的餐厅吗? +助手:好的,我推荐您附近的鱼类餐厅。他们的招牌菜包括各种蒸海鲜和海鱼生鱼片。 +反馈时间:2023-1-18T14:25:00.856481 + +用户反馈: +哦,不!我对海鲜过敏!而且我不喜欢吃生鱼。 + +输出: +[ + {{ + "validity": "true", + "user_attitude": "dissatisfied", + "corrected_info": "用户对海鲜过敏且不喜欢吃生鱼", + "key": "饮食限制", + "tags": ["过敏", "海鲜", "生鱼", "饮食偏好"] + }} +] + +示例2: +对话历史: +用户:我2025年11月25日买了什么? +助手:一件红色外套 +反馈时间:2025-11-28T20:45:00.875249 + +用户反馈: +不对,我还买了一件蓝色衬衫。 + +输出: +[ + {{ + "validity": "true", + "user_attitude": "dissatisfied", + "corrected_info": "用户于2025年11月25日购买了一件红色外套和一件蓝色衬衫", + "key": "购物记录", + "tags": ["红色外套", "蓝色衬衫", "服装购物"] + }} +] + +示例3: +对话历史: +用户:我最喜欢的食物是什么? +助手:披萨和寿司 +反馈时间:2024-07-15T10:30:00.000000 + +用户反馈: +错了!我讨厌寿司。我喜欢汉堡。 + +输出: +[ + {{ + "validity": "true", + "user_attitude": "dissatisfied", + "corrected_info": "用户喜欢披萨和汉堡,但讨厌寿司", + "key": "食物偏好", + "tags": ["偏好", "披萨和汉堡"] + }} +] + +对话历史: +{chat_history} + +反馈时间:{feedback_time} + +用户反馈: +{user_feedback} + +输出:""" + +UPDATE_FORMER_MEMORIES = """Operation recommendations: +Please analyze the newly acquired factual information and determine how this information should be updated to the memory database: add, update, or keep unchanged, and provide final operation recommendations. +You must strictly return the response in the following JSON format: + +{{ + "operations": + [ + {{ + "id": "", + "text": "", + "operation": "", + "old_memory": "" + }}, + ... + ] +}} + +*Requirements*: +1. 
If the new fact does not provide additional information to the existing memory item, the existing memory can override the new fact, and the operation is set to "NONE." +2. If the new fact is similar to existing memory but the information is more accurate, complete, or requires correction, set operation to "UPDATE" +3. If the new fact contradicts existing memory in key information (such as time, location, status, etc.), update the original memory based on the new fact and set operation to "UPDATE", only modifying the relevant error segments in the existing memory paragraphs while keeping other text completely unchanged. +4. If there is no existing memory that requires updating, the new fact is added as entirely new information, and the operation is set to "ADD." Therefore, in the same operation list, ADD and UPDATE will not coexist. + +*ID Management Rules*: +- Update operation: Keep the original ID unchanged +- Add operation: Generate a new unique ID in the format of a 4-digit string (e.g., "0001", "0002", etc.) + +*Important Requirements*: +1. For "UPDATE" operations, you must provide the old_memory field to display the original content +2. Compare existing memories one by one and do not omit any content requiring updates. When multiple existing memories need updating, include all relevant entries in the operation list +3. "text" field requirements: + - Use concise, complete declarative sentences, avoiding redundant information + - "text" should record the final adopted memory: if judged as "ADD", output text as "new fact"; if judged as "UPDATE", output text as "adjusted new fact"; if judged as "NONE", output text as "existing memory" + - When updating, ensure that only the related error segments are modified, and other text remains completely unchanged. +4. Both text and old_memory content should be in English +5. 
Return only the JSON format response, without any other content + + + +Example1: +Current Memories: +"0911": "The user is a senior full-stack developer working at Company B" +"123": "The user works as a software engineer at Company A. And he has a good relationship with his wife." +"648": "The user is responsible for front-end development of software at Company A" +"7210": "The user is responsible for front-end development of software at Company A" +"908": "The user enjoys fishing with friends on weekends" + +The background of the new fact being put forward: +user: Do you remember where I work? +assistant: Company A. +user feedback: I work at Company B, and I am a senior full-stack developer. + +Newly facts: +The user works as a senior full-stack developer at Company B + +Operation recommendations: +{{ + "operations": + [ + {{ + "id": "0911", + "text": "The user is a senior full-stack developer working at Company B", + "operation": "NONE" + }}, + {{ + "id": "123", + "text": "The user works as a senior full-stack developer at Company B. And he has a good relationship with his wife.", + "operation": "UPDATE", + "old_memory": "The user works as a software engineer at Company A. And he has a good relationship with his wife." 
+ }}, + {{ + "id": "648", + "text": "The user works as a senior full-stack developer at Company B", + "operation": "UPDATE", + "old_memory": "The user is responsible for front-end development of software at Company A" + }}, + {{ + "id": "7210", + "text": "The user works as a senior full-stack developer at Company B", + "operation": "UPDATE", + "old_memory": "The user is responsible for front-end development of software at Company A" + }}, + {{ + "id": "908", + "text": "The user enjoys fishing with friends on weekends", + "operation": "NONE" + }} + ] +}} + +Example2: +Current Memories: +"123": "The user works as a software engineer in Company A, mainly responsible for front-end development" +"908": "The user likes to go fishing with friends on weekends" + +The background of the new fact being put forward: +user: Guess where I live? +assistant: Hehuan Community. +user feedback: Wrong, update my address: Mingyue Community, Chaoyang District, Beijing + +Newly facts: +"The user's residential address is Mingyue Community, Chaoyang District, Beijing" + +Operation recommendations: +{{ + "operations": + [ + {{ + "id": "123", + "text": "The user works as a software engineer at Company A, primarily responsible for front-end development", + "operation": "NONE" + }}, + {{ + "id": "908", + "text": "The user enjoys fishing with friends on weekends", + "operation": "NONE" + }}, + {{ + "id": "4567", + "text": "The user's residential address is Mingyue Community, Chaoyang District, Beijing", + "operation": "ADD" + }} + ] +}} + + +**Current Memories** +{current_memories} + +**The background of the new fact being put forward** +{chat_history} + +**Newly facts** +{new_facts} + +Operation recommendations: +""" + +UPDATE_FORMER_MEMORIES_ZH = """请分析新获取的事实信息,并决定这些信息应该如何更新到记忆库中:新增、更新、或保持不变,并给出最终的操作建议。 + +你必须严格按照以下JSON格式返回响应: + +{{ + "operations": + [ + {{ + "id": "<记忆ID>", + "text": "<记忆内容>", + "operation": "<操作类型,必须是 "ADD", "UPDATE", "NONE" 之一>", + "old_memory": 
"<原记忆内容,仅当操作为"UPDATE"时需要提供>" + }}, + ... + ] +}} + +要求: +1. 若新事实未对现有记忆条目提供额外信息,现有记忆可覆盖新事实,操作设为"NONE" +2. 若新事实与现有记忆相似但信息更准确、完整或需修正,操作设为"UPDATE" +3. 若新事实在关键信息(如时间、地点、状态等)上与现有记忆矛盾,则根据新事实更新原记忆,操作设为"UPDATE",仅修改现有记忆段落中的相关错误片段,其余文本完全保持不变 +4. 若无需要更新的现有记忆,则将新事实作为全新信息添加,操作设为"ADD"。因此在同一操作列表中,ADD与UPDATE不会同时存在 + +ID管理规则: +- 更新操作:保持原有ID不变 +- 新增操作:生成新的唯一ID,格式为4位数字字符串(如:"0001", "0002"等) + +重要要求: +1. 对于"UPDATE"更新操作,必须提供old_memory字段显示原内容 +2. 对现有记忆逐一比对,不可漏掉需要更新的内容。当多个现有记忆需要更新时,将所有的相关条目都包含在操作列表中 +3. text字段要求: + - 使用简洁、完整的陈述句,避免冗余信息 + - text要记录最终采用的记忆,如果判为"ADD",则text输出为"新事实";如果判为"UPDATE",则text输出为"调整后的新事实";如果判为"NONE",则text输出为"现有记忆" + - 更新时确保仅修改相关错误片段,其余文本完全保持不变 +4. text和old_memory内容使用中文 +5. 只返回JSON格式的响应,不要包含其他任何内容 + + +示例1: +当前记忆: +"0911": "用户是高级全栈开发工程师,在B公司工作" +"123": "用户在公司A担任软件工程师。而且用户和同事们的关系很好,他们共同协作大项目。" +"648": "用户在公司A负责软件的前端开发工作" +"7210": "用户在公司A负责软件的前端开发工作" +"908": "用户周末喜欢和朋友一起钓鱼" + + +提出新事实的背景: +user: 你还记得我现在在哪里工作吗? +assistant: A公司 +user feedback: 实际上,我在公司B工作,是一名高级全栈开发人员。 + + +新获取的事实: +"用户现在在公司B担任高级全栈开发工程师" + +操作建议: +{{ + "operations": + [ + {{ + "id": "0911", + "text": "用户是高级全栈开发工程师,在B公司工作", + "operation": "NONE" + }}, + {{ + "id": "123", + "text": "用户现在在公司B担任高级全栈开发工程师。而且用户和同事们的关系很好,他们共同协作大项目。", + "operation": "UPDATE", + "old_memory": "用户在公司A担任软件工程师,主要负责前端开发。而且用户和同事们的关系很好,他们共同协作大项目。" + }}, + {{ + "id": "648", + "text": "用户现在在公司B担任高级全栈开发工程师", + "operation": "UPDATE", + "old_memory": "用户在公司A负责软件的前端开发工作" + }}, + {{ + "id": "7210", + "text": "用户现在在公司B担任高级全栈开发工程师", + "operation": "UPDATE", + "old_memory": "用户在公司A负责软件的前端开发工作" + }}, + {{ + "id": "908", + "text": "用户周末喜欢和朋友一起钓鱼", + "operation": "NONE" + }} + ] +}} + +示例2: +当前记忆: +"123": "用户在公司A担任软件工程师,主要负责前端开发" +"908": "用户周末喜欢和朋友一起钓鱼" + + +提出新事实的背景: +user: 猜猜我住在哪里? 
+assistant: 合欢社区 +user feedback: 错了,请更新我的地址:北京市朝阳区明月社区 + +新获取的事实: +"用户的居住地址是北京市朝阳区明月小区" + +操作建议: +{{ + "operations": + [ + {{ + "id": "123", + "text": "用户在公司A担任软件工程师,主要负责前端开发", + "operation": "NONE" + }}, + {{ + "id": "908", + "text": "用户周末喜欢和朋友一起钓鱼", + "operation": "NONE" + }}, + {{ + "id": "4567", + "text": "用户的居住地址是北京市朝阳区明月小区", + "operation": "ADD" + }} + ] +}} + +**当前记忆:** +{current_memories} + +**新事实提出的背景:** +{chat_history} + +**新事实:** +{new_facts} + +操作建议: +""" + + +FEEDBACK_ANSWER_PROMPT = """ +You are a knowledgeable and helpful AI assistant.You have access to the history of the current conversation. This history contains the previous exchanges between you and the user. + +# INSTRUCTIONS: +1. Carefully analyze the entire conversation history. Your answer must be based only on the information that has been exchanged within this dialogue. +2. Pay close attention to the sequence of the conversation. If the user refers back to a previous statement (e.g., "the thing I mentioned earlier"), you must identify that specific point in the history. +3. Your primary goal is to provide continuity and context from this specific conversation. Do not introduce new facts or topics that have not been previously discussed. +4. If current question is ambiguous, use the conversation history to clarify its meaning. + +# APPROACH (Think step by step): +1. Review the conversation history to understand the context and topics that have been discussed. +2. Identify any specific details, preferences, or statements the user has made that are relevant to the current question. +3. Formulate a precise, concise answer that is a direct continuation of the existing dialogue. +4. Ensure your final answer is grounded in the conversation history and directly addresses the user's latest query in that context. + +# Tip: +If no chat history is provided: + - Treat the query as self-contained. + - Do not assume prior context. + - Respond based solely on the current question. 
+ - Do not raise new questions during the answering process. + +Chat history: +{chat_history} + +Question: +{question} + +Answer: +""" + +FEEDBACK_ANSWER_PROMPT_ZH = """ +你是一个知识渊博且乐于助人的AI助手。你可以访问当前对话的完整历史记录。这些记录包含你与用户之间先前的所有交流内容。 + +# 指令: +1. 仔细分析整个对话历史。你的回答必须仅基于本次对话中已交流的信息。 +2. 密切关注对话的先后顺序。如果用户提及之前的发言(例如“我之前提到的那件事”),你必须定位到历史记录中的具体内容。 +3. 你的主要目标是基于本次特定对话提供连续性和上下文。不要引入之前对话中未讨论过的新事实或话题。 +4. 如果用户当前的问题含义不明确,请利用对话历史来澄清其意图。 + +# 处理方法(逐步思考): +1. 回顾对话历史,以理解已讨论的背景和主题。 +2. 识别用户已提及的、与当前问题相关的任何具体细节、偏好或陈述。 +3. 构思一个精准、简洁的回答,使其成为现有对话的直接延续。 +4. 确保你的最终回答紧扣对话历史,并在此上下文中直接回应用户的最新提问。 + +# 注意: +如果没有提供聊天历史记录: + - 将该查询视为独立的。 + - 不要假设之前存在背景信息。 + - 仅根据当前问题进行回答。 + - 在回答过程中不必提出新的问题。 + +对话历史: +{chat_history} + +问题: +{question} + +回答: +""" diff --git a/tests/api/test_server_router.py b/tests/api/test_server_router.py index 7c4b4be9d..5906697d9 100644 --- a/tests/api/test_server_router.py +++ b/tests/api/test_server_router.py @@ -38,6 +38,7 @@ def mock_init_server(): "default_cube_config": Mock(), "mos_server": Mock(), "mem_scheduler": Mock(), + "feedback_server": Mock(), "naive_mem_cube": Mock(), "searcher": Mock(), "api_module": Mock(),