From b51de02f1b73b6147991d6facd9a8011851d9de9 Mon Sep 17 00:00:00 2001 From: ZhikeLi <515839490@qq.com> Date: Thu, 19 Mar 2026 14:37:15 +0800 Subject: [PATCH] feat: enable OpenViking server and bot on research branch at 2026-0319-1437 --- openviking/storage/collection_schemas.py | 183 +++++++++++++++++- .../storage/queuefs/semantic_processor.py | 64 +++++- openviking/utils/embedding_utils.py | 83 +++++++- tests/storage/test_collection_schemas.py | 153 ++++++++++++++- tests/storage/test_embedding_utils.py | 35 ++++ tests/storage/test_semantic_processor.py | 13 ++ 6 files changed, 519 insertions(+), 12 deletions(-) create mode 100644 tests/storage/test_embedding_utils.py create mode 100644 tests/storage/test_semantic_processor.py diff --git a/openviking/storage/collection_schemas.py b/openviking/storage/collection_schemas.py index 4d110013..ee094fae 100644 --- a/openviking/storage/collection_schemas.py +++ b/openviking/storage/collection_schemas.py @@ -10,6 +10,8 @@ import asyncio import hashlib import json +import re +from functools import lru_cache from typing import Any, Dict, Optional from openviking.models.embedder.base import EmbedResult @@ -25,6 +27,108 @@ logger = get_logger(__name__) +_TOKEN_LIMIT_RE = re.compile( + r"passed\s+(?P\d+)\s+input tokens.*?maximum input length of\s+" + r"(?P\d+)\s+tokens", + re.IGNORECASE | re.DOTALL, +) +_EMBEDDING_TRUNCATION_HEADROOM = 512 + + +def _parse_input_token_limit_error(error: Exception) -> Optional[tuple[int, int]]: + """Extract input-token and max-token values from provider errors.""" + match = _TOKEN_LIMIT_RE.search(str(error)) + if not match: + return None + return int(match.group("input_tokens")), int(match.group("max_tokens")) + + +@lru_cache(maxsize=16) +def _get_token_encoder(model_name: str): + """Best-effort tokenizer lookup for provider-compatible embedding models.""" + try: + import tiktoken + except ImportError: + return None + + if model_name: + try: + return tiktoken.encoding_for_model(model_name) + except KeyError: + pass + + try: + return tiktoken.get_encoding("cl100k_base") + except Exception: + return None + + +def _truncate_text_to_token_limit( + text: str, + model_name: str, + max_tokens: int, + *, + observed_input_tokens: Optional[int] = None, +) -> str: + """Trim text to the requested token budget.""" + if max_tokens <= 0 or not text: + return text + + if observed_input_tokens and observed_input_tokens > max_tokens: + shrink_ratio = max_tokens / observed_input_tokens + target_chars = max(1, int(len(text) * shrink_ratio * 0.9)) + if target_chars < len(text): + return text[:target_chars] + + encoder = _get_token_encoder(model_name) + if encoder is not None: + token_ids = encoder.encode(text) + if len(token_ids) <= max_tokens: + if observed_input_tokens and observed_input_tokens > max_tokens: + estimated_tokens = max(1, len(token_ids)) + shrink_ratio = max_tokens / estimated_tokens + target_chars = max(1, int(len(text) * shrink_ratio * 0.9)) + return text[:target_chars] if target_chars < len(text) else text + return text + return encoder.decode(token_ids[:max_tokens]) + + estimated_tokens = max(1, len(text.encode("utf-8")) // 2) + if estimated_tokens <= max_tokens: + return text + + shrink_ratio = max_tokens / estimated_tokens + target_chars = max(1, int(len(text) * shrink_ratio * 0.9)) + return text[:target_chars] + + +def _resolve_embedder_dimension( + embedder: Any, configured_dimension: int, *, warn_prefix: str +) -> int: + """Prefer the embedder-reported dimension over config defaults.""" + if embedder and hasattr(embedder, "get_dimension"): + try: + actual_dimension = int(embedder.get_dimension()) + if actual_dimension > 0: + if configured_dimension and configured_dimension != actual_dimension: + logger.warning( + "%s embedding dimension mismatch: config=%s, embedder=%s. " + "Using embedder dimension.", + warn_prefix, + configured_dimension, + actual_dimension, + ) + return actual_dimension + except Exception as exc: + logger.warning( + "%s failed to resolve embedding dimension from embedder, " + "falling back to config=%s: %s", + warn_prefix, + configured_dimension, + exc, + ) + + return configured_dimension + class CollectionSchemas: """ @@ -114,6 +218,18 @@ async def init_context_collection(storage) -> bool: config = get_openviking_config() name = config.storage.vectordb.name vector_dim = config.embedding.dimension + try: + embedder = config.embedding.get_embedder() + vector_dim = _resolve_embedder_dimension( + embedder, vector_dim, warn_prefix="init_context_collection" + ) + except Exception as exc: + logger.warning( + "init_context_collection failed to initialize embedder for dimension " + "detection, using config dimension=%s: %s", + vector_dim, + exc, + ) schema = CollectionSchemas.context_collection(name, vector_dim) return await storage.create_collection(name, schema) @@ -148,6 +264,68 @@ def __init__(self, vikingdb: VikingVectorIndexBackend): def _initialize_embedder(self, config: "OpenVikingConfig"): """Initialize the embedder instance from config.""" self._embedder = config.embedding.get_embedder() + self._vector_dim = _resolve_embedder_dimension( + self._embedder, self._vector_dim, warn_prefix="TextEmbeddingHandler" + ) + + def _embed_with_retry( + self, + text: str, + uri: str = "", + fallback_text: str = "", + ) -> EmbedResult: + """Retry with progressively smaller text when the provider rejects overlong input.""" + current_text = text + model_name = getattr(self._embedder, "model_name", "") + last_error: Optional[Exception] = None + + for attempt in range(5): + try: + return self._embedder.embed(current_text) + except Exception as exc: + last_error = exc + limit_info = _parse_input_token_limit_error(exc) + if not limit_info: + raise + + input_tokens, max_tokens = limit_info + retry_budget = max( + 1, + int((max_tokens - _EMBEDDING_TRUNCATION_HEADROOM) * (0.85**attempt)), + ) + truncated_text = _truncate_text_to_token_limit( + current_text, + model_name, + retry_budget, + observed_input_tokens=input_tokens, + ) + if len(truncated_text) >= len(current_text): + fallback_chars = max(1, int(len(current_text) * 0.5)) + truncated_text = current_text[:fallback_chars] + if len(truncated_text) >= len(current_text): + raise exc + + logger.warning( + "Embedding input too long for uri=%s model=%s (%s > %s tokens). " + "Attempt %s/5: truncating to ~%s tokens and retrying.", + uri or "", + model_name or "", + input_tokens, + max_tokens, + attempt + 1, + retry_budget, + ) + current_text = truncated_text + + if fallback_text and fallback_text.strip() and fallback_text.strip() != text.strip(): + logger.warning( + "Embedding retries exhausted for uri=%s model=%s. Falling back to abstract/summary text.", + uri or "", + model_name or "", + ) + return self._embedder.embed(fallback_text) + + raise RuntimeError("Failed to embed text after token-limit retries") from last_error @staticmethod def _seed_uri_for_id(uri: str, level: Any) -> str: @@ -198,7 +376,10 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, # embed() is a blocking HTTP call; offload to thread pool to avoid # blocking the event loop and allow real concurrency. result: EmbedResult = await asyncio.to_thread( - self._embedder.embed, embedding_msg.message + self._embed_with_retry, + embedding_msg.message, + inserted_data.get("uri", ""), + inserted_data.get("abstract", ""), ) # Add dense vector diff --git a/openviking/storage/queuefs/semantic_processor.py b/openviking/storage/queuefs/semantic_processor.py index 59700783..cb8c48d4 100644 --- a/openviking/storage/queuefs/semantic_processor.py +++ b/openviking/storage/queuefs/semantic_processor.py @@ -3,6 +3,7 @@ """SemanticProcessor: Processes messages from SemanticQueue, generates .abstract.md and .overview.md.""" import asyncio +from functools import lru_cache from typing import Any, Dict, List, Optional, Tuple from openviking.parse.parsers.constants import ( @@ -31,6 +32,52 @@ logger = get_logger(__name__) +_SEMANTIC_CONTENT_TOKEN_BUDGET = 6000 + + +@lru_cache(maxsize=16) +def _get_token_encoder(model_name: str): + """Best-effort tokenizer lookup for LLM prompt budgeting.""" + try: + import tiktoken + except ImportError: + return None + + if model_name: + try: + return tiktoken.encoding_for_model(model_name) + except KeyError: + pass + + try: + return tiktoken.get_encoding("cl100k_base") + except Exception: + return None + + +def _truncate_text_by_token_budget(text: str, model_name: str, max_tokens: int) -> str: + """Trim file content before rendering it into a summary prompt.""" + if max_tokens <= 0 or not text: + return text + + encoder = _get_token_encoder(model_name) + suffix = "\n...(truncated)" + if encoder is not None: + token_ids = encoder.encode(text) + if len(token_ids) <= max_tokens: + return text + suffix_tokens = encoder.encode(suffix) + budget = max(1, max_tokens - len(suffix_tokens)) + return encoder.decode(token_ids[:budget]) + suffix + + estimated_tokens = max(1, len(text.encode("utf-8")) // 2) + if estimated_tokens <= max_tokens: + return text + + shrink_ratio = max_tokens / estimated_tokens + target_chars = max(1, int(len(text) * shrink_ratio * 0.9)) + return text[:target_chars] + suffix + class SemanticProcessor(DequeueHandlerBase): """ @@ -321,10 +368,6 @@ async def _generate_text_summary( # Read file content (limit length) content = await viking_fs.read_file(file_path, ctx=active_ctx) - - # Limit content length (about 10000 tokens) - max_chars = 30000 - content = await viking_fs.read_file(file_path, ctx=active_ctx) if isinstance(content, bytes): # Try to decode with error handling for text files try: @@ -333,10 +376,15 @@ async def _generate_text_summary( logger.warning(f"Failed to decode file as UTF-8, skipping: {file_path}") return {"name": file_name, "summary": ""} - # Limit content length (about 10000 tokens) - max_chars = 30000 - if len(content) > max_chars: - content = content[:max_chars] + "\n...(truncated)" + model_name = getattr(vlm.get_vlm_instance(), "model_name", "") if vlm.is_available() else "" + truncated_content = _truncate_text_by_token_budget( + content, model_name, _SEMANTIC_CONTENT_TOKEN_BUDGET + ) + if truncated_content != content: + logger.debug( + "Truncated file content for summary generation: %s", file_path + ) + content = truncated_content # Generate summary if not vlm.is_available(): diff --git a/openviking/utils/embedding_utils.py b/openviking/utils/embedding_utils.py index 1dffc1c8..b1aebd09 100644 --- a/openviking/utils/embedding_utils.py +++ b/openviking/utils/embedding_utils.py @@ -19,6 +19,26 @@ logger = get_logger(__name__) +_PREFER_SUMMARY_FILENAMES = { + "cargo.lock", + "uv.lock", + "poetry.lock", + "pdm.lock", + "yarn.lock", + "package-lock.json", + "composer.lock", + "gemfile.lock", + "go.sum", + "pnpm-lock.yaml", + "pnpm-lock.yml", + "bun.lockb", +} +_PREFER_SUMMARY_EXTENSIONS = { + ".lock", + ".sum", +} +_DIRECT_TEXT_VECTORIZE_CHAR_BUDGET = 12000 + def _owner_space_for_uri(uri: str, ctx: RequestContext) -> str: """Derive owner_space from a URI.""" @@ -35,13 +55,30 @@ def get_resource_content_type(file_name: str) -> Optional[ResourceContentType]: Returns None if the file type is not recognized. """ file_name = file_name.lower() + text_filenames = { + "makefile", + "dockerfile", + "containerfile", + "gemfile", + "procfile", + "license", + "copying", + "notice", + "authors", + "changelog", + "readme", + } text_extensions = { ".txt", ".md", + ".markdown", ".csv", + ".tsv", ".json", ".xml", + ".html", + ".htm", ".py", ".js", ".ts", @@ -65,12 +102,19 @@ def get_resource_content_type(file_name: str) -> Optional[ResourceContentType]: ".r", ".m", ".pl", + ".as", ".toml", ".yaml", ".yml", ".ini", ".cfg", ".conf", + ".css", + ".scss", + ".sass", + ".less", + ".example", + ".in", ".tsx", ".jsx", ".cs", @@ -80,6 +124,12 @@ def get_resource_content_type(file_name: str) -> Optional[ResourceContentType]: ".tf", ".proto", ".gradle", + ".cmake", + ".mk", + ".make", + ".mod", + ".sum", + ".lock", ".cc", ".cxx", ".hpp", @@ -98,7 +148,9 @@ def get_resource_content_type(file_name: str) -> Optional[ResourceContentType]: video_extensions = {".mp4", ".avi", ".mov", ".wmv", ".flv"} audio_extensions = {".mp3", ".wav", ".aac", ".flac"} - if any(file_name.endswith(ext) for ext in text_extensions): + if file_name in text_filenames or any(file_name.endswith(ext) for ext in text_extensions): + return ResourceContentType.TEXT + if file_name.endswith(".conf.example") or file_name.endswith(".ovcli.conf.example"): return ResourceContentType.TEXT elif any(file_name.endswith(ext) for ext in image_extensions): return ResourceContentType.IMAGE @@ -110,6 +162,19 @@ def get_resource_content_type(file_name: str) -> Optional[ResourceContentType]: return None +def _should_prefer_summary_for_vectorization(file_name: str) -> bool: + file_name = file_name.lower() + if file_name in _PREFER_SUMMARY_FILENAMES: + return True + return any(file_name.endswith(ext) for ext in _PREFER_SUMMARY_EXTENSIONS) + + +def _truncate_text_for_embedding(text: str, max_chars: int = _DIRECT_TEXT_VECTORIZE_CHAR_BUDGET) -> str: + if len(text) <= max_chars: + return text + return text[:max_chars] + "\n...(truncated)" + + async def vectorize_directory_meta( uri: str, abstract: str, @@ -220,12 +285,26 @@ async def vectorize_file( ) return elif content_type == ResourceContentType.TEXT: + prefer_summary = _should_prefer_summary_for_vectorization(file_name) + if prefer_summary and summary: + context.set_vectorize(Vectorize(text=summary)) + logger.debug( + "Using semantic summary instead of raw content for vectorization: %s", + file_path, + ) + embedding_msg = EmbeddingMsgConverter.from_context(context) + if not embedding_msg: + return + await embedding_queue.enqueue(embedding_msg) + logger.debug(f"Enqueued file for vectorization: {file_path}") + return + # For text files, try to read content try: content = await viking_fs.read_file(file_path, ctx=ctx) if isinstance(content, bytes): content = content.decode("utf-8", errors="replace") - context.set_vectorize(Vectorize(text=content)) + context.set_vectorize(Vectorize(text=_truncate_text_for_embedding(content))) except Exception as e: logger.warning( f"Failed to read file content for {file_path}, falling back to summary: {e}" diff --git a/tests/storage/test_collection_schemas.py b/tests/storage/test_collection_schemas.py index c25dbbda..318806e8 100644 --- a/tests/storage/test_collection_schemas.py +++ b/tests/storage/test_collection_schemas.py @@ -7,18 +7,22 @@ import pytest from openviking.models.embedder.base import EmbedResult -from openviking.storage.collection_schemas import TextEmbeddingHandler +from openviking.storage.collection_schemas import TextEmbeddingHandler, _truncate_text_to_token_limit from openviking.storage.queuefs.embedding_msg import EmbeddingMsg class _DummyEmbedder: def __init__(self): self.calls = 0 + self.model_name = "test-embedding-model" def embed(self, text: str) -> EmbedResult: self.calls += 1 return EmbedResult(dense_vector=[0.1, 0.2]) + def get_dimension(self) -> int: + return 2 + class _DummyConfig: def __init__(self, embedder: _DummyEmbedder): @@ -42,6 +46,17 @@ def _build_queue_payload() -> dict: return {"data": json.dumps(msg.to_dict())} +class _RecordingVikingDB: + def __init__(self): + self.is_closing = False + self.payloads = [] + self.has_queue_manager = False + + async def upsert(self, data, *, ctx): + self.payloads.append(data) + return data["id"] + + @pytest.mark.asyncio async def test_embedding_handler_skip_all_work_when_manager_is_closing(monkeypatch): class _ClosingVikingDB: @@ -104,3 +119,139 @@ async def upsert(self, _data, *, ctx): assert embedder.calls == 1 assert status["success"] == 1 assert status["error"] == 0 + + +@pytest.mark.asyncio +async def test_embedding_handler_uses_embedder_dimension_when_config_dimension_is_stale( + monkeypatch, +): + class _MismatchedConfig: + def __init__(self, embedder): + self.storage = SimpleNamespace(vectordb=SimpleNamespace(name="context")) + self.embedding = SimpleNamespace( + dimension=2048, + get_embedder=lambda: embedder, + ) + + embedder = _DummyEmbedder() + monkeypatch.setattr( + "openviking_cli.utils.config.get_openviking_config", + lambda: _MismatchedConfig(embedder), + ) + + vikingdb = _RecordingVikingDB() + handler = TextEmbeddingHandler(vikingdb) + + result = await handler.on_dequeue(_build_queue_payload()) + + assert result is not None + assert result["vector"] == [0.1, 0.2] + assert vikingdb.payloads[0]["vector"] == [0.1, 0.2] + + +@pytest.mark.asyncio +async def test_embedding_handler_retries_with_truncated_text_after_token_limit_error(monkeypatch): + token_limit_error = ( + "OpenAI API error: Error code: 400 - {'error': {'message': " + "\"You passed 16385 input tokens and requested 0 output tokens. " + "However, the model's context length is only 16384 tokens, resulting in " + "a maximum input length of 16384 tokens. Please reduce the length of the " + "input prompt. (parameter=input_tokens, value=16385)\", 'type': " + "'BadRequestError', 'param': None, 'code': 400}}" + ) + + class _RetryingEmbedder: + def __init__(self): + self.calls = [] + self.model_name = "text-embedding-3-large" + + def embed(self, text: str) -> EmbedResult: + self.calls.append(text) + if len(self.calls) == 1: + raise RuntimeError(token_limit_error) + return EmbedResult(dense_vector=[0.1, 0.2]) + + def get_dimension(self) -> int: + return 2 + + embedder = _RetryingEmbedder() + monkeypatch.setattr( + "openviking_cli.utils.config.get_openviking_config", + lambda: _DummyConfig(embedder), + ) + + msg = EmbeddingMsg( + message="token " * 20000, + context_data={ + "id": "id-2", + "uri": "viking://resources/large-file", + "account_id": "default", + "abstract": "sample", + }, + ) + + handler = TextEmbeddingHandler(_RecordingVikingDB()) + result = await handler.on_dequeue({"data": json.dumps(msg.to_dict())}) + + assert result is not None + assert len(embedder.calls) == 2 + assert len(embedder.calls[1]) < len(embedder.calls[0]) + + +@pytest.mark.asyncio +async def test_embedding_handler_falls_back_to_summary_after_retry_exhaustion(monkeypatch): + token_limit_error = ( + "OpenAI API error: Error code: 400 - {'error': {'message': " + "\"You passed 32769 input tokens and requested 0 output tokens. " + "However, the model's context length is only 32768 tokens, resulting in " + "a maximum input length of 32768 tokens. Please reduce the length of the " + "input prompt. (parameter=input_tokens, value=32769)\", 'type': " + "'BadRequestError', 'param': None, 'code': 400}}" + ) + + class _AlwaysTooLongEmbedder: + def __init__(self): + self.calls = [] + self.model_name = "Qwen3-Embedding-0.6B" + + def embed(self, text: str) -> EmbedResult: + self.calls.append(text) + if text != "short summary": + raise RuntimeError(token_limit_error) + return EmbedResult(dense_vector=[0.1, 0.2]) + + def get_dimension(self) -> int: + return 2 + + embedder = _AlwaysTooLongEmbedder() + monkeypatch.setattr( + "openviking_cli.utils.config.get_openviking_config", + lambda: _DummyConfig(embedder), + ) + + msg = EmbeddingMsg( + message="very long content " * 5000, + context_data={ + "id": "id-3", + "uri": "viking://resources/large-lockfile", + "account_id": "default", + "abstract": "short summary", + }, + ) + + handler = TextEmbeddingHandler(_RecordingVikingDB()) + result = await handler.on_dequeue({"data": json.dumps(msg.to_dict())}) + + assert result is not None + assert embedder.calls[-1] == "short summary" +def test_truncate_text_to_token_limit_uses_provider_observed_tokens_when_tokenizer_disagrees(): + text = "x" * 1000 + + truncated = _truncate_text_to_token_limit( + text, + "Qwen3-Embedding-0.6B", + 900, + observed_input_tokens=1800, + ) + + assert len(truncated) < len(text) diff --git a/tests/storage/test_embedding_utils.py b/tests/storage/test_embedding_utils.py new file mode 100644 index 00000000..02e3d71b --- /dev/null +++ b/tests/storage/test_embedding_utils.py @@ -0,0 +1,35 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: Apache-2.0 + +from openviking.core.context import ResourceContentType +from openviking.utils.embedding_utils import ( + _should_prefer_summary_for_vectorization, + _truncate_text_for_embedding, + get_resource_content_type, +) + + +def test_get_resource_content_type_supports_common_text_build_files(): + assert get_resource_content_type("App.css") == ResourceContentType.TEXT + assert get_resource_content_type("utils.cmake") == ResourceContentType.TEXT + assert get_resource_content_type("go.mod") == ResourceContentType.TEXT + assert get_resource_content_type("uv.lock") == ResourceContentType.TEXT + assert get_resource_content_type("Makefile") == ResourceContentType.TEXT + assert get_resource_content_type("LICENSE") == ResourceContentType.TEXT + assert get_resource_content_type("MANIFEST.in") == ResourceContentType.TEXT + assert get_resource_content_type("ov.conf.example") == ResourceContentType.TEXT + + +def test_generated_lockfiles_prefer_summary_for_vectorization(): + assert _should_prefer_summary_for_vectorization("Cargo.lock") is True + assert _should_prefer_summary_for_vectorization("uv.lock") is True + assert _should_prefer_summary_for_vectorization("main.py") is False + + +def test_truncate_text_for_embedding_caps_raw_content_length(): + text = "x" * 20000 + + truncated = _truncate_text_for_embedding(text, max_chars=100) + + assert len(truncated) < len(text) + assert truncated.endswith("...(truncated)") diff --git a/tests/storage/test_semantic_processor.py b/tests/storage/test_semantic_processor.py new file mode 100644 index 00000000..6663162a --- /dev/null +++ b/tests/storage/test_semantic_processor.py @@ -0,0 +1,13 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: Apache-2.0 + +from openviking.storage.queuefs.semantic_processor import _truncate_text_by_token_budget + + +def test_truncate_text_by_token_budget_reduces_long_prompt_content(): + text = "token " * 20000 + + truncated = _truncate_text_by_token_budget(text, "unknown-model", 100) + + assert len(truncated) < len(text) + assert truncated.endswith("...(truncated)")