From 32563e1f032d4a26a84f630200153347f3d00c80 Mon Sep 17 00:00:00 2001 From: fridayL Date: Tue, 9 Sep 2025 15:42:59 +0800 Subject: [PATCH 01/10] fix: add memory --- src/memos/reranker/concat.py | 42 ++++++++++++++++++++++++++++++++++ src/memos/reranker/http_bge.py | 18 ++++++++++++--- 2 files changed, 57 insertions(+), 3 deletions(-) create mode 100644 src/memos/reranker/concat.py diff --git a/src/memos/reranker/concat.py b/src/memos/reranker/concat.py new file mode 100644 index 00000000..d7f48cc5 --- /dev/null +++ b/src/memos/reranker/concat.py @@ -0,0 +1,42 @@ +from typing import List, Dict, Any, Union, Tuple + + +def process_source(items: List[Tuple[Any, Union[str, Dict[str, Any], List[Any]]]] | None = None) -> str: + """ + Args: + items: List of tuples where each tuple contains (memory, source). + source can be str, Dict, or List. + Returns: + str: Concatenated source. + """ + concat_data = [] + for item in items: + memory, source = item + print(memory, source) + if isinstance(source, str): + concat_data.append(source) + return "\n".join(concat_data) + + +def concat_original_source( + graph_results: list, + merge_field: List[str]=["sources"], +) -> list[str]: + """ + Merge memory items with original dialogue. + Args: + graph_results (list[TextualMemoryItem]): List of memory items with embeddings. + merge_field (List[str]): List of fields to merge. + Returns: + list[str]: List of memory and concat orginal memory. + """ + documents = [] + for item in graph_results: + memory = getattr(item, "memory", "") + sources = [] + for field in merge_field: + source = getattr(item.metadata, field, "") + sources.append((memory, source)) + concat_string = process_source(sources) + documents.append(concat_string) + return documents \ No newline at end of file diff --git a/src/memos/reranker/http_bge.py b/src/memos/reranker/http_bge.py index 08ff295a..2de44abc 100644 --- a/src/memos/reranker/http_bge.py +++ b/src/memos/reranker/http_bge.py @@ -6,6 +6,10 @@ import requests from .base import BaseReranker +from .concat import concat_original_source +from memos.log import get_logger + +logger = get_logger(__name__) if TYPE_CHECKING: @@ -24,6 +28,7 @@ def __init__( model: str = "bge-reranker-v2-m3", timeout: int = 10, headers_extra: dict | None = None, + concat_source: List[str]=["sources"], ): if not reranker_url: raise ValueError("reranker_url must not be empty") @@ -32,6 +37,7 @@ def __init__( self.model = model self.timeout = timeout self.headers_extra = headers_extra or {} + self.concat_source = concat_source def rerank( self, @@ -43,8 +49,14 @@ def rerank( if not graph_results: return [] - documents = [getattr(item, "memory", None) for item in graph_results] - documents = [d for d in documents if isinstance(d, str) and d] + documents = [] + if self.concat_source: + documents = concat_original_source(graph_results) + else: + documents = [getattr(item, "memory", None) for item in graph_results] + documents = [d for d in documents if isinstance(d, str) and d] + + logger.info(f"[HTTPBGERerankerSample] query: {query} , documents: {documents[:5]}...") if not documents: return [] @@ -88,5 +100,5 @@ def rerank( return [(item, 0.0) for item in graph_results[:top_k]] except Exception as e: - print(f"[HTTPBGEReranker] request failed: {e}") + logger.error(f"[HTTPBGEReranker] request failed: {e}") return [(item, 0.0) for item in graph_results[:top_k]] From cf522769be4a8010b3217b99d5dfddaec9bcef75 Mon Sep 17 00:00:00 2001 From: fridayL Date: Tue, 9 Sep 2025 17:36:33 +0800 Subject: [PATCH 02/10] update: update 
orginal data --- src/memos/reranker/concat.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/memos/reranker/concat.py b/src/memos/reranker/concat.py index d7f48cc5..f963098a 100644 --- a/src/memos/reranker/concat.py +++ b/src/memos/reranker/concat.py @@ -1,20 +1,24 @@ from typing import List, Dict, Any, Union, Tuple -def process_source(items: List[Tuple[Any, Union[str, Dict[str, Any], List[Any]]]] | None = None) -> str: +def process_source( + items: List[Tuple[Any, Union[str, Dict[str, Any], List[Any]]]] | None = None, + recent_num: int = 3 + ) -> str: """ Args: items: List of tuples where each tuple contains (memory, source). source can be str, Dict, or List. + recent_num: Number of recent items to concatenate. Returns: str: Concatenated source. """ concat_data = [] for item in items: memory, source = item - print(memory, source) - if isinstance(source, str): - concat_data.append(source) + for content in source[:3]: + if isinstance(content, str): + concat_data.append(content) return "\n".join(concat_data) From 362c32a97d35cd3a50e90b00a1327378a882d87e Mon Sep 17 00:00:00 2001 From: fridayL Date: Wed, 10 Sep 2025 16:00:08 +0800 Subject: [PATCH 03/10] Chore: Change version to v1.0.1 --- README.md | 1 + pyproject.toml | 2 +- src/memos/__init__.py | 2 +- src/memos/api/product_api.py | 2 +- 4 files changed, 4 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index d1e7bdef..6873ba2b 100644 --- a/README.md +++ b/README.md @@ -252,6 +252,7 @@ MemOS is licensed under the [Apache 2.0 License](./LICENSE). Stay up to date with the latest MemOS announcements, releases, and community highlights. +- **2025-09-10** - 🎉 *MemOS v1.0.1 (Group Q&A Bot)*: Group Q&A bot based on MemOS Cube, updated KV-Cache performance comparison data across different GPU deployment schemes, optimized test benchmarks and statistics, added plaintext memory Reranker sorting, optimized plaintext memory hallucination issues, and Playground version updates. [Try PlayGround](https://memos-playground.openmem.net/login/) - **2025-08-07** - 🎉 *MemOS v1.0.0 (MemCube Release)*: First MemCube with word game demo, LongMemEval evaluation, BochaAISearchRetriever integration, NebulaGraph support, enhanced search capabilities, and official Playground launch. - **2025-07-29** – 🎉 *MemOS v0.2.2 (Nebula Update)*: Internet search+Nebula DB integration, refactored memory scheduler, KV Cache stress tests, MemCube Cookbook release (CN/EN), and 4b/1.7b/0.6b memory ops models. - **2025-07-21** – 🎉 *MemOS v0.2.1 (Neo Release)*: Lightweight Neo version with plaintext+KV Cache functionality, Docker/multi-tenant support, MCP expansion, and new Cookbook/Mud game examples. 
diff --git a/pyproject.toml b/pyproject.toml index 270fd712..c66bcb05 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ ############################################################################## name = "MemoryOS" -version = "1.0.0" +version = "1.0.1" description = "Intelligence Begins with Memory" license = {text = "Apache-2.0"} readme = "README.md" diff --git a/src/memos/__init__.py b/src/memos/__init__.py index 9d1d57cc..0f6dd293 100644 --- a/src/memos/__init__.py +++ b/src/memos/__init__.py @@ -1,4 +1,4 @@ -__version__ = "1.0.0" +__version__ = "1.0.1" from memos.configs.mem_cube import GeneralMemCubeConfig from memos.configs.mem_os import MOSConfig diff --git a/src/memos/api/product_api.py b/src/memos/api/product_api.py index 06454a78..08940997 100644 --- a/src/memos/api/product_api.py +++ b/src/memos/api/product_api.py @@ -14,7 +14,7 @@ app = FastAPI( title="MemOS Product REST APIs", description="A REST API for managing multiple users with MemOS Product.", - version="1.0.0", + version="1.0.1", ) # Add request context middleware (must be added first) From a2c2a394463ac44a5c32c973c49037ef8d0ea691 Mon Sep 17 00:00:00 2001 From: fridayL Date: Wed, 10 Sep 2025 19:03:59 +0800 Subject: [PATCH 04/10] feat:fix conflict --- src/memos/reranker/concat.py | 15 +++++++++------ src/memos/reranker/http_bge.py | 8 +++++--- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/src/memos/reranker/concat.py b/src/memos/reranker/concat.py index f963098a..f74b3b85 100644 --- a/src/memos/reranker/concat.py +++ b/src/memos/reranker/concat.py @@ -1,10 +1,9 @@ -from typing import List, Dict, Any, Union, Tuple +from typing import Any def process_source( - items: List[Tuple[Any, Union[str, Dict[str, Any], List[Any]]]] | None = None, - recent_num: int = 3 - ) -> str: + items: list[tuple[Any, str | dict[str, Any] | list[Any]]] | None = None, recent_num: int = 3 +) -> str: """ Args: items: List of tuples where each tuple contains (memory, source). @@ -13,6 +12,8 @@ def process_source( Returns: str: Concatenated source. """ + if items is None: + items = [] concat_data = [] for item in items: memory, source = item @@ -24,7 +25,7 @@ def process_source( def concat_original_source( graph_results: list, - merge_field: List[str]=["sources"], + merge_field: list[str] | None = None, ) -> list[str]: """ Merge memory items with original dialogue. @@ -34,6 +35,8 @@ def concat_original_source( Returns: list[str]: List of memory and concat orginal memory. 
""" + if merge_field is None: + merge_field = ["sources"] documents = [] for item in graph_results: memory = getattr(item, "memory", "") @@ -43,4 +46,4 @@ def concat_original_source( sources.append((memory, source)) concat_string = process_source(sources) documents.append(concat_string) - return documents \ No newline at end of file + return documents diff --git a/src/memos/reranker/http_bge.py b/src/memos/reranker/http_bge.py index 7b39db88..02d2f657 100644 --- a/src/memos/reranker/http_bge.py +++ b/src/memos/reranker/http_bge.py @@ -7,9 +7,11 @@ import requests +from memos.log import get_logger + from .base import BaseReranker from .concat import concat_original_source -from memos.log import get_logger + logger = get_logger(__name__) @@ -32,7 +34,7 @@ def __init__( model: str = "bge-reranker-v2-m3", timeout: int = 10, headers_extra: dict | None = None, - concat_source: List[str]=["sources"], + concat_source: list[str] | None = None, ): if not reranker_url: raise ValueError("reranker_url must not be empty") @@ -41,7 +43,7 @@ def __init__( self.model = model self.timeout = timeout self.headers_extra = headers_extra or {} - self.concat_source = concat_source + self.concat_source = concat_source or ["sources"] def rerank( self, From b24926fd37ef8877e81460d108ca5de3f7d43ff4 Mon Sep 17 00:00:00 2001 From: fridayL Date: Wed, 10 Sep 2025 20:53:24 +0800 Subject: [PATCH 05/10] fix: update memory get --- src/memos/reranker/concat.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/memos/reranker/concat.py b/src/memos/reranker/concat.py index f74b3b85..91fccd7d 100644 --- a/src/memos/reranker/concat.py +++ b/src/memos/reranker/concat.py @@ -1,4 +1,7 @@ from typing import Any +import re + +_TAG1 = re.compile(r"^\s*\[[^\]]*\]\s*") def process_source( @@ -39,7 +42,7 @@ def concat_original_source( merge_field = ["sources"] documents = [] for item in graph_results: - memory = getattr(item, "memory", "") + memory = (_TAG1.sub("", m) if isinstance((m := getattr(item, "memory", None)), str) else m) sources = [] for field in merge_field: source = getattr(item.metadata, field, "") From a84014f46fcdcd6faaa4aef81868f3773da8f49b Mon Sep 17 00:00:00 2001 From: fridayL Date: Thu, 11 Sep 2025 10:33:35 +0800 Subject: [PATCH 06/10] fix: ci code --- src/memos/reranker/concat.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/memos/reranker/concat.py b/src/memos/reranker/concat.py index 91fccd7d..e19a35d6 100644 --- a/src/memos/reranker/concat.py +++ b/src/memos/reranker/concat.py @@ -1,6 +1,8 @@ -from typing import Any import re +from typing import Any + + _TAG1 = re.compile(r"^\s*\[[^\]]*\]\s*") @@ -42,7 +44,7 @@ def concat_original_source( merge_field = ["sources"] documents = [] for item in graph_results: - memory = (_TAG1.sub("", m) if isinstance((m := getattr(item, "memory", None)), str) else m) + memory = _TAG1.sub("", m) if isinstance((m := getattr(item, "memory", None)), str) else m sources = [] for field in merge_field: source = getattr(item.metadata, field, "") From 83d88246530c5d70e605cb35d760c5c6634e75e3 Mon Sep 17 00:00:00 2001 From: fridayL Date: Mon, 15 Sep 2025 10:14:56 +0800 Subject: [PATCH 07/10] update: search_reranker --- src/memos/reranker/concat.py | 6 +++++- src/memos/reranker/http_bge.py | 4 +++- src/memos/utils.py | 2 +- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/memos/reranker/concat.py b/src/memos/reranker/concat.py index e19a35d6..d0b10d9f 100644 --- a/src/memos/reranker/concat.py +++ b/src/memos/reranker/concat.py 
@@ -20,11 +20,15 @@ def process_source( if items is None: items = [] concat_data = [] + memory = None for item in items: memory, source = item - for content in source[:3]: + for content in source: if isinstance(content, str): + if "assistant:" in content: + continue concat_data.append(content) + concat_data = [memory] + concat_data return "\n".join(concat_data) diff --git a/src/memos/reranker/http_bge.py b/src/memos/reranker/http_bge.py index 02d2f657..f828d25f 100644 --- a/src/memos/reranker/http_bge.py +++ b/src/memos/reranker/http_bge.py @@ -43,7 +43,9 @@ def __init__( self.model = model self.timeout = timeout self.headers_extra = headers_extra or {} - self.concat_source = concat_source or ["sources"] + # self.concat_source = concat_source or ["sources"] + self.concat_source = [] + def rerank( self, diff --git a/src/memos/utils.py b/src/memos/utils.py index 6a1d4255..a9e5fe9f 100644 --- a/src/memos/utils.py +++ b/src/memos/utils.py @@ -13,7 +13,7 @@ def wrapper(*args, **kwargs): start = time.perf_counter() result = func(*args, **kwargs) elapsed = time.perf_counter() - start - logger.info(f"[TIMER] {func.__name__} took {elapsed:.2f} s") + logger.debug(f"[TIMER] {func.__name__} took {elapsed:.2f} s") return result return wrapper From f62211453182c520fa7ab62160a7db233b94604d Mon Sep 17 00:00:00 2001 From: fridayL Date: Mon, 15 Sep 2025 14:31:23 +0800 Subject: [PATCH 08/10] change: rerank_source for reranking --- src/memos/reranker/concat.py | 3 ++- src/memos/reranker/factory.py | 1 + src/memos/reranker/http_bge.py | 8 +++----- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/memos/reranker/concat.py b/src/memos/reranker/concat.py index d0b10d9f..5ad33952 100644 --- a/src/memos/reranker/concat.py +++ b/src/memos/reranker/concat.py @@ -28,7 +28,8 @@ def process_source( if "assistant:" in content: continue concat_data.append(content) - concat_data = [memory] + concat_data + if memory is not None: + concat_data = [memory, *concat_data] return "\n".join(concat_data) diff --git a/src/memos/reranker/factory.py b/src/memos/reranker/factory.py index 244b6928..453f23c1 100644 --- a/src/memos/reranker/factory.py +++ b/src/memos/reranker/factory.py @@ -29,6 +29,7 @@ def from_config(cfg: RerankerConfigFactory | None) -> BaseReranker | None: model=c.get("model", "bge-reranker-v2-m3"), timeout=int(c.get("timeout", 10)), headers_extra=c.get("headers_extra"), + rerank_source=c.get("rerank_source"), ) if backend in {"cosine_local", "cosine"}: diff --git a/src/memos/reranker/http_bge.py b/src/memos/reranker/http_bge.py index f828d25f..c54a1ade 100644 --- a/src/memos/reranker/http_bge.py +++ b/src/memos/reranker/http_bge.py @@ -34,7 +34,7 @@ def __init__( model: str = "bge-reranker-v2-m3", timeout: int = 10, headers_extra: dict | None = None, - concat_source: list[str] | None = None, + rerank_source: list[str] | None = None, ): if not reranker_url: raise ValueError("reranker_url must not be empty") @@ -43,9 +43,7 @@ def __init__( self.model = model self.timeout = timeout self.headers_extra = headers_extra or {} - # self.concat_source = concat_source or ["sources"] - self.concat_source = [] - + self.concat_source = rerank_source def rerank( self, @@ -59,7 +57,7 @@ def rerank( documents = [] if self.concat_source: - documents = concat_original_source(graph_results) + documents = concat_original_source(graph_results, self.concat_source) else: documents = [ (_TAG1.sub("", m) if isinstance((m := getattr(item, "memory", None)), str) else m) From 991c6bf919d3cdd7ed0ac2c1608056c9a36abb36 Mon Sep 17 
00:00:00 2001 From: fridayL Date: Mon, 15 Sep 2025 15:13:00 +0800 Subject: [PATCH 09/10] update config --- src/memos/api/config.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/memos/api/config.py b/src/memos/api/config.py index e7cc5d65..18327ef6 100644 --- a/src/memos/api/config.py +++ b/src/memos/api/config.py @@ -100,8 +100,10 @@ def get_reranker_config() -> dict[str, Any]: "backend": "http_bge", "config": { "url": os.getenv("MOS_RERANKER_URL"), - "model": "bge-reranker-v2-m3", + "model": os.getenv("MOS_RERANKER_MODEL", "bge-reranker-v2-m3"), "timeout": 10, + "headers_extra": os.getenv("MOS_RERANKER_HEADERS_EXTRA"), + "rerank_source": os.getenv("MOS_RERANK_SOURCE"), }, } else: From 4656d72fb6a3ca18f4a5a4dfbb809e302f2378a0 Mon Sep 17 00:00:00 2001 From: fridayL Date: Tue, 16 Sep 2025 17:04:11 +0800 Subject: [PATCH 10/10] feat: add memrerank strategy --- src/memos/reranker/concat.py | 54 ++++++++- src/memos/reranker/factory.py | 9 ++ src/memos/reranker/http_bge_strategy.py | 145 +++++++++++++++++++++++ src/memos/reranker/item.py | 147 ++++++++++++++++++++++++ 4 files changed, 352 insertions(+), 3 deletions(-) create mode 100644 src/memos/reranker/http_bge_strategy.py create mode 100644 src/memos/reranker/item.py diff --git a/src/memos/reranker/concat.py b/src/memos/reranker/concat.py index 5ad33952..43f184ed 100644 --- a/src/memos/reranker/concat.py +++ b/src/memos/reranker/concat.py @@ -1,13 +1,60 @@ import re -from typing import Any - +from typing import Any, Literal +from .item import DialogueRankingTracker _TAG1 = re.compile(r"^\s*\[[^\]]*\]\s*") +def concat_single_turn( + graph_results: list, +) -> tuple[DialogueRankingTracker, dict[str, any]]: + """ + Concatenate dialogue pairs into single strings for ranking. + + Args: + graph_results: List of graph results + + Returns: + List of concatenated dialogue pairs + + Example: + >>> sources = ["user: hello", "assistant: hi there", "user: how are you?", "assistant: I'm good"] + >>> concat_single_turn(messages) + ["user: hello\nassistant: hi there", "user: how are you?\nassistant: I'm good"] + """ + + tracker = DialogueRankingTracker() + original_items = {} + + def extract_content(msg: dict[str, Any] | str) -> str: + """Extract content from message, handling both string and dict formats.""" + if isinstance(msg, dict): + return msg.get('content', str(msg)) + return str(msg) + + for item in graph_results: + memory = _TAG1.sub("", m) if isinstance((m := getattr(item, "memory", None)), str) else m + sources = getattr(item.metadata, "sources", []) + original_items[item.id] = item + + # Group messages into pairs and concatenate + dialogue_pairs = [] + for i in range(0, len(sources), 2): + user_msg = sources[i] if i < len(sources) else "" + assistant_msg = sources[i + 1] if i + 1 < len(sources) else "" + + user_content = extract_content(user_msg) + assistant_content = extract_content(assistant_msg) + if user_content or assistant_content: # Only add non-empty pairs + pair_index = i // 2 + tracker.add_dialogue_pair(item.id, pair_index, user_msg, assistant_msg, memory) + return tracker, original_items + + def process_source( - items: list[tuple[Any, str | dict[str, Any] | list[Any]]] | None = None, recent_num: int = 3 + items: list[tuple[Any, str | dict[str, Any] | list[Any]]] | None = None, + concat_strategy: Literal["user", "assistant", "single_turn"] = "user", ) -> str: """ Args: @@ -36,6 +83,7 @@ def process_source( def concat_original_source( graph_results: list, merge_field: list[str] | None = None, + 
concat_strategy: Literal["user", "assistant", "single_turn"] = "user", ) -> list[str]: """ Merge memory items with original dialogue. diff --git a/src/memos/reranker/factory.py b/src/memos/reranker/factory.py index 453f23c1..c47ef158 100644 --- a/src/memos/reranker/factory.py +++ b/src/memos/reranker/factory.py @@ -6,6 +6,7 @@ from .cosine_local import CosineLocalReranker from .http_bge import HTTPBGEReranker from .noop import NoopReranker +from .http_bge_strategy import HTTPBGEWithStrategyReranker if TYPE_CHECKING: @@ -41,4 +42,12 @@ def from_config(cfg: RerankerConfigFactory | None) -> BaseReranker | None: if backend in {"noop", "none", "disabled"}: return NoopReranker() + if backend in {"http_bge_strategy", "bge_strategy"}: + return HTTPBGEWithStrategyReranker( + reranker_url=c.get("url") or c.get("endpoint") or c.get("reranker_url"), + model=c.get("model", "bge-reranker-v2-m3"), + timeout=int(c.get("timeout", 10)), + headers_extra=c.get("headers_extra") + ) + raise ValueError(f"Unknown reranker backend: {cfg.backend}") diff --git a/src/memos/reranker/http_bge_strategy.py b/src/memos/reranker/http_bge_strategy.py new file mode 100644 index 00000000..6d48c261 --- /dev/null +++ b/src/memos/reranker/http_bge_strategy.py @@ -0,0 +1,145 @@ +import re +import json +import requests +from typing import TYPE_CHECKING, Literal + +from memos.log import get_logger + +from .base import BaseReranker +from .item import DialogueRankingTracker +from .concat import concat_original_source, concat_single_turn + +logger = get_logger(__name__) + +from memos.memories.textual.item import TextualMemoryItem + +_TAG1 = re.compile(r"^\s*\[[^\]]*\]\s*") + + +class HTTPBGEWithStrategyReranker(BaseReranker): + """ + HTTP-based BGE reranker with enhanced source text processing. + Supports multiple text concatenation strategies including dialogue pairing. + """ + + def __init__( + self, + reranker_url: str, + token: str = "", + model: str = "bge-reranker-v2-m3", + timeout: int = 10, + headers_extra: dict | None = None, + rerank_source: list[str] | None = None, + concat_strategy: Literal["user", "assistant", "single_turn"] = "single_turn", + source_weight: float = 0.3, + ): + if not reranker_url: + raise ValueError("reranker_url must not be empty") + + self.reranker_url = reranker_url + self.token = token or "" + self.model = model + self.timeout = timeout + self.headers_extra = headers_extra or {} + self.concat_strategy = concat_strategy + self.source_weight = source_weight + + def _prepare_documents(self, graph_results: list) -> tuple[DialogueRankingTracker, dict[str, any], list[str]]: + """Prepare documents based on the concatenation strategy. 
+ Args: + graph_results: List of graph results + Returns: + tuple[DialogueRankingTracker, dict[str, any], list[str]]: Tracker, original items, documents + """ + documents = [] + tracker = None + original_items = None + + if self.concat_strategy == "single_turn": + tracker, original_items = concat_single_turn(graph_results) + documents = tracker.get_documents_for_ranking() + + elif self.concat_strategy == "user": + raise NotImplementedError("User strategy is not implemented") + + elif self.concat_strategy == "assistant": + raise NotImplementedError("Assistant strategy is not implemented") + + else: + raise ValueError(f"Unknown concat_strategy: {self.concat_strategy}") + + return tracker, original_items, documents + + def rerank( + self, + query: str, + graph_results: list, + top_k: int, + **kwargs, + ) -> list[tuple[TextualMemoryItem, float]]: + if not graph_results: + return [] + + tracker, original_items, documents = self._prepare_documents(graph_results) + + logger.info( + f"[HTTPBGEWithSourceReranker] strategy: {self.concat_strategy}, " + f"query: {query}, documents count: {len(documents)}" + ) + logger.debug(f"[HTTPBGEWithSourceReranker] sample documents: {documents[:2]}...") + + if not documents: + return [] + + headers = {"Content-Type": "application/json", **self.headers_extra} + if self.token: + headers["Authorization"] = f"Bearer {self.token}" + + payload = {"model": self.model, "query": query, "documents": documents} + + try: + resp = requests.post( + self.reranker_url, headers=headers, json=payload, timeout=self.timeout + ) + resp.raise_for_status() + data = resp.json() + logger.info(f"[HTTPBGEWithStrategyReranker] response: {json.dumps(data, indent=4)}") + # Parse ranking results + ranked_indices = [] + scores = [] + + if "results" in data: + rows = data.get("results", []) + for r in rows: + idx = r.get("index") + if isinstance(idx, int) and 0 <= idx < len(documents): + score = float(r.get("relevance_score", r.get("score", 0.0))) + ranked_indices.append(idx) + scores.append(score) + + elif "data" in data: + rows = data.get("data", []) + score_list = [float(r.get("score", 0.0)) for r in rows] + + # Create ranked indices based on scores + indexed_scores = [(i, score) for i, score in enumerate(score_list)] + indexed_scores.sort(key=lambda x: x[1], reverse=True) + + ranked_indices = [idx for idx, _ in indexed_scores] + scores = [score for _, score in indexed_scores] + + else: + # Fallback: return original items with zero scores + return [(item, 0.0) for item in graph_results[:top_k]] + + # Reconstruct memory items from ranked dialogue pairs + reconstructed_items = tracker.reconstruct_memory_items( + ranked_indices, scores, original_items, top_k + ) + + logger.info(f"[HTTPBGEDialogueReranker] reconstructed {len(reconstructed_items)} memory items") + return reconstructed_items + + except Exception as e: + logger.error(f"[HTTPBGEWithSourceReranker] request failed: {e}") + return [(item, 0.0) for item in graph_results[:top_k]] \ No newline at end of file diff --git a/src/memos/reranker/item.py b/src/memos/reranker/item.py new file mode 100644 index 00000000..467b09db --- /dev/null +++ b/src/memos/reranker/item.py @@ -0,0 +1,147 @@ +from typing import Any +from pydantic import BaseModel + + +class DialoguePair(BaseModel): + """Represents a single dialogue pair extracted from sources.""" + + pair_id: str # Unique identifier for this dialogue pair + memory_id: str # ID of the source TextualMemoryItem + memory: str + pair_index: int # Index of this pair within the source memory's dialogue 
+ user_msg: str | dict[str, Any] # User message content + assistant_msg: str | dict[str, Any] # Assistant message content + combined_text: str # The concatenated text used for ranking + + def extract_content(self, msg: str | dict[str, Any]) -> str: + """Extract content from message, handling both string and dict formats.""" + if isinstance(msg, dict): + return msg.get('content', str(msg)) + return str(msg) + + @property + def user_content(self) -> str: + """Get user message content as string.""" + return self.extract_content(self.user_msg) + + @property + def assistant_content(self) -> str: + """Get assistant message content as string.""" + return self.extract_content(self.assistant_msg) + + +class DialogueRankingTracker: + """Tracks dialogue pairs and their rankings for memory reconstruction.""" + + def __init__(self): + self.dialogue_pairs: list[DialoguePair] = [] + + def add_dialogue_pair( + self, + memory_id: str, + pair_index: int, + user_msg: str | dict[str, Any], + assistant_msg: str | dict[str, Any], + memory: str + ) -> str: + """Add a dialogue pair and return its unique ID.""" + pair_id = f"{memory_id}_{pair_index}" + + # Extract content for ranking + def extract_content(msg: str | dict[str, Any]) -> str: + if isinstance(msg, dict): + return msg.get('content', str(msg)) + return str(msg) + + user_content = extract_content(user_msg) + assistant_content = extract_content(assistant_msg) + combined_text = f"{user_content}\n{assistant_content}" + + dialogue_pair = DialoguePair( + pair_id=pair_id, + memory_id=memory_id, + pair_index=pair_index, + user_msg=user_msg, + assistant_msg=assistant_msg, + combined_text=combined_text, + memory=memory + ) + + self.dialogue_pairs.append(dialogue_pair) + + return pair_id + + def get_documents_for_ranking(self, concat_memory: bool = True) -> list[str]: + """Get the combined text documents for ranking.""" + return [(pair.memory + "\n\n" + pair.combined_text) for pair in self.dialogue_pairs] + + def get_dialogue_pair_by_index(self, index: int) -> DialoguePair | None: + """Get dialogue pair by its index in the ranking results.""" + if 0 <= index < len(self.dialogue_pairs): + return self.dialogue_pairs[index] + return None + + def reconstruct_memory_items( + self, + ranked_indices: list[int], + scores: list[float], + original_memory_items: dict[str, Any], + top_k: int + ) -> list[tuple[Any, float]]: + """ + Reconstruct TextualMemoryItem objects from ranked dialogue pairs. 
+ + Args: + ranked_indices: List of dialogue pair indices sorted by relevance + scores: Corresponding relevance scores + original_memory_items: Dict mapping memory_id to original TextualMemoryItem + top_k: Maximum number of items to return + + Returns: + List of (reconstructed_memory_item, aggregated_score) tuples + """ + from collections import defaultdict + from copy import deepcopy + + # Group ranked pairs by memory_id + memory_groups = defaultdict(list) + memory_scores = defaultdict(list) + + for idx, score in zip(ranked_indices[:top_k * 3], scores[:top_k * 3]): # Take more pairs to ensure we have enough memories + dialogue_pair = self.get_dialogue_pair_by_index(idx) + if dialogue_pair: + memory_groups[dialogue_pair.memory_id].append(dialogue_pair) + memory_scores[dialogue_pair.memory_id].append(score) + + # Reconstruct memory items + reconstructed_items = [] + + for memory_id, pairs in memory_groups.items(): + if memory_id not in original_memory_items: + continue + + # Create a copy of the original memory item + original_item = original_memory_items[memory_id] + reconstructed_item = deepcopy(original_item) + + # Sort pairs by their original index to maintain order + pairs.sort(key=lambda p: p.pair_index) + + # Reconstruct sources from selected dialogue pairs + new_sources = [] + for pair in pairs[:1]: + new_sources.extend([pair.user_msg, pair.assistant_msg]) + + # Update the metadata sources + if hasattr(reconstructed_item.metadata, 'sources'): + reconstructed_item.metadata.sources = new_sources + + # Calculate aggregated score (e.g., max, mean, or weighted average) + pair_scores = memory_scores[memory_id] + aggregated_score = max(pair_scores) if pair_scores else 0.0 + + reconstructed_items.append((reconstructed_item, aggregated_score)) + + # Sort by aggregated score and return top_k + reconstructed_items.sort(key=lambda x: x[1], reverse=True) + return reconstructed_items[:top_k] \ No newline at end of file
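
Taken together, patches 01-10 route the raw dialogue `sources` stored on each memory item into the rerank request: the original `http_bge` backend optionally concatenates configured source fields (via `rerank_source` / `MOS_RERANK_SOURCE`), while the new `http_bge_strategy` backend from patch 10 splits sources into (user, assistant) turns, ranks "memory + turn" documents, and rebuilds the memory items around the best-scoring turns. Below is a minimal usage sketch of that strategy reranker. It is not part of the series: the localhost endpoint is a placeholder, and the `SimpleNamespace` stand-in mimics only the attributes the reranker actually reads (`.id`, `.memory`, `.metadata.sources`); in practice `graph_results` would be `TextualMemoryItem` objects returned by the preceding graph/vector search.

    from types import SimpleNamespace

    from memos.reranker.http_bge_strategy import HTTPBGEWithStrategyReranker

    # Stand-in memory item exposing only the attributes the reranker reads
    # (.id, .memory, .metadata.sources); real callers pass TextualMemoryItem
    # objects produced by the earlier retrieval step.
    item = SimpleNamespace(
        id="mem-001",
        memory="[2025-09-01] User is planning a trip to Kyoto in October.",
        metadata=SimpleNamespace(
            sources=[
                {"role": "user", "content": "user: I want to visit Kyoto this October."},
                {"role": "assistant", "content": "assistant: October is a good time; book early."},
            ]
        ),
    )

    reranker = HTTPBGEWithStrategyReranker(
        reranker_url="http://localhost:8080/v1/rerank",  # hypothetical endpoint
        model="bge-reranker-v2-m3",
        concat_strategy="single_turn",  # rank "memory + (user, assistant) turn" documents
    )

    # On HTTP failure the reranker logs the error and falls back to zero scores,
    # so the call itself does not raise even without a live endpoint.
    for mem, score in reranker.rerank(
        query="Where is the user traveling?", graph_results=[item], top_k=3
    ):
        print(f"{score:.3f}  {mem.memory}")

When a ranking response does come back, `DialogueRankingTracker.reconstruct_memory_items` deep-copies each original item, trims its `metadata.sources` to the selected turn, and scores it by the best of its turn scores, so downstream consumers still receive memory items in the usual shape.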