From cd3d5a1c2caa39d908a57cd825768bbb7dc04264 Mon Sep 17 00:00:00 2001 From: Reese Date: Sun, 30 Nov 2025 14:23:48 +0000 Subject: [PATCH 01/18] Improve repo_search ranking with explicit code-first/docs-first modes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add an explicit mode knob to the repo_search MCP tool (code_first, balanced, docs_first) - Plumb mode through repo_search → run_hybrid_search for in-process hybrid search calls - Make hybrid_search implementation/doc weighting mode-aware: - Default/code_first: full IMPLEMENTATION_BOOST and DOCUMENTATION_PENALTY - balanced: keep impl boost, halve structural doc penalty - docs_first: reduce impl boost and disable structural doc penalty - Keep documentation penalties purely structural (README/docs/.md/etc) instead of query-phrase based - Add MCP-side mode-aware reordering in repo_search: - Group core implementation code, other code, and docs differently for code_first vs docs_first - Implement a code_first post-processing shim to ensure at least N core code hits in the top-K - Tunable via REPO_SEARCH_CODE_FIRST_MIN_CORE and REPO_SEARCH_CODE_FIRST_TOP_K - Thread the mode argument through repo_search_compat so clients can select modes via the compat wrapper --- scripts/hybrid_search.py | 44 ++++++++---- scripts/mcp_indexer_server.py | 130 ++++++++++++++++++++++++++++++++++ 2 files changed, 161 insertions(+), 13 deletions(-) diff --git a/scripts/hybrid_search.py b/scripts/hybrid_search.py index 21046ac5..2c0d2269 100644 --- a/scripts/hybrid_search.py +++ b/scripts/hybrid_search.py @@ -1351,7 +1351,7 @@ def dense_query( # In-process API: run hybrid search and return structured items list # Optional: pass an existing TextEmbedding instance via model to reuse cache - +# Optional: pass mode to adjust implementation/docs weighting (code_first/balanced/docs_first) def run_hybrid_search( queries: List[str], @@ -1370,6 +1370,7 @@ def run_hybrid_search( expand: bool = True, 
model: TextEmbedding | None = None, collection: str | None = None, + mode: str | None = None, ) -> List[Dict[str, Any]]: client = QdrantClient(url=os.environ.get("QDRANT_URL", QDRANT_URL), api_key=API_KEY) model_name = os.environ.get("EMBEDDING_MODEL", MODEL_NAME) @@ -1474,6 +1475,7 @@ def _norm_under(u: str | None) -> str | None: str(_collection()), _env_truthy(os.environ.get("HYBRID_ADAPTIVE_WEIGHTS"), True), _env_truthy(os.environ.get("HYBRID_MMR"), True), + str(mode or ""), ) except Exception: cache_key = None @@ -1555,6 +1557,7 @@ def _norm_under(u: str | None) -> str | None: 'expand': expand, 'collection': _collection(), 'vector_name': vec_name, + 'mode': mode, } is_duplicate, similar_fp = is_duplicate_request(request_data) if is_duplicate: @@ -2084,6 +2087,18 @@ def _bn(p: str) -> str: # Lexical + boosts timestamps: List[int] = [] + # Mode-aware tweaks for implementation/docs weighting. Modes: + # - None / "code_first": full IMPLEMENTATION_BOOST and DOCUMENTATION_PENALTY + # - "balanced": keep impl boost, halve doc penalty + # - "docs_first": reduce impl boost slightly and disable doc penalty + eff_mode = (mode or "").strip().lower() + impl_boost = IMPLEMENTATION_BOOST + doc_penalty = DOCUMENTATION_PENALTY + if eff_mode in {"balanced"}: + doc_penalty = DOCUMENTATION_PENALTY * 0.5 + elif eff_mode in {"docs_first", "docs-first", "docs"}: + impl_boost = IMPLEMENTATION_BOOST * 0.5 + doc_penalty = 0.0 for pid, rec in list(score_map.items()): payload = rec["pt"].payload or {} base_md = payload.get("metadata") or {} @@ -2133,22 +2148,25 @@ def _bn(p: str) -> str: if ext in {".json", ".yml", ".yaml", ".toml", ".ini"} or "/.codebase/" in path_lower or "/.kiro/" in path_lower: rec["cfg"] = float(rec.get("cfg", 0.0)) - CONFIG_FILE_PENALTY rec["s"] -= CONFIG_FILE_PENALTY - # Boost likely implementation files - if IMPLEMENTATION_BOOST > 0.0 and path: + # Boost likely implementation files (mode-aware) + if impl_boost > 0.0 and path: if ext in {".py", ".ts", ".tsx", ".js", 
".jsx", ".go", ".java", ".rs", ".rb", ".php", ".cs", ".cpp", ".c", ".hpp", ".h"}: - rec["impl"] = float(rec.get("impl", 0.0)) + IMPLEMENTATION_BOOST - rec["s"] += IMPLEMENTATION_BOOST - # Penalize docs for implementation-style questions - qlow = " ".join(qlist).lower() - if DOCUMENTATION_PENALTY > 0.0 and path: - if ("readme" in path_lower or "/docs/" in path_lower or "/documentation/" in path_lower or path_lower.endswith(".md")): - if any(w in qlow for w in ["how does", "explain", "works", "algorithm"]): - rec["doc"] = float(rec.get("doc", 0.0)) - DOCUMENTATION_PENALTY - rec["s"] -= DOCUMENTATION_PENALTY + rec["impl"] = float(rec.get("impl", 0.0)) + impl_boost + rec["s"] += impl_boost + # Penalize docs (README/docs/markdown) relative to implementation files (mode-aware) + if doc_penalty > 0.0 and path: + if ( + "readme" in path_lower + or "/docs/" in path_lower + or "/documentation/" in path_lower + or path_lower.endswith(".md") + ): + rec["doc"] = float(rec.get("doc", 0.0)) - doc_penalty + rec["s"] -= doc_penalty if LANG_MATCH_BOOST > 0.0 and path and eff_language: lang = str(eff_language).lower() - md_lang = str((md.get("language") or "")).lower() + md_lang = str((md.get("language") or "").lower()) if (lang and md_lang and md_lang == lang) or lang_matches_path(lang, path): rec["langb"] += LANG_MATCH_BOOST rec["s"] += LANG_MATCH_BOOST diff --git a/scripts/mcp_indexer_server.py b/scripts/mcp_indexer_server.py index 8e20a7b0..1b4f3606 100644 --- a/scripts/mcp_indexer_server.py +++ b/scripts/mcp_indexer_server.py @@ -1676,6 +1676,7 @@ async def repo_search( highlight_snippet: Any = None, collection: Any = None, workspace_path: Any = None, + mode: Any = None, session: Any = None, @@ -1841,6 +1842,11 @@ async def repo_search( case = _extra.get("case") if compact in (None, "") and _extra.get("compact") is not None: compact = _extra.get("compact") + # Optional mode hint: "code_first", "docs_first", "balanced" + if ( + mode is None or (isinstance(mode, str) and 
str(mode).strip() == "") + ) and _extra.get("mode") is not None: + mode = _extra.get("mode") except Exception: pass @@ -1895,6 +1901,9 @@ def _to_str(x, default=""): ) highlight_snippet = _to_bool(highlight_snippet, True) + # Optional mode knob: "code_first" (default for IDE), "docs_first", "balanced" + mode_str = _to_str(mode, "").strip().lower() + # Resolve collection precedence: explicit > per-connection defaults > token defaults > env default coll_hint = _to_str(collection, "").strip() @@ -2039,6 +2048,7 @@ def _to_str_list(x): expand=str(os.environ.get("HYBRID_EXPAND", "1")).strip().lower() in {"1", "true", "yes", "on"}, model=model, + mode=mode_str or None, ) finally: if prev_coll is None: @@ -2128,6 +2138,7 @@ def _to_str_list(x): expand=str(os.environ.get("HYBRID_EXPAND", "0")).strip().lower() in {"1", "true", "yes", "on"}, model=model, + mode=mode_str or None, ) json_lines = items except Exception: @@ -2360,6 +2371,124 @@ def _doc_for(obj: dict) -> str: item["budget_tokens_used"] = int(obj.get("budget_tokens_used")) results.append(item) + # Mode-aware reordering: nudge core implementation code vs docs and non-core when requested + def _is_doc_path(p: str) -> bool: + pl = str(p or "").lower() + return ( + "readme" in pl + or "/docs/" in pl + or "/documentation/" in pl + or pl.endswith(".md") + or pl.endswith(".rst") + or pl.endswith(".txt") + ) + + def _is_core_code_item(item: dict) -> bool: + try: + p = str(item.get("path") or "").lower() + except Exception: + return False + if not p: + return False + if _is_doc_path(p): + return False + # Extension-based implementation heuristic (mirrors hybrid_search IMPLEMENTATION_BOOST set) + ext = "." + p.rsplit(".", 1)[-1] if "." 
in p else "" + if ext not in { + ".py", + ".ts", + ".tsx", + ".js", + ".jsx", + ".go", + ".java", + ".rs", + ".rb", + ".php", + ".cs", + ".cpp", + ".c", + ".hpp", + ".h", + }: + return False + # Basic path-based exclusions for non-core code + if ( + "/test" in p + or "/tests/" in p + or "/__tests__/" in p + or p.endswith("_test.py") + or p.endswith("test.py") + or "/fixtures/" in p + or "/vendor/" in p + or "/third_party/" in p + or "/.codebase/" in p + or "/.kiro/" in p + ): + return False + # Prefer items that were not explicitly tagged as docs/config/tests in hybrid components + comps = item.get("components") or {} + try: + if comps: + if comps.get("config_penalty") or comps.get("test_penalty") or comps.get("doc_penalty"): + return False + except Exception: + pass + return True + + if mode_str in {"code_first", "code-first", "code"}: + core_items: list[dict] = [] + other_code: list[dict] = [] + doc_items: list[dict] = [] + for it in results: + p = it.get("path") or "" + if p and _is_doc_path(p): + doc_items.append(it) + elif _is_core_code_item(it): + core_items.append(it) + else: + other_code.append(it) + results = core_items + other_code + doc_items + + try: + _min_core = int(os.environ.get("REPO_SEARCH_CODE_FIRST_MIN_CORE", "2") or 0) + except Exception: + _min_core = 2 + try: + _top_k = int(os.environ.get("REPO_SEARCH_CODE_FIRST_TOP_K", "8") or 8) + except Exception: + _top_k = 8 + if _min_core > 0 and results: + top_k = max(0, min(_top_k, len(results))) + if top_k > 0: + flags = [_is_core_code_item(it) for it in results] + cur_core = sum(1 for i in range(top_k) if flags[i]) + if cur_core < _min_core: + for src in range(top_k, len(results)): + if not flags[src]: + continue + for dst in range(top_k - 1, -1, -1): + if not flags[dst]: + results[dst], results[src] = results[src], results[dst] + flags[dst], flags[src] = flags[src], flags[dst] + cur_core += 1 + break + if cur_core >= _min_core: + break + elif mode_str in {"docs_first", "docs-first", "docs"}: + 
core_items = [] + other_code = [] + doc_items = [] + for it in results: + p = it.get("path") or "" + if p and _is_doc_path(p): + doc_items.append(it) + elif _is_core_code_item(it): + core_items.append(it) + else: + other_code.append(it) + results = doc_items + core_items + other_code + # Optionally add snippets (with highlighting) toks = _tokens_from_queries(queries) if include_snippet: @@ -2535,6 +2664,7 @@ async def repo_search_compat(**arguments) -> Dict[str, Any]: "not_": not_value, "case": args.get("case"), "compact": args.get("compact"), + "mode": args.get("mode"), # Alias passthroughs captured by repo_search(**kwargs) "queries": queries, "q": args.get("q"), From eb2bfcf634a40a92bc73b796bc785b878735c158 Mon Sep 17 00:00:00 2001 From: Reese Date: Mon, 1 Dec 2025 00:12:54 +0000 Subject: [PATCH 02/18] Improves hybrid search limit handling Ensures the hybrid search uses the greater value between the originally requested limit and the rerank_top_n value when reranking is enabled. Also enforces the user-requested limit on the final result set. 
--- scripts/mcp_indexer_server.py | 37 +++++++++++++++++++++++++++++++++-- 1 file changed, 35 insertions(+), 2 deletions(-) diff --git a/scripts/mcp_indexer_server.py b/scripts/mcp_indexer_server.py index 1b4f3606..9a5e4a18 100644 --- a/scripts/mcp_indexer_server.py +++ b/scripts/mcp_indexer_server.py @@ -2024,12 +2024,25 @@ def _to_str_list(x): model = _get_embedding_model(model_name) # Ensure hybrid_search uses the intended collection when running in-process prev_coll = os.environ.get("COLLECTION_NAME") + # Determine effective hybrid candidate limit: if rerank is enabled, search up to rerank_top_n + try: + base_limit = int(limit) + except Exception: + base_limit = 10 + eff_limit = base_limit + if rerank_enabled: + try: + rt = int(rerank_top_n) + except Exception: + rt = 0 + if rt > eff_limit: + eff_limit = rt try: os.environ["COLLECTION_NAME"] = collection # In-process path_glob/not_glob accept a single string; reduce list inputs safely items = run_hybrid_search( queries=queries, - limit=int(limit), + limit=eff_limit, per_path=( int(per_path) if (per_path is not None and str(per_path).strip() != "") @@ -2066,11 +2079,23 @@ def _to_str_list(x): if not use_hybrid_inproc: # Try hybrid search via subprocess (JSONL output) + try: + base_limit = int(limit) + except Exception: + base_limit = 10 + eff_limit = base_limit + if rerank_enabled: + try: + rt = int(rerank_top_n) + except Exception: + rt = 0 + if rt > eff_limit: + eff_limit = rt cmd = [ "python", _work_script("hybrid_search.py"), "--limit", - str(int(limit)), + str(eff_limit), "--json", ] if per_path is not None and str(per_path).strip() != "": @@ -2489,6 +2514,14 @@ def _is_core_code_item(item: dict) -> bool: other_code.append(it) results = doc_items + core_items + other_code + # Enforce user-requested limit on final result count + try: + _limit_n = int(limit) + except Exception: + _limit_n = 0 + if _limit_n > 0 and len(results) > _limit_n: + results = results[:_limit_n] + # Optionally add snippets (with 
highlighting) toks = _tokens_from_queries(queries) if include_snippet: From a717fff712cf6cbba6ac1922a3fea8b6bbf803b5 Mon Sep 17 00:00:00 2001 From: Reese Date: Mon, 1 Dec 2025 00:41:44 +0000 Subject: [PATCH 03/18] Improves core code classification Refactors the core code classification logic for more accurate identification, re-using hybrid_search's helpers when available. This change avoids duplicating extension and path-based heuristics and allows for better mode-aware reordering of search results. --- scripts/mcp_indexer_server.py | 81 ++++++++++++++++++++--------------- 1 file changed, 46 insertions(+), 35 deletions(-) diff --git a/scripts/mcp_indexer_server.py b/scripts/mcp_indexer_server.py index 9a5e4a18..b4d49cff 100644 --- a/scripts/mcp_indexer_server.py +++ b/scripts/mcp_indexer_server.py @@ -2409,48 +2409,24 @@ def _is_doc_path(p: str) -> bool: ) def _is_core_code_item(item: dict) -> bool: + """Classify a result as core implementation code for mode-aware reordering. + + This intentionally reuses hybrid_search's notion of core/test/vendor files + instead of duplicating extension and path heuristics here. We only apply + lightweight checks on top (docs/config/tests components) and delegate the + rest to helpers from hybrid_search when available. + """ try: - p = str(item.get("path") or "").lower() + raw_path = item.get("path") or "" + p = str(raw_path) except Exception: return False if not p: return False + # Never treat docs as core code if _is_doc_path(p): return False - # Extension-based implementation heuristic (mirrors hybrid_search IMPLEMENTATION_BOOST set) - ext = "." + p.rsplit(".", 1)[-1] if "." 
in p else "" - if ext not in { - ".py", - ".ts", - ".tsx", - ".js", - ".jsx", - ".go", - ".java", - ".rs", - ".rb", - ".php", - ".cs", - ".cpp", - ".c", - ".hpp", - ".h", - }: - return False - # Basic path-based exclusions for non-core code - if ( - "/test" in p - or "/tests/" in p - or "/__tests__/" in p - or p.endswith("_test.py") - or p.endswith("test.py") - or "/fixtures/" in p - or "/vendor/" in p - or "/third_party/" in p - or "/.codebase/" in p - or "/.kiro/" in p - ): - return False + # Prefer items that were not explicitly tagged as docs/config/tests in hybrid components comps = item.get("components") or {} try: @@ -2459,6 +2435,41 @@ def _is_core_code_item(item: dict) -> bool: return False except Exception: pass + + # Defer to hybrid_search helpers when available to avoid duplicating + # extension and path-based logic. + try: + from scripts.hybrid_search import ( # type: ignore + is_core_file as _hy_core_file, + is_test_file as _hy_is_test_file, + is_vendor_path as _hy_is_vendor_path, + ) + except Exception: + _hy_core_file = None + _hy_is_test_file = None + _hy_is_vendor_path = None + + if _hy_core_file: + try: + if not _hy_core_file(p): + return False + except Exception: + return False + if _hy_is_test_file: + try: + if _hy_is_test_file(p): + return False + except Exception: + pass + if _hy_is_vendor_path: + try: + if _hy_is_vendor_path(p): + return False + except Exception: + pass + + # If helper imports failed, fall back to a permissive classification: + # treat the item as core code (we already filtered obvious docs/config/tests). return True if mode_str in {"code_first", "code-first", "code"}: From 58be093cf89161270bbbfc65caf62553085966d1 Mon Sep 17 00:00:00 2001 From: Reese Date: Mon, 1 Dec 2025 18:02:31 +0000 Subject: [PATCH 04/18] Preserves pseudo/tags in hybrid search results Ensures that pseudo and tag metadata from index time are carried through in hybrid search results. 
This allows downstream consumers, such as repo search rerankers, to incorporate index-time GLM/LLM labels into their scoring or display logic. It enriches candidate documents with pseudo/tags information when available, improving reranking and search result context. --- scripts/hybrid_search.py | 10 +++++++ scripts/mcp_indexer_server.py | 53 ++++++++++++++++++++++++++++++----- 2 files changed, 56 insertions(+), 7 deletions(-) diff --git a/scripts/hybrid_search.py b/scripts/hybrid_search.py index 2c0d2269..95fb049e 100644 --- a/scripts/hybrid_search.py +++ b/scripts/hybrid_search.py @@ -2631,6 +2631,14 @@ def _resolve(seg: str) -> list[str]: _metadata.get("text") or "" ) + # Carry through pseudo/tags so downstream consumers (e.g., repo_search reranker) + # can incorporate index-time GLM/llm labels into their own scoring or display. + _pseudo = _payload.get("pseudo") + if _pseudo is None: + _pseudo = _metadata.get("pseudo") + _tags = _payload.get("tags") + if _tags is None: + _tags = _metadata.get("tags") # Skip memory-like points without a real file path if not _path or not _path.strip(): if os.environ.get("DEBUG_HYBRID_FILTER"): @@ -2694,6 +2702,8 @@ def _resolve(seg: str) -> list[str]: "span_budgeted": bool(m.get("_merged_start") is not None), "budget_tokens_used": m.get("_budget_tokens"), "text": _text, + "pseudo": _pseudo, + "tags": _tags, } ) if _USE_CACHE and cache_key is not None: diff --git a/scripts/mcp_indexer_server.py b/scripts/mcp_indexer_server.py index b4d49cff..1a4c0b9c 100644 --- a/scripts/mcp_indexer_server.py +++ b/scripts/mcp_indexer_server.py @@ -2191,24 +2191,62 @@ def _to_str_list(x): import concurrent.futures as _fut rq = queries[0] if queries else "" - # Prepare candidate docs from top-N hybrid hits (path+symbol + small snippet) + # Prepare candidate docs from top-N hybrid hits (path+symbol + pseudo/tags + small snippet) cand_objs = list(json_lines[: int(rerank_top_n)]) def _doc_for(obj: dict) -> str: path = str(obj.get("path") or "") symbol 
= str(obj.get("symbol") or "") header = f"{symbol} — {path}".strip() + + # Try to enrich with pseudo/tags from underlying payload when available. + # We expect hybrid to have preserved metadata in obj["components"] or + # direct fields; if not, we fall back to header+code only. + meta_lines: list[str] = [header] if header else [] + try: + # Prefer explicit pseudo/tags fields on the top-level object when present + pseudo_val = obj.get("pseudo") + tags_val = obj.get("tags") + if pseudo_val is None or tags_val is None: + # Fallback: inspect a nested metadata view when present + md = obj.get("metadata") or {} + if pseudo_val is None: + pseudo_val = md.get("pseudo") + if tags_val is None: + tags_val = md.get("tags") + pseudo_s = str(pseudo_val).strip() if pseudo_val is not None else "" + if pseudo_s: + # Keep pseudo short to avoid bloating rerank input + meta_lines.append(f"Summary: {pseudo_s[:256]}") + if tags_val: + try: + if isinstance(tags_val, (list, tuple)): + tags_text = ", ".join( + str(x) for x in tags_val + )[:128] + if tags_text: + meta_lines.append(f"Tags: {tags_text}") + else: + tags_text = str(tags_val)[:128] + if tags_text: + meta_lines.append(f"Tags: {tags_text}") + except Exception: + pass + except Exception: + # If any of the above fails, we just keep header-only + pass + sl = int(obj.get("start_line") or 0) el = int(obj.get("end_line") or 0) if not path or not sl: - return header + return "\n".join(meta_lines) if meta_lines else header try: p = path if not os.path.isabs(p): p = os.path.join("/work", p) realp = os.path.realpath(p) if not (realp == "/work" or realp.startswith("/work/")): - return header + return "\n".join(meta_lines) if meta_lines else header with open( realp, "r", encoding="utf-8", errors="ignore" ) as f: @@ -2221,11 +2259,12 @@ def _doc_for(obj: dict) -> str: si = max(1, sl - ctx) ei = min(len(lines), max(sl, el) + ctx) snippet = "".join(lines[si - 1 : ei]).strip() - return ( - header + ("\n" + snippet if snippet else "") - ).strip() 
+ if snippet: + meta = "\n".join(meta_lines) if meta_lines else header + return (meta + "\n\n" + snippet).strip() + return "\n".join(meta_lines) if meta_lines else header except Exception: - return header + return "\n".join(meta_lines) if meta_lines else header # Build docs concurrently max_workers = min(16, (os.cpu_count() or 4) * 4) From 65575eb05f44d30843c59353d5f3327294aeb3c8 Mon Sep 17 00:00:00 2001 From: John Donalson <11264689+m1rl0k@users.noreply.github.com> Date: Mon, 1 Dec 2025 06:09:37 -0500 Subject: [PATCH 05/18] Update hybrid_search.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit feat(hybrid-search): auto-scale search parameters for large codebases Automatically adjust RRF and retrieval parameters based on collection size to maintain search quality at scale (100k-500k+ LOC codebases). Changes: - Add _scale_rrf_k(): logarithmic RRF k scaling for better score discrimination - Add _adaptive_per_query(): sqrt-based candidate retrieval scaling - Add _normalize_scores(): z-score + sigmoid normalization for compressed distributions - Add _get_collection_stats(): cached collection size lookup (5-min TTL) - Apply scaling to both MCP (run_hybrid_search) and CLI paths - All scaling enabled by default, no configuration required Scaling behavior: - Threshold: 10,000 points (configurable via HYBRID_LARGE_THRESHOLD) - RRF k: 60 → up to 180 (3x max, logarithmic) - Per-query: 24 → up to 72 (3x max, sqrt scaling) - Score normalization spreads compressed score ranges Small codebases (<10k points) are unaffected - parameters unchanged. 
=== Large Codebase Scaling Tests === LARGE_COLLECTION_THRESHOLD: 10000 MAX_RRF_K_SCALE: 3.0 SCORE_NORMALIZE_ENABLED: True Base RRF_K: 60 --- RRF K Scaling --- 5000 points -> k=60 10000 points -> k=60 50000 points -> k=101 100000 points -> k=120 250000 points -> k=143 500000 points -> k=161 --- Per-Query Scaling --- 5000 points -> per_query=24 (filtered=24) 10000 points -> per_query=24 (filtered=24) 50000 points -> per_query=53 (filtered=37) 100000 points -> per_query=72 (filtered=53) 250000 points -> per_query=72 (filtered=72) 500000 points -> per_query=72 (filtered=72) --- Score Normalization --- Before (compressed): [0.5, 0.505, 0.51, 0.495] After (spread): [0.4443, 0.5557, 0.6617, 0.3383] Range: 0.4950-0.5100 -> 0.3383-0.6617 ------------------- Collection: codebase Points: 16622 Threshold: 10000 Scaling Active: True RRF K: 60 -> 73 (scale factor: 1.22x) Per-query: 24 -> 30 (no filters) Per-query: 24 -> 24 (with filters) --- scripts/hybrid_search.py | 158 ++++++++++++++++++++++++++++++++++----- 1 file changed, 139 insertions(+), 19 deletions(-) diff --git a/scripts/hybrid_search.py b/scripts/hybrid_search.py index 95fb049e..7bf4bd96 100644 --- a/scripts/hybrid_search.py +++ b/scripts/hybrid_search.py @@ -308,6 +308,90 @@ def _embed_queries_cached( MICRO_BUDGET_TOKENS = _safe_int(os.environ.get("MICRO_BUDGET_TOKENS", "512"), 512) MICRO_TOKENS_PER_LINE = _safe_int(os.environ.get("MICRO_TOKENS_PER_LINE", "32"), 32) +# === Large codebase scaling (default ON) === +# These parameters enable automatic scaling for collections with 10k+ points +LARGE_COLLECTION_THRESHOLD = _safe_int(os.environ.get("HYBRID_LARGE_THRESHOLD", "10000"), 10000) +MAX_RRF_K_SCALE = _safe_float(os.environ.get("HYBRID_MAX_RRF_K_SCALE", "3.0"), 3.0) +SCORE_NORMALIZE_ENABLED = os.environ.get("HYBRID_SCORE_NORMALIZE", "1").lower() in {"1", "true", "yes", "on"} + +# Cache for collection stats (avoid repeated Qdrant calls) +_COLL_STATS_CACHE: Dict[str, Tuple[float, Dict[str, Any]]] = {} 
+_COLL_STATS_TTL = 300 # 5 minutes + +def _get_collection_stats(client: QdrantClient, coll_name: str) -> Dict[str, Any]: + """Get cached collection statistics for scaling decisions.""" + import time + now = time.time() + cached = _COLL_STATS_CACHE.get(coll_name) + if cached and (now - cached[0]) < _COLL_STATS_TTL: + return cached[1] + try: + info = client.get_collection(coll_name) + stats = {"points_count": info.points_count or 0} + _COLL_STATS_CACHE[coll_name] = (now, stats) + return stats + except Exception: + return {"points_count": 0} + +def _scale_rrf_k(base_k: int, collection_size: int) -> int: + """Scale RRF k parameter based on collection size. + + For large collections, increase k to spread score distribution. + Uses logarithmic scaling: k_scaled = k * (1 + log10(size/threshold)) + Capped at MAX_RRF_K_SCALE * base_k. + """ + if collection_size < LARGE_COLLECTION_THRESHOLD: + return base_k + ratio = collection_size / LARGE_COLLECTION_THRESHOLD + scale = 1.0 + math.log10(max(1, ratio)) + scale = min(scale, MAX_RRF_K_SCALE) + return int(base_k * scale) + +def _adaptive_per_query(base_limit: int, collection_size: int, has_filters: bool) -> int: + """Increase candidate retrieval for larger collections. + + Uses sublinear sqrt scaling to avoid excessive retrieval. + Filters reduce the need for extra candidates. + """ + if collection_size < LARGE_COLLECTION_THRESHOLD: + return base_limit + ratio = collection_size / LARGE_COLLECTION_THRESHOLD + # sqrt scaling: doubles at 4x threshold, triples at 9x + scale = math.sqrt(ratio) + if has_filters: + scale = max(1.0, scale * 0.7) # reduce scaling when filters are active + # Cap at 3x base limit + scaled = int(base_limit * min(scale, 3.0)) + return max(base_limit, min(scaled, 200)) + +def _normalize_scores(score_map: Dict[str, Dict[str, Any]], collection_size: int) -> None: + """Normalize scores using z-score + sigmoid for large collections. + + This spreads compressed score distributions to improve discrimination. 
+ Only applies when SCORE_NORMALIZE_ENABLED=true and collection is large. + """ + if not SCORE_NORMALIZE_ENABLED: + return + if collection_size < LARGE_COLLECTION_THRESHOLD: + return + if len(score_map) < 3: + return + + scores = [rec["s"] for rec in score_map.values()] + mean_s = sum(scores) / len(scores) + var_s = sum((s - mean_s) ** 2 for s in scores) / len(scores) + std_s = math.sqrt(var_s) if var_s > 0 else 1.0 + + if std_s < 1e-6: + return # All scores identical, nothing to normalize + + # Z-score normalization + sigmoid to [0, 1] range + for rec in score_map.values(): + z = (rec["s"] - mean_s) / std_s + # Sigmoid with scale factor for wider spread + normalized = 1.0 / (1.0 + math.exp(-z * 0.5)) + rec["s"] = normalized + def _merge_and_budget_spans(items: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Given ranked items with metadata path/start_line/end_line, merge nearby spans @@ -1675,11 +1759,29 @@ def _bn(p: str) -> str: except Exception: pass - # Lexical vector query (keep original RRF scoring) + # === Large codebase scaling (automatic) === + _coll_stats = _get_collection_stats(client, _collection(collection)) + _coll_size = _coll_stats.get("points_count", 0) + _has_filters = bool(eff_language or eff_repo or eff_under or eff_kind or eff_symbol or eff_ext) + + # Scale RRF k for better score discrimination at scale + _scaled_rrf_k = _scale_rrf_k(RRF_K, _coll_size) + + # Adaptive per_query: retrieve more candidates from larger collections + _scaled_per_query = _adaptive_per_query(max(24, limit), _coll_size, _has_filters) + + if os.environ.get("DEBUG_HYBRID_SEARCH") and _coll_size >= LARGE_COLLECTION_THRESHOLD: + logger.debug(f"Large collection scaling: size={_coll_size}, rrf_k={_scaled_rrf_k}, per_query={_scaled_per_query}") + + # Local RRF function using scaled k + def _scaled_rrf(rank: int) -> float: + return 1.0 / (_scaled_rrf_k + rank) + + # Lexical vector query (with scaled retrieval) score_map: Dict[str, Dict[str, Any]] = {} try: lex_vec = 
lex_hash_vector(qlist) - lex_results = lex_query(client, lex_vec, flt, max(24, limit), collection) + lex_results = lex_query(client, lex_vec, flt, _scaled_per_query, collection) except Exception: lex_results = [] @@ -1711,7 +1813,7 @@ def _bn(p: str) -> str: "test": 0.0, }, ) - lxs = (_AD_LEX_VEC_W * rrf(rank)) if _USE_ADAPT else (LEX_VECTOR_WEIGHT * rrf(rank)) + lxs = (_AD_LEX_VEC_W * _scaled_rrf(rank)) if _USE_ADAPT else (LEX_VECTOR_WEIGHT * _scaled_rrf(rank)) score_map[pid]["lx"] += lxs score_map[pid]["s"] += lxs @@ -1832,7 +1934,7 @@ def _bn(p: str) -> str: flt_gated = _sanitize_filter_obj(flt_gated) result_sets: List[List[Any]] = [ - dense_query(client, vec_name, v, flt_gated, max(24, limit), collection) for v in embedded + dense_query(client, vec_name, v, flt_gated, _scaled_per_query, collection) for v in embedded ] if os.environ.get("DEBUG_HYBRID_SEARCH"): total_dense_results = sum(len(rs) for rs in result_sets) @@ -1849,7 +1951,7 @@ def _bn(p: str) -> str: try: mini_queries = [_project_mini(list(v), MINI_VEC_DIM) for v in embedded] mini_sets: List[List[Any]] = [ - dense_query(client, MINI_VECTOR_NAME, mv, flt, max(24, limit), collection) + dense_query(client, MINI_VECTOR_NAME, mv, flt, _scaled_per_query, collection) for mv in mini_queries ] for res in mini_sets: @@ -1871,7 +1973,7 @@ def _bn(p: str) -> str: "test": 0.0, }, ) - dens = float(HYBRID_MINI_WEIGHT) * rrf(rank) + dens = float(HYBRID_MINI_WEIGHT) * _scaled_rrf(rank) score_map[pid]["d"] += dens score_map[pid]["s"] += dens except Exception: @@ -1937,7 +2039,7 @@ def _bn(p: str) -> str: }, ) # Lower weight for semantic PRF to avoid over-diversification - dens = 0.3 * prf_dw * rrf(rank) + dens = 0.3 * prf_dw * _scaled_rrf(rank) score_map[pid]["d"] += dens score_map[pid]["s"] += dens @@ -2026,14 +2128,15 @@ def _bn(p: str) -> str: "test": 0.0, }, ) - lxs = prf_lw * rrf(rank) + lxs = prf_lw * _scaled_rrf(rank) score_map[pid]["lx"] += lxs score_map[pid]["s"] += lxs # Dense PRF pass try: embedded2 = 
_embed_queries_cached(_model, prf_qs) + _prf_per_query = max(12, _scaled_per_query // 2) result_sets2: List[List[Any]] = [ - dense_query(client, vec_name, v, flt, max(12, limit // 2 or 6), collection) + dense_query(client, vec_name, v, flt, _prf_per_query, collection) for v in embedded2 ] for res2 in result_sets2: @@ -2055,13 +2158,13 @@ def _bn(p: str) -> str: "test": 0.0, }, ) - dens = prf_dw * rrf(rank) + dens = prf_dw * _scaled_rrf(rank) score_map[pid]["d"] += dens score_map[pid]["s"] += dens except Exception: pass - # Add dense scores (keep original RRF scoring) + # Add dense scores (with scaled RRF) for res in result_sets: for rank, p in enumerate(res, 1): pid = str(p.id) @@ -2081,7 +2184,7 @@ def _bn(p: str) -> str: "test": 0.0, }, ) - dens = (_AD_DENSE_W * rrf(rank)) if _USE_ADAPT else (DENSE_WEIGHT * rrf(rank)) + dens = (_AD_DENSE_W * _scaled_rrf(rank)) if _USE_ADAPT else (DENSE_WEIGHT * _scaled_rrf(rank)) score_map[pid]["d"] += dens score_map[pid]["s"] += dens @@ -2208,6 +2311,10 @@ def _bn(p: str) -> str: rec["rec"] += rec_comp rec["s"] += rec_comp + # === Large codebase score normalization === + # Spread compressed score distributions for better discrimination + _normalize_scores(score_map, _coll_size) + def _tie_key(m: Dict[str, Any]): md = (m["pt"].payload or {}).get("metadata") or {} sp = str(md.get("symbol_path") or md.get("symbol") or "") @@ -2216,7 +2323,7 @@ def _tie_key(m: Dict[str, Any]): return (-float(m["s"]), len(sp), path, start_line) if os.environ.get("DEBUG_HYBRID_SEARCH"): - logger.debug(f"score_map has {len(score_map)} items before ranking") + logger.debug(f"score_map has {len(score_map)} items before ranking (coll_size={_coll_size})") ranked = sorted(score_map.values(), key=_tie_key) if os.environ.get("DEBUG_HYBRID_SEARCH"): logger.debug(f"ranked has {len(ranked)} items after sorting") @@ -2911,6 +3018,16 @@ def _norm_under(u: str | None) -> str | None: flt = models.Filter(must=must) if must else None flt = _sanitize_filter_obj(flt) + 
# === Large codebase scaling (CLI path) === + _cli_coll_stats = _get_collection_stats(client, eff_collection) + _cli_coll_size = _cli_coll_stats.get("points_count", 0) + _cli_has_filters = bool(eff_language or eff_repo or eff_under or eff_kind or eff_symbol or eff_ext) + _cli_scaled_rrf_k = _scale_rrf_k(RRF_K, _cli_coll_size) + _cli_scaled_per_query = _adaptive_per_query(args.per_query, _cli_coll_size, _cli_has_filters) + + def _cli_scaled_rrf(rank: int) -> float: + return 1.0 / (_cli_scaled_rrf_k + rank) + # Build query set (optionally expanded) queries = list(clean_queries) # Initialize score map early so we can accumulate from lex and dense @@ -2918,7 +3035,7 @@ def _norm_under(u: str | None) -> str | None: # Server-side lexical vector search (hashing) as an additional ranked list try: lex_vec = lex_hash_vector(queries) - lex_results = lex_query(client, lex_vec, flt, args.per_query, eff_collection) + lex_results = lex_query(client, lex_vec, flt, _cli_scaled_per_query, eff_collection) except Exception: lex_results = [] @@ -2945,7 +3062,7 @@ def _norm_under(u: str | None) -> str | None: else: queries = expand_queries(queries, eff_language) - # Add server-side lexical vector ranking into fusion + # Add server-side lexical vector ranking into fusion (with scaled RRF) for rank, p in enumerate(lex_results, 1): pid = str(p.id) score_map.setdefault( @@ -2964,16 +3081,16 @@ def _norm_under(u: str | None) -> str | None: "test": 0.0, }, ) - lxs = (_AD_LEX_VEC_W2 * rrf(rank)) if _USE_ADAPT2 else (LEX_VECTOR_WEIGHT * rrf(rank)) + lxs = (_AD_LEX_VEC_W2 * _cli_scaled_rrf(rank)) if _USE_ADAPT2 else (LEX_VECTOR_WEIGHT * _cli_scaled_rrf(rank)) score_map[pid]["lx"] += lxs score_map[pid]["s"] += lxs embedded = _embed_queries_cached(model, queries) result_sets: List[List[Any]] = [ - dense_query(client, vec_name, v, flt, args.per_query, eff_collection) for v in embedded + dense_query(client, vec_name, v, flt, _cli_scaled_per_query, eff_collection) for v in embedded ] - # RRF fusion 
(weighted) + # RRF fusion (weighted, with scaled RRF) for res in result_sets: for rank, p in enumerate(res, 1): pid = str(p.id) @@ -2993,7 +3110,7 @@ def _norm_under(u: str | None) -> str | None: "test": 0.0, }, ) - dens = (_AD_DENSE_W2 * rrf(rank)) if _USE_ADAPT2 else (DENSE_WEIGHT * rrf(rank)) + dens = (_AD_DENSE_W2 * _cli_scaled_rrf(rank)) if _USE_ADAPT2 else (DENSE_WEIGHT * _cli_scaled_rrf(rank)) score_map[pid]["d"] += dens score_map[pid]["s"] += dens @@ -3074,6 +3191,9 @@ def _norm_under(u: str | None) -> str | None: rec["rec"] += rec_comp rec["s"] += rec_comp + # === Large codebase score normalization (CLI path) === + _normalize_scores(score_map, _cli_coll_size) + # Rank with deterministic tie-breakers def _tie_key(m: Dict[str, Any]): md = (m["pt"].payload or {}).get("metadata") or {} From be111327ba2849388ea96aad97f8e3ae8efa6ab9 Mon Sep 17 00:00:00 2001 From: Reese Date: Tue, 2 Dec 2025 01:26:04 +0000 Subject: [PATCH 06/18] bugfix: watch_index: fix multi-repo collection selection and deletes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Avoid deriving a root-level "/work-" collection in multi-repo mode - Resolve per-file Qdrant collections via get_collection_for_file for all data ops - Fix on_deleted and move/rename delete paths to use repo-specific collections instead of the watcher’s default_collection --- scripts/watch_index.py | 36 +++++++++++++++++++++++++++--------- 1 file changed, 27 insertions(+), 9 deletions(-) diff --git a/scripts/watch_index.py b/scripts/watch_index.py index ba373d82..d06bf9b0 100644 --- a/scripts/watch_index.py +++ b/scripts/watch_index.py @@ -234,7 +234,12 @@ def __init__( self.client = client resolved_collection = collection if collection is not None else default_collection self.default_collection = resolved_collection - self.collection = resolved_collection + # In multi-repo mode, per-file collections are resolved via _get_collection_for_file. 
+ # Avoid using a root-level default collection (e.g., "/work-") for data ops. + if is_multi_repo_mode(): + self.collection = None + else: + self.collection = resolved_collection self.excl = idx._Excluder(root) # Track ignore file for live reloads try: @@ -339,7 +344,10 @@ def on_deleted(self, event): return if self.client is not None: try: - collection = self.collection or _get_collection_for_file(p) + if is_multi_repo_mode(): + collection = _get_collection_for_file(p) + else: + collection = self.collection or _get_collection_for_file(p) idx.delete_points_by_path(self.client, collection, str(p)) print(f"[deleted] {p} -> {collection}") except Exception: @@ -408,8 +416,12 @@ def on_moved(self, event): if self.excl.exclude_dir(rel_dir): if src.suffix.lower() in idx.CODE_EXTS: try: + if is_multi_repo_mode(): + coll = _get_collection_for_file(src) + else: + coll = self.collection or _get_collection_for_file(src) idx.delete_points_by_path( - self.client, self.collection, str(src) + self.client, coll, str(src) ) print(f"[moved:ignored_dest_deleted_src] {src} -> {dest}") try: @@ -488,11 +500,15 @@ def on_moved(self, event): try: idx.delete_points_by_path(self.client, src_collection, str(src)) except Exception: - idx.delete_points_by_path( - self.client, - self.collection or src_collection, - str(src), - ) + # In multi-repo mode, avoid falling back to any root-level collection. + if (not is_multi_repo_mode()) and self.collection: + idx.delete_points_by_path( + self.client, + self.collection, + str(src), + ) + else: + raise print(f"[moved:deleted_src] {src}") except Exception: pass @@ -716,7 +732,9 @@ def main(): multi_repo_enabled = False default_collection = os.environ.get("COLLECTION_NAME", "my-collection") - if _get_coll: + # In multi-repo mode, per-repo collections are resolved via _get_collection_for_file + # and workspace_state; avoid deriving a root-level collection like "/work-". 
+ if _get_coll and not multi_repo_enabled: try: resolved = _get_coll(str(ROOT)) if resolved: From fa119e33c13ff1ea2990c09d8c94b7d590370c38 Mon Sep 17 00:00:00 2001 From: Reese Date: Wed, 3 Dec 2025 15:49:18 +0000 Subject: [PATCH 07/18] perf: Implements pseudo/tag backfill for Qdrant Adds a background worker to backfill missing pseudo/tags and lexical vectors in Qdrant. This allows for a two-phase indexing process where base vectors are written first, followed by a background process to enrich them. This is enabled via the `PSEUDO_BACKFILL_ENABLED` environment variable and configured with interval and batch size. --- .env.example | 11 ++ scripts/ingest_code.py | 236 ++++++++++++++++++++++++++++++++++------- scripts/watch_index.py | 80 +++++++++++++- 3 files changed, 289 insertions(+), 38 deletions(-) diff --git a/.env.example b/.env.example index 3da5e865..b3711f72 100644 --- a/.env.example +++ b/.env.example @@ -42,6 +42,7 @@ FASTMCP_HTTP_PORT=8002 FASTMCP_HTTP_HEALTH_PORT=18002 FASTMCP_INDEXER_HTTP_PORT=8003 FASTMCP_INDEXER_HTTP_HEALTH_PORT=18003 +# MCP_INDEXER_URL=http://localhost:8003/mcp # Optional: local cross-encoder reranker (ONNX) @@ -134,6 +135,7 @@ REFRAG_DECODER_MODE=prompt # prompt|soft # GLM_API_BASE=https://api.z.ai/api/coding/paas/v4/ # GLM_MODEL=glm-4.6 +# GLM_API_KEY=your_glm_api_key_here # GPU Performance Toggle # Set to 1 to use native GPU-accelerated server on localhost:8081 @@ -174,6 +176,15 @@ SMART_SYMBOL_REINDEXING=0 # INDEX_UPSERT_BACKOFF=0.5 # Debounce file events to coalesce bursts # WATCH_DEBOUNCE_SECS=1.5 +# Optional 2-phase pseudo/tag mode (disabled by default). +# When enabled, indexer/watcher write base-only vectors and a background +# backfill worker adds pseudo/tags via Qdrant. 
+# PSEUDO_BACKFILL_ENABLED=1 +# PSEUDO_BACKFILL_TICK_SECS=60 +# PSEUDO_BACKFILL_MAX_POINTS=256 + +# Development Remote Upload Configuration +# HOST_INDEX_PATH=./dev-workspace # Remote upload git history (used by upload clients) # Max number of commits to include per bundle (0 disables git history) diff --git a/scripts/ingest_code.py b/scripts/ingest_code.py index dc769dcd..f525f2cf 100644 --- a/scripts/ingest_code.py +++ b/scripts/ingest_code.py @@ -1127,6 +1127,154 @@ def ensure_payload_indexes(client: QdrantClient, collection: str): ENSURED_COLLECTIONS: set[str] = set() +def pseudo_backfill_tick( + client: QdrantClient, + collection: str, + repo_name: str | None = None, + *, + max_points: int = 256, +) -> int: + """Best-effort pseudo/tag backfill for a collection. + + Scans up to max_points points for a given repo (when provided) that have not yet + been marked as pseudo-enriched and updates them in-place with pseudo/tags and + refreshed lexical vectors. Does not touch cache.json or hash-based skip logic; + operates purely on Qdrant payloads/vectors. 
+ """ + + if not collection or max_points <= 0: + return 0 + + try: + from qdrant_client import models as _models + except Exception: + return 0 + + must_conditions: list[Any] = [] + if repo_name: + try: + must_conditions.append( + _models.FieldCondition( + key="metadata.repo", + match=_models.MatchValue(value=repo_name), + ) + ) + except Exception: + pass + + flt = None + try: + # Prefer server-side filtering for points missing pseudo/tags when supported + null_cond = getattr(_models, "IsNullCondition", None) + empty_cond = getattr(_models, "IsEmptyCondition", None) + if null_cond is not None: + should_conditions = [] + try: + should_conditions.append(null_cond(is_null="pseudo")) + except Exception: + pass + try: + should_conditions.append(null_cond(is_null="tags")) + except Exception: + pass + if empty_cond is not None: + try: + should_conditions.append(empty_cond(is_empty="tags")) + except Exception: + pass + flt = _models.Filter( + must=must_conditions or None, + should=should_conditions or None, + ) + else: + # Fallback: only scope by repo, rely on Python-side pseudo/tags checks + flt = _models.Filter(must=must_conditions or None) + except Exception: + flt = None + + processed = 0 + next_offset = None + + while processed < max_points: + batch_limit = max(1, min(64, max_points - processed)) + try: + points, next_offset = client.scroll( + collection_name=collection, + scroll_filter=flt, + limit=batch_limit, + with_payload=True, + with_vectors=True, + offset=next_offset, + ) + except Exception: + break + + if not points: + break + + new_points: list[Any] = [] + for rec in points: + try: + payload = rec.payload or {} + md = payload.get("metadata") or {} + code = md.get("code") or "" + if not code: + continue + + pseudo = payload.get("pseudo") or "" + tags_val = payload.get("tags") or [] + tags: list[str] = list(tags_val) if isinstance(tags_val, list) else [] + + # If pseudo/tags are missing, generate them once + if not pseudo and not tags: + try: + pseudo, tags = 
generate_pseudo_tags(code) + except Exception: + pseudo, tags = "", [] + + if not pseudo and not tags: + continue + + # Update payload and lexical vector with pseudo/tags + payload["pseudo"] = pseudo + payload["tags"] = tags + + aug_text = f"{code} {pseudo} {' '.join(tags)}".strip() + lex_vec = _lex_hash_vector_text(aug_text) + + vec = rec.vector + if isinstance(vec, dict): + vecs = dict(vec) + vecs[LEX_VECTOR_NAME] = lex_vec + new_vec = vecs + else: + # Fallback: collections without named vectors - leave dense vector as-is + new_vec = vec + + new_points.append( + models.PointStruct( + id=rec.id, + vector=new_vec, + payload=payload, + ) + ) + processed += 1 + except Exception: + continue + + if new_points: + try: + upsert_points(client, collection, new_points) + except Exception: + # Best-effort: on failure, stop this tick + break + + if next_offset is None: + break + + return processed + + def ensure_collection_and_indexes_once( client: QdrantClient, collection: str, @@ -2024,6 +2172,7 @@ def index_single_file( *, dedupe: bool = True, skip_unchanged: bool = True, + pseudo_mode: str = "full", ) -> bool: """Index a single file path. 
Returns True if indexed, False if skipped.""" try: @@ -2287,26 +2436,29 @@ def make_point(pid, dense_vec, lex_vec, payload): } # Optional LLM enrichment for lexical retrieval: pseudo + tags per micro-chunk # Use symbol-aware gating and cached pseudo/tags where possible - needs_pseudo, cached_pseudo, cached_tags = should_process_pseudo_for_chunk( - str(file_path), ch, changed_symbols - ) - pseudo, tags = cached_pseudo, cached_tags - if needs_pseudo: - try: - pseudo, tags = generate_pseudo_tags(ch.get("text") or "") - if pseudo or tags: - # Cache the pseudo data for this symbol - symbol_name = ch.get("symbol", "") - if symbol_name: - kind = ch.get("kind", "unknown") - start_line = ch.get("start", 0) - symbol_id = f"{kind}_{symbol_name}_{start_line}" - - if set_cached_pseudo: - set_cached_pseudo(str(file_path), symbol_id, pseudo, tags, file_hash) - except Exception: - # Fall back to cached values (if any) or empty pseudo/tags - pass + pseudo = "" + tags = [] + if pseudo_mode != "off": + needs_pseudo, cached_pseudo, cached_tags = should_process_pseudo_for_chunk( + str(file_path), ch, changed_symbols + ) + pseudo, tags = cached_pseudo, cached_tags + if pseudo_mode == "full" and needs_pseudo: + try: + pseudo, tags = generate_pseudo_tags(ch.get("text") or "") + if pseudo or tags: + # Cache the pseudo data for this symbol + symbol_name = ch.get("symbol", "") + if symbol_name: + kind = ch.get("kind", "unknown") + start_line = ch.get("start", 0) + symbol_id = f"{kind}_{symbol_name}_{start_line}" + + if set_cached_pseudo: + set_cached_pseudo(str(file_path), symbol_id, pseudo, tags, file_hash) + except Exception: + # Fall back to cached values (if any) or empty pseudo/tags + pass # Attach whichever pseudo/tags we ended up with (cached or freshly generated) if pseudo: payload["pseudo"] = pseudo @@ -2353,6 +2505,7 @@ def index_repo( *, dedupe: bool = True, skip_unchanged: bool = True, + pseudo_mode: str = "full", ): model = TextEmbedding(model_name=model_name) # Determine 
embedding dimension @@ -2852,23 +3005,26 @@ def make_point(pid, dense_vec, lex_vec, payload): } # Optional LLM enrichment for lexical retrieval: pseudo + tags per micro-chunk # Use symbol-aware gating and cached pseudo/tags where possible - needs_pseudo, cached_pseudo, cached_tags = should_process_pseudo_for_chunk( - str(file_path), ch, changed_symbols - ) - pseudo, tags = cached_pseudo, cached_tags - if needs_pseudo: - try: - pseudo, tags = generate_pseudo_tags(ch.get("text") or "") - if pseudo or tags: - symbol_name = ch.get("symbol", "") - if symbol_name: - kind = ch.get("kind", "unknown") - start_line = ch.get("start", 0) - symbol_id = f"{kind}_{symbol_name}_{start_line}" - if set_cached_pseudo: - set_cached_pseudo(str(file_path), symbol_id, pseudo, tags, file_hash) - except Exception: - pass + pseudo = "" + tags: list[str] = [] + if pseudo_mode != "off": + needs_pseudo, cached_pseudo, cached_tags = should_process_pseudo_for_chunk( + str(file_path), ch, changed_symbols + ) + pseudo, tags = cached_pseudo, cached_tags + if pseudo_mode == "full" and needs_pseudo: + try: + pseudo, tags = generate_pseudo_tags(ch.get("text") or "") + if pseudo or tags: + symbol_name = ch.get("symbol", "") + if symbol_name: + kind = ch.get("kind", "unknown") + start_line = ch.get("start", 0) + symbol_id = f"{kind}_{symbol_name}_{start_line}" + if set_cached_pseudo: + set_cached_pseudo(str(file_path), symbol_id, pseudo, tags, file_hash) + except Exception: + pass if pseudo: payload["pseudo"] = pseudo if tags: @@ -3596,6 +3752,9 @@ def main(): collection = os.environ.get("COLLECTION_NAME", "codebase") print(f"[single_repo] Single-repo mode enabled - using collection: {collection}") + flag = (os.environ.get("PSEUDO_BACKFILL_ENABLED") or "").strip().lower() + pseudo_mode = "off" if flag in {"1", "true", "yes", "on"} else "full" + index_repo( Path(args.root).resolve(), qdrant_url, @@ -3605,6 +3764,9 @@ def main(): args.recreate, dedupe=(not args.no_dedupe), skip_unchanged=(not 
args.no_skip_unchanged), + # Pseudo/tags are inlined by default; when PSEUDO_BACKFILL_ENABLED=1 we run + # base-only and rely on the background backfill worker to add pseudo/tags. + pseudo_mode=pseudo_mode, ) diff --git a/scripts/watch_index.py b/scripts/watch_index.py index d06bf9b0..151fda42 100644 --- a/scripts/watch_index.py +++ b/scripts/watch_index.py @@ -35,6 +35,9 @@ ensure_logical_repo_id, find_collection_for_logical_repo, logical_repo_reuse_enabled, + _get_repo_state_dir, + _cross_process_lock, + get_collection_mappings, ) import hashlib from datetime import datetime @@ -718,6 +721,75 @@ def _rename_in_store( return -1, None +def _start_pseudo_backfill_worker(client: QdrantClient, default_collection: str) -> None: + flag = (os.environ.get("PSEUDO_BACKFILL_ENABLED") or "").strip().lower() + if flag not in {"1", "true", "yes", "on"}: + return + + try: + interval = float(os.environ.get("PSEUDO_BACKFILL_TICK_SECS", "60") or 60.0) + except Exception: + interval = 60.0 + if interval <= 0: + return + try: + max_points = int(os.environ.get("PSEUDO_BACKFILL_MAX_POINTS", "256") or 256) + except Exception: + max_points = 256 + if max_points <= 0: + max_points = 1 + + def _worker() -> None: + while True: + try: + try: + mappings = get_collection_mappings(search_root=str(ROOT)) + except Exception: + mappings = [] + if not mappings: + mappings = [ + {"repo_name": None, "collection_name": default_collection}, + ] + for mapping in mappings: + coll = mapping.get("collection_name") or default_collection + repo_name = mapping.get("repo_name") + if not coll: + continue + try: + if is_multi_repo_mode() and repo_name: + state_dir = _get_repo_state_dir(repo_name) + else: + state_dir = _get_global_state_dir(str(ROOT)) + lock_path = state_dir / "pseudo.lock" + with _cross_process_lock(lock_path): + processed = idx.pseudo_backfill_tick( + client, + coll, + repo_name=repo_name, + max_points=max_points, + ) + if processed: + try: + print( + f"[pseudo_backfill] repo={repo_name or 
'default'} collection={coll} processed={processed}" + ) + except Exception: + pass + except Exception as e: + try: + print( + f"[pseudo_backfill] error repo={repo_name or 'default'} collection={coll}: {e}" + ) + except Exception: + pass + except Exception: + pass + time.sleep(interval) + + thread = threading.Thread(target=_worker, name="pseudo-backfill", daemon=True) + thread.start() + + def main(): # Resolve collection name from workspace state before any client/state ops try: @@ -806,6 +878,8 @@ def main(): except Exception: pass + _start_pseudo_backfill_worker(client, default_collection) + try: if multi_repo_enabled: root_repo_name = _extract_repo_name_from_path(str(ROOT)) @@ -1061,8 +1135,11 @@ def _process_paths(paths, client, model, vector_name: str, model_dim: int, works except Exception: pass - # Fallback: full single-file reindex + # Fallback: full single-file reindex. Pseudo/tags are inlined by default; + # when PSEUDO_BACKFILL_ENABLED=1 we run base-only and rely on backfill. if not ok: + flag = (os.environ.get("PSEUDO_BACKFILL_ENABLED") or "").strip().lower() + pseudo_mode = "off" if flag in {"1", "true", "yes", "on"} else "full" ok = idx.index_single_file( client, model, @@ -1071,6 +1148,7 @@ def _process_paths(paths, client, model, vector_name: str, model_dim: int, works p, dedupe=True, skip_unchanged=False, + pseudo_mode=pseudo_mode, ) except Exception as e: try: From c09f5da1758e4c97ed9e399837c59b0ef1bb6127 Mon Sep 17 00:00:00 2001 From: Reese Date: Thu, 4 Dec 2025 04:09:43 +0000 Subject: [PATCH 08/18] k8 - when patching images, set qdrant image pull policy to always so there's nothing to patch --- deploy/kubernetes/qdrant.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deploy/kubernetes/qdrant.yaml b/deploy/kubernetes/qdrant.yaml index 7ec599f7..ba645364 100644 --- a/deploy/kubernetes/qdrant.yaml +++ b/deploy/kubernetes/qdrant.yaml @@ -24,7 +24,7 @@ spec: containers: - name: qdrant image: qdrant/qdrant:latest - 
imagePullPolicy: IfNotPresent + imagePullPolicy: Always ports: - name: http containerPort: 6333 From 7fe983497e593cee555cfbfb2d5c4874745a0223 Mon Sep 17 00:00:00 2001 From: Reese Date: Thu, 4 Dec 2025 04:18:02 +0000 Subject: [PATCH 09/18] k8: Adds Qdrant readiness check Adds an init container to each indexer service deployment that waits for Qdrant to be available before starting the indexer. This ensures that the indexer does not start processing data before Qdrant is ready to accept connections, preventing potential data loss or corruption. --- deploy/kubernetes/indexer-services.yaml | 42 +++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/deploy/kubernetes/indexer-services.yaml b/deploy/kubernetes/indexer-services.yaml index 271e3578..723ebd6f 100644 --- a/deploy/kubernetes/indexer-services.yaml +++ b/deploy/kubernetes/indexer-services.yaml @@ -23,6 +23,20 @@ spec: runAsUser: 1000 runAsGroup: 1000 fsGroup: 1000 + initContainers: + - name: wait-for-qdrant + image: context-engine-indexer-service + imagePullPolicy: IfNotPresent + command: + - /bin/sh + - -c + - /app/scripts/wait-for-qdrant.sh + env: + - name: QDRANT_URL + valueFrom: + configMapKeyRef: + name: context-engine-config + key: QDRANT_URL containers: - name: watcher image: context-engine-indexer-service @@ -130,6 +144,20 @@ spec: runAsUser: 1000 runAsGroup: 1000 fsGroup: 1000 + initContainers: + - name: wait-for-qdrant + image: context-engine-indexer-service + imagePullPolicy: IfNotPresent + command: + - /bin/sh + - -c + - /app/scripts/wait-for-qdrant.sh + env: + - name: QDRANT_URL + valueFrom: + configMapKeyRef: + name: context-engine-config + key: QDRANT_URL restartPolicy: OnFailure containers: - name: indexer @@ -207,6 +235,20 @@ spec: runAsUser: 1000 runAsGroup: 1000 fsGroup: 1000 + initContainers: + - name: wait-for-qdrant + image: context-engine-indexer-service + imagePullPolicy: IfNotPresent + command: + - /bin/sh + - -c + - /app/scripts/wait-for-qdrant.sh + env: + - name: 
QDRANT_URL + valueFrom: + configMapKeyRef: + name: context-engine-config + key: QDRANT_URL restartPolicy: OnFailure containers: - name: init-payload From abf6c26ee931ca10e97220d14f94675ffa5a992e Mon Sep 17 00:00:00 2001 From: Reese Date: Thu, 4 Dec 2025 14:22:14 +0000 Subject: [PATCH 10/18] Adds debug mode for pseudo backfill Adds a debug mode for the pseudo backfill process, enabled via the `PSEUDO_BACKFILL_DEBUG` environment variable. When enabled, the backfill process tracks and reports detailed statistics, including scanned points, GLM calls, the number of filled and updated vectors, and the reasons for skipping vectors, providing insights into the backfill's performance and potential bottlenecks. --- .env.example | 1 + scripts/ingest_code.py | 31 +++++++++++++++++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/.env.example b/.env.example index b3711f72..4ffed24a 100644 --- a/.env.example +++ b/.env.example @@ -180,6 +180,7 @@ SMART_SYMBOL_REINDEXING=0 # When enabled, indexer/watcher write base-only vectors and a background # backfill worker adds pseudo/tags via Qdrant. 
# PSEUDO_BACKFILL_ENABLED=1 +# PSEUDO_BACKFILL_DEBUG=0 # PSEUDO_BACKFILL_TICK_SECS=60 # PSEUDO_BACKFILL_MAX_POINTS=256 diff --git a/scripts/ingest_code.py b/scripts/ingest_code.py index f525f2cf..07fbf090 100644 --- a/scripts/ingest_code.py +++ b/scripts/ingest_code.py @@ -1193,6 +1193,21 @@ def pseudo_backfill_tick( flt = None processed = 0 + debug_enabled = (os.environ.get("PSEUDO_BACKFILL_DEBUG") or "").strip().lower() in { + "1", + "true", + "yes", + "on", + } + debug_stats = { + "scanned": 0, + "glm_calls": 0, + "glm_success": 0, + "filled_new": 0, + "updated_existing": 0, + "skipped_no_code": 0, + "skipped_after_glm": 0, + } next_offset = None while processed < max_points: @@ -1215,29 +1230,45 @@ def pseudo_backfill_tick( new_points: list[Any] = [] for rec in points: try: + if debug_enabled: + debug_stats["scanned"] += 1 payload = rec.payload or {} md = payload.get("metadata") or {} code = md.get("code") or "" if not code: + if debug_enabled: + debug_stats["skipped_no_code"] += 1 continue pseudo = payload.get("pseudo") or "" tags_val = payload.get("tags") or [] tags: list[str] = list(tags_val) if isinstance(tags_val, list) else [] + had_existing = bool(pseudo or tags) # If pseudo/tags are missing, generate them once if not pseudo and not tags: try: + if debug_enabled: + debug_stats["glm_calls"] += 1 pseudo, tags = generate_pseudo_tags(code) + if debug_enabled and (pseudo or tags): + debug_stats["glm_success"] += 1 except Exception: pseudo, tags = "", [] if not pseudo and not tags: + if debug_enabled: + debug_stats["skipped_after_glm"] += 1 continue # Update payload and lexical vector with pseudo/tags payload["pseudo"] = pseudo payload["tags"] = tags + if debug_enabled: + if had_existing: + debug_stats["updated_existing"] += 1 + else: + debug_stats["filled_new"] += 1 aug_text = f"{code} {pseudo} {' '.join(tags)}".strip() lex_vec = _lex_hash_vector_text(aug_text) From 3494378ebea67049ed02b276a1341845f6c83403 Mon Sep 17 00:00:00 2001 From: Reese Date: Thu, 4 Dec 
2025 17:13:43 +0000 Subject: [PATCH 11/18] perf: Adds file system metadata fast path Implements a mechanism to skip re-indexing files based on file size and modification time. This optimization, enabled by the `INDEX_FS_FASTPATH` environment variable, significantly speeds up indexing, particularly when dealing with large repositories or frequent re-indexing operations where file contents may not have changed. The logic retrieves file metadata (size and mtime) from the cache and compares it with the current file's metadata. If they match, the file is skipped, avoiding unnecessary re-reading and processing. The change also updates the cache to store file size and mtime along with the file hash. --- .env.example | 3 ++ scripts/ingest_code.py | 39 ++++++++++++++++++++++ scripts/workspace_state.py | 68 +++++++++++++++++++++++++++++++++++--- 3 files changed, 106 insertions(+), 4 deletions(-) diff --git a/.env.example b/.env.example index 4ffed24a..c94667f2 100644 --- a/.env.example +++ b/.env.example @@ -176,6 +176,9 @@ SMART_SYMBOL_REINDEXING=0 # INDEX_UPSERT_BACKOFF=0.5 # Debounce file events to coalesce bursts # WATCH_DEBOUNCE_SECS=1.5 +# Optional fs metadata fast-path for unchanged files (skips re-reading files +# when size/mtime match cache.json in the same workspace). +# INDEX_FS_FASTPATH=0 # Optional 2-phase pseudo/tag mode (disabled by default). # When enabled, indexer/watcher write base-only vectors and a background # backfill worker adds pseudo/tags via Qdrant. 
diff --git a/scripts/ingest_code.py b/scripts/ingest_code.py index 07fbf090..7cf71498 100644 --- a/scripts/ingest_code.py +++ b/scripts/ingest_code.py @@ -73,6 +73,7 @@ def logical_repo_reuse_enabled() -> bool: # type: ignore[no-redef] set_cached_pseudo, update_symbols_with_pseudo, get_workspace_state, + get_cached_file_meta, ) except ImportError: # State integration is optional; continue if not available @@ -90,6 +91,7 @@ def logical_repo_reuse_enabled() -> bool: # type: ignore[no-redef] update_symbols_with_pseudo = None # type: ignore compare_symbol_changes = None # type: ignore get_workspace_state = None # type: ignore + get_cached_file_meta = None # type: ignore # Optional Tree-sitter import (graceful fallback) try: @@ -2206,6 +2208,24 @@ def index_single_file( pseudo_mode: str = "full", ) -> bool: """Index a single file path. Returns True if indexed, False if skipped.""" + + fast_fs = _env_truthy(os.environ.get("INDEX_FS_FASTPATH"), False) + if skip_unchanged and fast_fs and get_cached_file_meta is not None: + try: + repo_name_for_cache = _detect_repo_name_from_path(file_path) + meta = get_cached_file_meta(str(file_path), repo_name_for_cache) or {} + size = meta.get("size") + mtime = meta.get("mtime") + if size is not None and mtime is not None: + st = file_path.stat() + if int(getattr(st, "st_size", 0)) == int(size) and int( + getattr(st, "st_mtime", 0) + ) == int(mtime): + print(f"Skipping unchanged file (fs-meta): {file_path}") + return False + except Exception: + pass + try: text = file_path.read_text(encoding="utf-8", errors="ignore") except Exception as e: @@ -2703,6 +2723,8 @@ def make_point(pid, dense_vec, lex_vec, payload): # Track per-file hashes across the entire run for cache updates on any flush batch_file_hashes = {} + fast_fs = _env_truthy(os.environ.get("INDEX_FS_FASTPATH"), False) + for file_path in iter_files(root): files_seen += 1 @@ -2716,6 +2738,23 @@ def make_point(pid, dense_vec, lex_vec, payload): else: current_collection = 
get_collection_name(ws_path) if get_collection_name else "default-collection" + # Optional fs-metadata fast-path: skip files whose size/mtime match cache + if skip_unchanged and fast_fs and get_cached_file_meta is not None: + try: + per_file_repo_for_cache = _detect_repo_name_from_path(file_path) + meta = get_cached_file_meta(str(file_path), per_file_repo_for_cache) or {} + size = meta.get("size") + mtime = meta.get("mtime") + if size is not None and mtime is not None: + st = file_path.stat() + if int(getattr(st, "st_size", 0)) == int(size) and int( + getattr(st, "st_mtime", 0) + ) == int(mtime): + print(f"Skipping unchanged file (fs-meta): {file_path}") + continue + except Exception: + pass + try: text = file_path.read_text(encoding="utf-8", errors="ignore") except Exception as e: diff --git a/scripts/workspace_state.py b/scripts/workspace_state.py index df48059e..0093f5f3 100644 --- a/scripts/workspace_state.py +++ b/scripts/workspace_state.py @@ -717,12 +717,20 @@ def get_cached_file_hash(file_path: str, repo_name: Optional[str] = None) -> str with open(cache_path, 'r', encoding='utf-8') as f: cache = json.load(f) file_hashes = cache.get("file_hashes", {}) - return file_hashes.get(str(Path(file_path).resolve()), "") + fp = str(Path(file_path).resolve()) + val = file_hashes.get(fp, "") + if isinstance(val, dict): + return str(val.get("hash") or "") + return str(val or "") except Exception: pass else: cache = _read_cache(_resolve_workspace_root()) - return cache.get("file_hashes", {}).get(str(Path(file_path).resolve()), "") + fp = str(Path(file_path).resolve()) + val = cache.get("file_hashes", {}).get(fp, "") + if isinstance(val, dict): + return str(val.get("hash") or "") + return str(val or "") return "" @@ -748,7 +756,18 @@ def set_cached_file_hash(file_path: str, file_hash: str, repo_name: Optional[str else: cache = {"file_hashes": {}, "created_at": datetime.now().isoformat()} - cache.setdefault("file_hashes", {})[fp] = file_hash + entry: Any = file_hash + try: 
+ st = Path(file_path).stat() + entry = { + "hash": file_hash, + "size": int(getattr(st, "st_size", 0)), + "mtime": int(getattr(st, "st_mtime", 0)), + } + except Exception: + pass + + cache.setdefault("file_hashes", {})[fp] = entry cache["updated_at"] = datetime.now().isoformat() _atomic_write_state(cache_path, cache) # reuse atomic writer for files @@ -757,11 +776,52 @@ def set_cached_file_hash(file_path: str, file_hash: str, repo_name: Optional[str return cache = _read_cache(_resolve_workspace_root()) - cache.setdefault("file_hashes", {})[fp] = file_hash + entry: Any = file_hash + try: + st = Path(file_path).stat() + entry = { + "hash": file_hash, + "size": int(getattr(st, "st_size", 0)), + "mtime": int(getattr(st, "st_mtime", 0)), + } + except Exception: + pass + cache.setdefault("file_hashes", {})[fp] = entry cache["updated_at"] = datetime.now().isoformat() _write_cache(_resolve_workspace_root(), cache) +def get_cached_file_meta(file_path: str, repo_name: Optional[str] = None) -> Dict[str, Any]: + fp = str(Path(file_path).resolve()) + if is_multi_repo_mode() and repo_name: + state_dir = _get_repo_state_dir(repo_name) + cache_path = state_dir / CACHE_FILENAME + + if cache_path.exists(): + try: + with open(cache_path, 'r', encoding='utf-8') as f: + cache = json.load(f) + file_hashes = cache.get("file_hashes", {}) + val = file_hashes.get(fp) + except Exception: + val = None + else: + val = None + else: + cache = _read_cache(_resolve_workspace_root()) + val = cache.get("file_hashes", {}).get(fp) + + if isinstance(val, dict): + return { + "hash": str(val.get("hash") or ""), + "size": val.get("size"), + "mtime": val.get("mtime"), + } + if isinstance(val, str): + return {"hash": val} + return {} + + def remove_cached_file(file_path: str, repo_name: Optional[str] = None) -> None: """Remove file entry from cache.""" if is_multi_repo_mode() and repo_name: From 3f68eb2097e4a1478cda677c0d05d8d7ea4bbb99 Mon Sep 17 00:00:00 2001 From: Reese Date: Thu, 4 Dec 2025 22:26:05 
+0000 Subject: [PATCH 12/18] Refreshes file hash cache on fast-fs When fast-fs is enabled, this commit refreshes the file hash cache with size/mtime information during file skipping. This ensures the cache remains up-to-date even when files are skipped due to unchanged content, and enhances the fast-fs performance. --- scripts/ingest_code.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/scripts/ingest_code.py b/scripts/ingest_code.py index 7cf71498..0e08d9dc 100644 --- a/scripts/ingest_code.py +++ b/scripts/ingest_code.py @@ -2288,6 +2288,12 @@ def index_single_file( if get_cached_file_hash: prev_local = get_cached_file_hash(str(file_path), repo_tag) if prev_local and file_hash and prev_local == file_hash: + # When fs fast-path is enabled, refresh cache entry with size/mtime + if fast_fs and set_cached_file_hash: + try: + set_cached_file_hash(str(file_path), file_hash, repo_tag) + except Exception: + pass print(f"Skipping unchanged file (cache): {file_path}") return False except Exception: @@ -2300,6 +2306,11 @@ def index_single_file( repo_rel_path=repo_rel_path, ) if prev and prev == file_hash: + if fast_fs and set_cached_file_hash: + try: + set_cached_file_hash(str(file_path), file_hash, repo_tag) + except Exception: + pass print(f"Skipping unchanged file: {file_path}") return False @@ -2811,6 +2822,12 @@ def make_point(pid, dense_vec, lex_vec, payload): if get_cached_file_hash: prev_local = get_cached_file_hash(str(file_path), per_file_repo) if prev_local and file_hash and prev_local == file_hash: + # When fs fast-path is enabled, refresh cache entry with size/mtime + if fast_fs and set_cached_file_hash: + try: + set_cached_file_hash(str(file_path), file_hash, per_file_repo) + except Exception: + pass if PROGRESS_EVERY <= 0 and files_seen % 50 == 0: print(f"... 
processed {files_seen} files (skipping unchanged, cache)") try: From 9b6293db45b83d544d077b76bd0259141161e36d Mon Sep 17 00:00:00 2001 From: Reese Date: Fri, 5 Dec 2025 02:07:51 +0000 Subject: [PATCH 13/18] Add fs-metadata no-change precheck to index_repo - Add an INDEX_FS_FASTPATH-gated precheck in index_repo that walks files using cache.json fs metadata (size/mtime) and, when every file matches, exits early before model construction and Qdrant client setup, making true no-change runs much cheaper. - Leave behavior unchanged for any new/changed/uncached files or entries missing size/mtime metadata (these still fall back to the original full index path). - Add a TODO above the collection health check to note that these expensive Qdrant probes should eventually be split into a dedicated "health-check-only" mode, so "nothing changed" runs can remain fast while still offering an explicit way to validate collections. --- scripts/ingest_code.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/scripts/ingest_code.py b/scripts/ingest_code.py index 0e08d9dc..3524cc35 100644 --- a/scripts/ingest_code.py +++ b/scripts/ingest_code.py @@ -2569,6 +2569,34 @@ def index_repo( skip_unchanged: bool = True, pseudo_mode: str = "full", ): + # Optional fast no-change precheck: when INDEX_FS_FASTPATH is enabled, use + # fs metadata + cache.json to exit early before model/Qdrant setup when all + # files are unchanged. 
+ fast_fs = _env_truthy(os.environ.get("INDEX_FS_FASTPATH"), False) + if skip_unchanged and not recreate and fast_fs and get_cached_file_meta is not None: + try: + all_unchanged = True + for file_path in iter_files(root): + per_file_repo_for_cache = _detect_repo_name_from_path(file_path) if _detect_repo_name_from_path else None + meta = get_cached_file_meta(str(file_path), per_file_repo_for_cache) or {} + size = meta.get("size") + mtime = meta.get("mtime") + if size is None or mtime is None: + all_unchanged = False + break + st = file_path.stat() + if int(getattr(st, "st_size", 0)) != int(size) or int(getattr(st, "st_mtime", 0)) != int(mtime): + all_unchanged = False + break + if all_unchanged: + try: + print("[fast_index] No changes detected via fs metadata; skipping model and Qdrant setup") + except Exception: + pass + return + except Exception: + pass + model = TextEmbedding(model_name=model_name) # Determine embedding dimension dim = len(next(model.embed(["dimension probe"]))) @@ -2658,6 +2686,9 @@ def index_repo( ) # Health check: detect cache/collection sync issues before indexing (single-collection mode only) + # TODO: In future, consider a dedicated "health-check-only" mode/command that runs these + # expensive Qdrant probes without doing a full index pass, so that "nothing changed" runs + # can stay as cheap as possible while still offering an explicit way to validate collections. if not recreate and skip_unchanged and not use_per_repo_collections and collection: try: from scripts.collection_health import auto_heal_if_needed From 441b445155238fbf5098729dc03691c19c64fa46 Mon Sep 17 00:00:00 2001 From: Reese Date: Sun, 7 Dec 2025 17:08:54 +0000 Subject: [PATCH 14/18] Improves commit search with scoring Refactors commit search to incorporate lexical scoring, allowing for ranking of results by relevance when a query is provided. 
This change replaces the previous strict "all tokens must appear" filter with a field-aware scoring mechanism, enabling the system to identify and prioritize commits that better match the specified behavior phrase. The results are then sorted by score and trimmed to the requested limit. --- scripts/mcp_indexer_server.py | 77 ++++++++++++++++++++++++++++------- 1 file changed, 63 insertions(+), 14 deletions(-) diff --git a/scripts/mcp_indexer_server.py b/scripts/mcp_indexer_server.py index 22572df3..0d628353 100644 --- a/scripts/mcp_indexer_server.py +++ b/scripts/mcp_indexer_server.py @@ -3145,6 +3145,11 @@ async def search_commits_for( mcap = int(max_points) if max_points not in (None, "") else 1000 except (ValueError, TypeError): mcap = 1000 + # When a textual query is present, enable scoring and allow scanning up to + # max_points commits so we can rank the best matches. Without a query, + # preserve the previous behavior (early exit after "limit" unique commits). + use_scoring = bool(q_terms) + max_ids_for_scan = mcap if use_scoring else lim try: from qdrant_client import QdrantClient # type: ignore @@ -3172,7 +3177,7 @@ async def search_commits_for( scanned = 0 out: list[dict[str, Any]] = [] seen_ids: set[str] = set() - while scanned < mcap and len(seen_ids) < lim: + while scanned < mcap and len(seen_ids) < max_ids_for_scan: sc, page = await asyncio.to_thread( lambda: client.scroll( collection_name=coll, @@ -3218,17 +3223,43 @@ async def search_commits_for( ][:6] else: lineage_tags = [] - # Build a composite lowercase text blob for simple lexical matching - lineage_text_parts = [] - if lineage_goal: - lineage_text_parts.append(lineage_goal) - if lineage_symbols: - lineage_text_parts.extend(lineage_symbols) - if lineage_tags: - lineage_text_parts.extend(lineage_tags) - text_l = (msg + "\n" + info + "\n" + " ".join(lineage_text_parts)).lower() - if q_terms and not all(t in text_l for t in q_terms): - continue + + # Field-aware lexical scoring using commit 
message + optional + # lineage metadata. This replaces the previous strict + # "all tokens must appear" filter, so we can rank commits by + # how well they match the behavior phrase. + score = 0.0 + if q_terms: + # Precompute lowercase field views once per commit + msg_l = msg.lower() + info_l = info.lower() + goal_l = lineage_goal.lower() if lineage_goal else "" + sym_l = " ".join(lineage_symbols).lower() if lineage_symbols else "" + tags_l = " ".join(lineage_tags).lower() if lineage_tags else "" + hits = 0 + for t in q_terms: + term_hit = False + if goal_l and t in goal_l: + score += 3.0 + term_hit = True + if tags_l and t in tags_l: + score += 2.0 + term_hit = True + if sym_l and t in sym_l: + score += 1.5 + term_hit = True + if msg_l and t in msg_l: + score += 1.0 + term_hit = True + if info_l and t in info_l: + score += 0.5 + term_hit = True + if term_hit: + hits += 1 + # Require at least one token match across any field when a + # query is provided; otherwise treat as non-match. + if hits == 0: + continue if p: # Require the path substring to appear in at least one touched file if not any(p in f for f in files_list): @@ -3248,11 +3279,29 @@ async def search_commits_for( "lineage_goal": lineage_goal, "lineage_symbols": lineage_symbols, "lineage_tags": lineage_tags, + "_score": score, } ) - if len(seen_ids) >= lim: + if len(seen_ids) >= max_ids_for_scan: break - return {"ok": True, "results": out, "scanned": scanned, "collection": coll} + results = out + # When scoring is enabled (query provided), sort by score descending + # and trim to the requested limit. When no query is provided, preserve + # the original behavior (scroll order, up to "lim" unique commits). 
+ if use_scoring and results: + try: + results = sorted( + results, + key=lambda c: float(c.get("_score", 0.0)), + reverse=True, + ) + except Exception: + # On any unexpected error, fall back to unsorted results + pass + results = results[:lim] + for c in results: + c.pop("_score", None) + return {"ok": True, "results": results, "scanned": scanned, "collection": coll} except Exception as e: return {"ok": False, "error": str(e), "collection": coll} From ac931b650c2863788975a7812bbdf3f08e4e0bf2 Mon Sep 17 00:00:00 2001 From: Reese Date: Sun, 7 Dec 2025 17:20:08 +0000 Subject: [PATCH 15/18] Aligns related paths with PATH_EMIT_MODE Ensures related paths are emitted in the appropriate path space (host or container) based on the PATH_EMIT_MODE environment variable. This change introduces a mapping of container paths to host paths to ensure consistent path representation for human-facing interfaces, while preserving container paths for backend usage. --- scripts/hybrid_search.py | 32 +++++++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/scripts/hybrid_search.py b/scripts/hybrid_search.py index c16ed394..8c35dc3f 100644 --- a/scripts/hybrid_search.py +++ b/scripts/hybrid_search.py @@ -2661,6 +2661,20 @@ def _pass_filters(m: Dict[str, Any]) -> bool: except Exception: all_paths = set() + # Build path -> host_path map so we can emit related_paths in host space + # when PATH_EMIT_MODE prefers host paths. This keeps human-facing paths + # consistent while still preserving container paths for backend use. 
+ host_map: Dict[str, str] = {} + try: + for _m in merged: + _md = (_m["pt"].payload or {}).get("metadata") or {} + _p = str(_md.get("path") or "").strip() + _h = str(_md.get("host_path") or "").strip() + if _p and _h: + host_map[_p] = _h + except Exception: + host_map = {} + items: List[Dict[str, Any]] = [] if not merged: if _USE_CACHE and cache_key is not None: @@ -2788,6 +2802,22 @@ def _resolve(seg: str) -> list[str]: pass _related = sorted(_related_set)[:10] + # Align related_paths with PATH_EMIT_MODE when possible: in host/auto + # modes, prefer host paths when we have a mapping; in container mode, + # keep container/path-space values as-is. + _related_out = _related + try: + _mode_related = str(os.environ.get("PATH_EMIT_MODE", "auto")).strip().lower() + except Exception: + _mode_related = "auto" + if _mode_related in {"host", "auto"}: + try: + _mapped: List[str] = [] + for rp in _related: + _mapped.append(host_map.get(rp, rp)) + _related_out = _mapped + except Exception: + _related_out = _related # Best-effort snippet text directly from payload for downstream LLM stitching _payload = (m["pt"].payload or {}) if m.get("pt") is not None else {} _metadata = _payload.get("metadata", {}) or {} @@ -2865,7 +2895,7 @@ def _resolve(seg: str) -> list[str]: "components": comp, "why": why, "relations": {"imports": _imports, "calls": _calls, "symbol_path": _symp}, - "related_paths": _related, + "related_paths": _related_out, "span_budgeted": bool(m.get("_merged_start") is not None), "budget_tokens_used": m.get("_budget_tokens"), "text": _text, From 6bf37c7496e172dc30acb453926db7b2bc8765fc Mon Sep 17 00:00:00 2001 From: Reese Date: Sun, 7 Dec 2025 17:58:16 +0000 Subject: [PATCH 16/18] (commit search):Adds optional vector-augmented commit search Implements an optional feature to enhance commit search using vector embeddings. 
This feature allows for a semantic score to be computed for the query by blending it with the lexical/lineage score which is gated by a configuration setting. This commit also includes lazy loading of the fastembed library and a sanitize helper to allow environments without these dependencies to still function with a pure lexical search. --- .env | 1 + .env.example | 1 + scripts/mcp_indexer_server.py | 98 +++++++++++++++++++++++++++++++++++ 3 files changed, 100 insertions(+) diff --git a/.env b/.env index 68efb682..9695c8a8 100644 --- a/.env +++ b/.env @@ -196,3 +196,4 @@ INFO_REQUEST_LIMIT=10 INFO_REQUEST_CONTEXT_LINES=5 # INFO_REQUEST_EXPLAIN_DEFAULT=0 # INFO_REQUEST_RELATIONSHIPS=0 +COMMIT_VECTOR_SEARCH=0 diff --git a/.env.example b/.env.example index 5a5b01b1..a53d3208 100644 --- a/.env.example +++ b/.env.example @@ -224,6 +224,7 @@ SMART_SYMBOL_REINDEXING=0 # REMOTE_UPLOAD_GIT_SINCE= # Enable commit lineage goals for indexing REFRAG_COMMIT_DESCRIBE=1 +COMMIT_VECTOR_SEARCH=0 STRICT_MEMORY_RESTORE=0 diff --git a/scripts/mcp_indexer_server.py b/scripts/mcp_indexer_server.py index 0d628353..86deace3 100644 --- a/scripts/mcp_indexer_server.py +++ b/scripts/mcp_indexer_server.py @@ -3173,6 +3173,88 @@ async def search_commits_for( ] ) + # Optional vector-augmented scoring: use commit embeddings produced by + # ingest_history.py to compute a semantic score for the query and blend + # it with the lexical/lineage score. Gated by COMMIT_VECTOR_SEARCH and + # only active when a textual query is present. + # + # Empirically, the lineage-aware lexical scoring (goal/tags/symbols + + # message/information) already performs very well for behavior-phrase + # commit search across this repository. We have not yet found a clear + # "sweet spot" where enabling COMMIT_VECTOR_SEARCH by default yields a + # consistently better ranking, especially given the extra latency and + # dependency surface. 
Keep this as an opt-in experimental boost so + # callers can toggle it via env without relying on vector similarity + # for core behavior. + vector_scores: Dict[str, float] = {} + use_vectors = use_scoring and str( + os.environ.get("COMMIT_VECTOR_SEARCH", "0") or "1" + ).strip().lower() in {"1", "true", "yes", "on"} + if use_vectors: + try: + # Import lazily so environments without fastembed/sanitize helper + # still work with pure lexical search. + try: + from scripts.utils import ( # type: ignore + sanitize_vector_name as _sanitize_vector_name, + ) + except Exception: + _sanitize_vector_name = None # type: ignore + + from fastembed import TextEmbedding # type: ignore + + model_name = os.environ.get( + "MODEL_NAME", "BAAI/bge-base-en-v1.5" + ) + vec_name: Optional[str] + if _sanitize_vector_name is not None: + try: + vec_name = _sanitize_vector_name(model_name) + except Exception: + vec_name = None + else: + vec_name = None + + if vec_name: + # Embed the behavior phrase query once and run a vector + # search over the commit collection. We keep the result set + # modest and later blend its scores into the lexical + # scoring for matching commits. 
+ embed_model = TextEmbedding(model_name=model_name) + qtext = " ".join(q_terms) if q_terms else "" + if qtext.strip(): + qvec = next(embed_model.embed([qtext])).tolist() + + def _vec_search(): + return client.search( + collection_name=coll, + query_vector={vec_name: qvec}, + query_filter=filt, + limit=min(mcap, 128), + with_payload=True, + with_vectors=False, + ) + + v_hits = await asyncio.to_thread(_vec_search) + for sp in v_hits or []: + payload_v = getattr(sp, "payload", {}) or {} + md_v = payload_v.get("metadata") or {} + cid_v = md_v.get("commit_id") or md_v.get("symbol") + scid_v = str(cid_v) if cid_v is not None else "" + if not scid_v: + continue + try: + vs = float(getattr(sp, "score", 0.0) or 0.0) + except Exception: + vs = 0.0 + if vs <= 0.0: + continue + # Keep the best vector score per commit id + if scid_v not in vector_scores or vs > vector_scores[scid_v]: + vector_scores[scid_v] = vs + except Exception: + vector_scores = {} + page = None scanned = 0 out: list[dict[str, Any]] = [] @@ -3268,6 +3350,22 @@ async def search_commits_for( scid = str(cid) if cid is not None else "" if not scid or scid in seen_ids: continue + # Blend in vector similarity score when available. Qdrant's + # search returns higher scores for closer vectors (e.g., cosine + # similarity), so we can treat it as a positive boost on top of + # the lexical/lineage score. + try: + if use_scoring and vector_scores and scid in vector_scores: + vec_score = float(vector_scores.get(scid, 0.0) or 0.0) + if vec_score > 0.0: + weight = float( + os.environ.get("COMMIT_VECTOR_WEIGHT", "2.0") or 2.0 + ) + score += weight * vec_score + except Exception: + # On any unexpected issue with vector blending, fall back to + # the lexical score we already computed. 
+ pass seen_ids.add(scid) out.append( { From 6cac485bf96c02b31ef6893d31f0821188d8fc7c Mon Sep 17 00:00:00 2001 From: Reese Date: Sun, 7 Dec 2025 18:30:04 +0000 Subject: [PATCH 17/18] Ref: 58be093cf89161270bbbfc65caf62553085966d1 Passes through index-time metadata Passes through index-time pseudo/tags metadata to downstream consumers. This allows MCP clients, rerankers, and IDEs to optionally incorporate GLM/LLM labels into their scoring or display logic. --- scripts/mcp_indexer_server.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/scripts/mcp_indexer_server.py b/scripts/mcp_indexer_server.py index 86deace3..8f3d879d 100644 --- a/scripts/mcp_indexer_server.py +++ b/scripts/mcp_indexer_server.py @@ -2540,6 +2540,13 @@ def _doc_for(obj: dict) -> str: item["span_budgeted"] = bool(obj.get("span_budgeted")) if obj.get("budget_tokens_used") is not None: item["budget_tokens_used"] = int(obj.get("budget_tokens_used")) + # Pass-through index-time pseudo/tags metadata so downstream consumers + # (e.g., MCP clients, rerankers, IDEs) can optionally incorporate + # GLM/LLM labels into their own scoring or display logic. + if obj.get("pseudo") is not None: + item["pseudo"] = obj.get("pseudo") + if obj.get("tags") is not None: + item["tags"] = obj.get("tags") results.append(item) # Mode-aware reordering: nudge core implementation code vs docs and non-core when requested From 7c1097b81864bd8911d4b58b045dd08dd8249f59 Mon Sep 17 00:00:00 2001 From: Reese Date: Sun, 7 Dec 2025 18:31:09 +0000 Subject: [PATCH 18/18] docs(claude example): Clarifies Qdrant-Indexer usage rules. Emphasizes the mandatory nature of Qdrant-Indexer tool usage. Reinforces the importance of semantic search with short natural-language queries, discouraging grep/regex syntax. 
--- docs/CLAUDE.example.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/docs/CLAUDE.example.md b/docs/CLAUDE.example.md index baa1f3c1..950249dc 100644 --- a/docs/CLAUDE.example.md +++ b/docs/CLAUDE.example.md @@ -1,5 +1,6 @@ This file is intended for AI agents (Claude, etc.) using the Context‑Engine Qdrant‑Indexer and Memory MCP tools. +These rules are NOT optional - you may have existing tools for file reading, "grep" like searching tools and 'codebase search' tools - unrelated to qdrant-indexer mcp tools - you are to favour the qdrant-indexer tooling at all costs, as per these guidelines Agentic AI Project Rules: When to Use MCP Qdrant-Indexer vs Grep @@ -37,7 +38,9 @@ Agentic AI Project Rules: When to Use MCP Qdrant-Indexer vs Grep MCP Tool Patterns: - # DO - Use concept/keyword-style queries (short natural-language fragments) + # DO - Use concept/keyword-style queries (short natural-language fragments). + # repo_search is semantic search, not grep, regex, or boolean syntax. + # Write queries as short descriptions, not as "foo OR bar" expressions. "input validation mechanisms" "database connection handling" "performance bottlenecks in request path"