diff --git a/.env b/.env index 68efb682..9695c8a8 100644 --- a/.env +++ b/.env @@ -196,3 +196,4 @@ INFO_REQUEST_LIMIT=10 INFO_REQUEST_CONTEXT_LINES=5 # INFO_REQUEST_EXPLAIN_DEFAULT=0 # INFO_REQUEST_RELATIONSHIPS=0 +COMMIT_VECTOR_SEARCH=0 diff --git a/.env.example b/.env.example index 117ce7e7..a53d3208 100644 --- a/.env.example +++ b/.env.example @@ -49,6 +49,7 @@ FASTMCP_HTTP_PORT=8002 FASTMCP_HTTP_HEALTH_PORT=18002 FASTMCP_INDEXER_HTTP_PORT=8003 FASTMCP_INDEXER_HTTP_HEALTH_PORT=18003 +# MCP_INDEXER_URL=http://localhost:8003/mcp # Optional: local cross-encoder reranker (ONNX) @@ -161,6 +162,7 @@ REFRAG_DECODER_MODE=prompt # prompt|soft # GLM_API_BASE=https://api.z.ai/api/coding/paas/v4/ # GLM_MODEL=glm-4.6 +# GLM_API_KEY=your_glm_api_key_here # GPU Performance Toggle # Set to 1 to use native GPU-accelerated server on localhost:8081 @@ -201,6 +203,19 @@ SMART_SYMBOL_REINDEXING=0 # INDEX_UPSERT_BACKOFF=0.5 # Debounce file events to coalesce bursts # WATCH_DEBOUNCE_SECS=1.5 +# Optional fs metadata fast-path for unchanged files (skips re-reading files +# when size/mtime match cache.json in the same workspace). +# INDEX_FS_FASTPATH=0 +# Optional 2-phase pseudo/tag mode (disabled by default). +# When enabled, indexer/watcher write base-only vectors and a background +# backfill worker adds pseudo/tags via Qdrant. 
+# PSEUDO_BACKFILL_ENABLED=1 +# PSEUDO_BACKFILL_DEBUG=0 +# PSEUDO_BACKFILL_TICK_SECS=60 +# PSEUDO_BACKFILL_MAX_POINTS=256 + +# Development Remote Upload Configuration +# HOST_INDEX_PATH=./dev-workspace # Remote upload git history (used by upload clients) # Max number of commits to include per bundle (0 disables git history) @@ -209,6 +224,7 @@ SMART_SYMBOL_REINDEXING=0 # REMOTE_UPLOAD_GIT_SINCE= # Enable commit lineage goals for indexing REFRAG_COMMIT_DESCRIBE=1 +COMMIT_VECTOR_SEARCH=0 STRICT_MEMORY_RESTORE=0 diff --git a/deploy/kubernetes/indexer-services.yaml b/deploy/kubernetes/indexer-services.yaml index 271e3578..723ebd6f 100644 --- a/deploy/kubernetes/indexer-services.yaml +++ b/deploy/kubernetes/indexer-services.yaml @@ -23,6 +23,20 @@ spec: runAsUser: 1000 runAsGroup: 1000 fsGroup: 1000 + initContainers: + - name: wait-for-qdrant + image: context-engine-indexer-service + imagePullPolicy: IfNotPresent + command: + - /bin/sh + - -c + - /app/scripts/wait-for-qdrant.sh + env: + - name: QDRANT_URL + valueFrom: + configMapKeyRef: + name: context-engine-config + key: QDRANT_URL containers: - name: watcher image: context-engine-indexer-service @@ -130,6 +144,20 @@ spec: runAsUser: 1000 runAsGroup: 1000 fsGroup: 1000 + initContainers: + - name: wait-for-qdrant + image: context-engine-indexer-service + imagePullPolicy: IfNotPresent + command: + - /bin/sh + - -c + - /app/scripts/wait-for-qdrant.sh + env: + - name: QDRANT_URL + valueFrom: + configMapKeyRef: + name: context-engine-config + key: QDRANT_URL restartPolicy: OnFailure containers: - name: indexer @@ -207,6 +235,20 @@ spec: runAsUser: 1000 runAsGroup: 1000 fsGroup: 1000 + initContainers: + - name: wait-for-qdrant + image: context-engine-indexer-service + imagePullPolicy: IfNotPresent + command: + - /bin/sh + - -c + - /app/scripts/wait-for-qdrant.sh + env: + - name: QDRANT_URL + valueFrom: + configMapKeyRef: + name: context-engine-config + key: QDRANT_URL restartPolicy: OnFailure containers: - name: 
init-payload diff --git a/deploy/kubernetes/qdrant.yaml b/deploy/kubernetes/qdrant.yaml index 7ec599f7..ba645364 100644 --- a/deploy/kubernetes/qdrant.yaml +++ b/deploy/kubernetes/qdrant.yaml @@ -24,7 +24,7 @@ spec: containers: - name: qdrant image: qdrant/qdrant:latest - imagePullPolicy: IfNotPresent + imagePullPolicy: Always ports: - name: http containerPort: 6333 diff --git a/docs/CLAUDE.example.md b/docs/CLAUDE.example.md index baa1f3c1..950249dc 100644 --- a/docs/CLAUDE.example.md +++ b/docs/CLAUDE.example.md @@ -1,5 +1,6 @@ This file is intended for AI agents (Claude, etc.) using the Context‑Engine Qdrant‑Indexer and Memory MCP tools. +These rules are NOT optional - you may have existing tools for file reading, "grep" like searching tools and 'codebase search' tools - unrelated to qdrant-indexer mcp tools - you are to favour the qdrant-indexer tooling at all costs, as per these guidelines Agentic AI Project Rules: When to Use MCP Qdrant-Indexer vs Grep @@ -37,7 +38,9 @@ Agentic AI Project Rules: When to Use MCP Qdrant-Indexer vs Grep MCP Tool Patterns: - # DO - Use concept/keyword-style queries (short natural-language fragments) + # DO - Use concept/keyword-style queries (short natural-language fragments). + # repo_search is semantic search, not grep, regex, or boolean syntax. + # Write queries as short descriptions, not as "foo OR bar" expressions. 
"input validation mechanisms" "database connection handling" "performance bottlenecks in request path" diff --git a/scripts/hybrid_search.py b/scripts/hybrid_search.py index 4b3c0495..8c35dc3f 100644 --- a/scripts/hybrid_search.py +++ b/scripts/hybrid_search.py @@ -1454,7 +1454,7 @@ def dense_query( # In-process API: run hybrid search and return structured items list # Optional: pass an existing TextEmbedding instance via model to reuse cache - +# Optional: pass mode to adjust implementation/docs weighting (code_first/balanced/docs_first) def run_hybrid_search( queries: List[str], @@ -1473,6 +1473,7 @@ def run_hybrid_search( expand: bool = True, model: TextEmbedding | None = None, collection: str | None = None, + mode: str | None = None, repo: str | list[str] | None = None, # Filter by repo name(s); "*" to disable auto-filter ) -> List[Dict[str, Any]]: client = QdrantClient(url=os.environ.get("QDRANT_URL", QDRANT_URL), api_key=API_KEY) @@ -1594,6 +1595,7 @@ def _norm_under(u: str | None) -> str | None: str(_collection()), _env_truthy(os.environ.get("HYBRID_ADAPTIVE_WEIGHTS"), True), _env_truthy(os.environ.get("HYBRID_MMR"), True), + str(mode or ""), ) except Exception: cache_key = None @@ -1690,6 +1692,7 @@ def _norm_under(u: str | None) -> str | None: 'expand': expand, 'collection': _collection(), 'vector_name': vec_name, + 'mode': mode, } is_duplicate, similar_fp = is_duplicate_request(request_data) if is_duplicate: @@ -1982,7 +1985,15 @@ def _scaled_rrf(rank: int) -> float: flt_gated = _sanitize_filter_obj(flt_gated) result_sets: List[List[Any]] = [ - dense_query(client, vec_name, v, flt_gated, _scaled_per_query, collection, query_text=queries[i] if i < len(queries) else None) + dense_query( + client, + vec_name, + v, + flt_gated, + _scaled_per_query, + collection, + query_text=queries[i] if i < len(queries) else None, + ) for i, v in enumerate(embedded) ] if os.environ.get("DEBUG_HYBRID_SEARCH"): @@ -2239,6 +2250,18 @@ def _scaled_rrf(rank: int) -> float: # 
Lexical + boosts timestamps: List[int] = [] + # Mode-aware tweaks for implementation/docs weighting. Modes: + # - None / "code_first": full IMPLEMENTATION_BOOST and DOCUMENTATION_PENALTY + # - "balanced": keep impl boost, halve doc penalty + # - "docs_first": reduce impl boost slightly and disable doc penalty + eff_mode = (mode or "").strip().lower() + impl_boost = IMPLEMENTATION_BOOST + doc_penalty = DOCUMENTATION_PENALTY + if eff_mode in {"balanced"}: + doc_penalty = DOCUMENTATION_PENALTY * 0.5 + elif eff_mode in {"docs_first", "docs-first", "docs"}: + impl_boost = IMPLEMENTATION_BOOST * 0.5 + doc_penalty = 0.0 for pid, rec in list(score_map.items()): payload = rec["pt"].payload or {} base_md = payload.get("metadata") or {} @@ -2288,22 +2311,25 @@ def _scaled_rrf(rank: int) -> float: if ext in {".json", ".yml", ".yaml", ".toml", ".ini"} or "/.codebase/" in path_lower or "/.kiro/" in path_lower: rec["cfg"] = float(rec.get("cfg", 0.0)) - CONFIG_FILE_PENALTY rec["s"] -= CONFIG_FILE_PENALTY - # Boost likely implementation files - if IMPLEMENTATION_BOOST > 0.0 and path: + # Boost likely implementation files (mode-aware) + if impl_boost > 0.0 and path: if ext in {".py", ".ts", ".tsx", ".js", ".jsx", ".go", ".java", ".rs", ".rb", ".php", ".cs", ".cpp", ".c", ".hpp", ".h"}: - rec["impl"] = float(rec.get("impl", 0.0)) + IMPLEMENTATION_BOOST - rec["s"] += IMPLEMENTATION_BOOST - # Penalize docs for implementation-style questions - qlow = " ".join(qlist).lower() - if DOCUMENTATION_PENALTY > 0.0 and path: - if ("readme" in path_lower or "/docs/" in path_lower or "/documentation/" in path_lower or path_lower.endswith(".md")): - if any(w in qlow for w in ["how does", "explain", "works", "algorithm"]): - rec["doc"] = float(rec.get("doc", 0.0)) - DOCUMENTATION_PENALTY - rec["s"] -= DOCUMENTATION_PENALTY + rec["impl"] = float(rec.get("impl", 0.0)) + impl_boost + rec["s"] += impl_boost + # Penalize docs (README/docs/markdown) relative to implementation files (mode-aware) + if 
doc_penalty > 0.0 and path: + if ( + "readme" in path_lower + or "/docs/" in path_lower + or "/documentation/" in path_lower + or path_lower.endswith(".md") + ): + rec["doc"] = float(rec.get("doc", 0.0)) - doc_penalty + rec["s"] -= doc_penalty if LANG_MATCH_BOOST > 0.0 and path and eff_language: lang = str(eff_language).lower() - md_lang = str((md.get("language") or "")).lower() + md_lang = str((md.get("language") or "").lower()) if (lang and md_lang and md_lang == lang) or lang_matches_path(lang, path): rec["langb"] += LANG_MATCH_BOOST rec["s"] += LANG_MATCH_BOOST @@ -2635,6 +2661,20 @@ def _pass_filters(m: Dict[str, Any]) -> bool: except Exception: all_paths = set() + # Build path -> host_path map so we can emit related_paths in host space + # when PATH_EMIT_MODE prefers host paths. This keeps human-facing paths + # consistent while still preserving container paths for backend use. + host_map: Dict[str, str] = {} + try: + for _m in merged: + _md = (_m["pt"].payload or {}).get("metadata") or {} + _p = str(_md.get("path") or "").strip() + _h = str(_md.get("host_path") or "").strip() + if _p and _h: + host_map[_p] = _h + except Exception: + host_map = {} + items: List[Dict[str, Any]] = [] if not merged: if _USE_CACHE and cache_key is not None: @@ -2762,6 +2802,22 @@ def _resolve(seg: str) -> list[str]: pass _related = sorted(_related_set)[:10] + # Align related_paths with PATH_EMIT_MODE when possible: in host/auto + # modes, prefer host paths when we have a mapping; in container mode, + # keep container/path-space values as-is. 
+ _related_out = _related + try: + _mode_related = str(os.environ.get("PATH_EMIT_MODE", "auto")).strip().lower() + except Exception: + _mode_related = "auto" + if _mode_related in {"host", "auto"}: + try: + _mapped: List[str] = [] + for rp in _related: + _mapped.append(host_map.get(rp, rp)) + _related_out = _mapped + except Exception: + _related_out = _related # Best-effort snippet text directly from payload for downstream LLM stitching _payload = (m["pt"].payload or {}) if m.get("pt") is not None else {} _metadata = _payload.get("metadata", {}) or {} @@ -2772,6 +2828,14 @@ def _resolve(seg: str) -> list[str]: _metadata.get("text") or "" ) + # Carry through pseudo/tags so downstream consumers (e.g., repo_search reranker) + # can incorporate index-time GLM/llm labels into their own scoring or display. + _pseudo = _payload.get("pseudo") + if _pseudo is None: + _pseudo = _metadata.get("pseudo") + _tags = _payload.get("tags") + if _tags is None: + _tags = _metadata.get("tags") # Skip memory-like points without a real file path if not _path or not _path.strip(): if os.environ.get("DEBUG_HYBRID_FILTER"): @@ -2831,10 +2895,12 @@ def _resolve(seg: str) -> list[str]: "components": comp, "why": why, "relations": {"imports": _imports, "calls": _calls, "symbol_path": _symp}, - "related_paths": _related, + "related_paths": _related_out, "span_budgeted": bool(m.get("_merged_start") is not None), "budget_tokens_used": m.get("_budget_tokens"), "text": _text, + "pseudo": _pseudo, + "tags": _tags, } ) if _USE_CACHE and cache_key is not None: @@ -3111,7 +3177,15 @@ def _cli_scaled_rrf(rank: int) -> float: embedded = _embed_queries_cached(model, queries) result_sets: List[List[Any]] = [ - dense_query(client, vec_name, v, flt, _cli_scaled_per_query, eff_collection, query_text=queries[i] if i < len(queries) else None) + dense_query( + client, + vec_name, + v, + flt, + _cli_scaled_per_query, + eff_collection, + query_text=queries[i] if i < len(queries) else None, + ) for i, v in 
enumerate(embedded) ] diff --git a/scripts/ingest_code.py b/scripts/ingest_code.py index b3bbbd01..3a5174ec 100644 --- a/scripts/ingest_code.py +++ b/scripts/ingest_code.py @@ -78,6 +78,7 @@ def logical_repo_reuse_enabled() -> bool: # type: ignore[no-redef] set_cached_pseudo, update_symbols_with_pseudo, get_workspace_state, + get_cached_file_meta, ) except ImportError: # State integration is optional; continue if not available @@ -95,6 +96,7 @@ def logical_repo_reuse_enabled() -> bool: # type: ignore[no-redef] update_symbols_with_pseudo = None # type: ignore compare_symbol_changes = None # type: ignore get_workspace_state = None # type: ignore + get_cached_file_meta = None # type: ignore # Optional Tree-sitter import (graceful fallback) try: @@ -1158,6 +1160,185 @@ def ensure_payload_indexes(client: QdrantClient, collection: str): ENSURED_COLLECTIONS: set[str] = set() +def pseudo_backfill_tick( + client: QdrantClient, + collection: str, + repo_name: str | None = None, + *, + max_points: int = 256, +) -> int: + """Best-effort pseudo/tag backfill for a collection. + + Scans up to max_points points for a given repo (when provided) that have not yet + been marked as pseudo-enriched and updates them in-place with pseudo/tags and + refreshed lexical vectors. Does not touch cache.json or hash-based skip logic; + operates purely on Qdrant payloads/vectors. 
+ """ + + if not collection or max_points <= 0: + return 0 + + try: + from qdrant_client import models as _models + except Exception: + return 0 + + must_conditions: list[Any] = [] + if repo_name: + try: + must_conditions.append( + _models.FieldCondition( + key="metadata.repo", + match=_models.MatchValue(value=repo_name), + ) + ) + except Exception: + pass + + flt = None + try: + # Prefer server-side filtering for points missing pseudo/tags when supported + null_cond = getattr(_models, "IsNullCondition", None) + empty_cond = getattr(_models, "IsEmptyCondition", None) + if null_cond is not None: + should_conditions = [] + try: + should_conditions.append(null_cond(is_null="pseudo")) + except Exception: + pass + try: + should_conditions.append(null_cond(is_null="tags")) + except Exception: + pass + if empty_cond is not None: + try: + should_conditions.append(empty_cond(is_empty="tags")) + except Exception: + pass + flt = _models.Filter( + must=must_conditions or None, + should=should_conditions or None, + ) + else: + # Fallback: only scope by repo, rely on Python-side pseudo/tags checks + flt = _models.Filter(must=must_conditions or None) + except Exception: + flt = None + + processed = 0 + debug_enabled = (os.environ.get("PSEUDO_BACKFILL_DEBUG") or "").strip().lower() in { + "1", + "true", + "yes", + "on", + } + debug_stats = { + "scanned": 0, + "glm_calls": 0, + "glm_success": 0, + "filled_new": 0, + "updated_existing": 0, + "skipped_no_code": 0, + "skipped_after_glm": 0, + } + next_offset = None + + while processed < max_points: + batch_limit = max(1, min(64, max_points - processed)) + try: + points, next_offset = client.scroll( + collection_name=collection, + scroll_filter=flt, + limit=batch_limit, + with_payload=True, + with_vectors=True, + offset=next_offset, + ) + except Exception: + break + + if not points: + break + + new_points: list[Any] = [] + for rec in points: + try: + if debug_enabled: + debug_stats["scanned"] += 1 + payload = rec.payload or {} + md = 
payload.get("metadata") or {} + code = md.get("code") or "" + if not code: + if debug_enabled: + debug_stats["skipped_no_code"] += 1 + continue + + pseudo = payload.get("pseudo") or "" + tags_val = payload.get("tags") or [] + tags: list[str] = list(tags_val) if isinstance(tags_val, list) else [] + had_existing = bool(pseudo or tags) + + # If pseudo/tags are missing, generate them once + if not pseudo and not tags: + try: + if debug_enabled: + debug_stats["glm_calls"] += 1 + pseudo, tags = generate_pseudo_tags(code) + if debug_enabled and (pseudo or tags): + debug_stats["glm_success"] += 1 + except Exception: + pseudo, tags = "", [] + + if not pseudo and not tags: + if debug_enabled: + debug_stats["skipped_after_glm"] += 1 + continue + + # Update payload and lexical vector with pseudo/tags + payload["pseudo"] = pseudo + payload["tags"] = tags + if debug_enabled: + if had_existing: + debug_stats["updated_existing"] += 1 + else: + debug_stats["filled_new"] += 1 + + aug_text = f"{code} {pseudo} {' '.join(tags)}".strip() + lex_vec = _lex_hash_vector_text(aug_text) + + vec = rec.vector + if isinstance(vec, dict): + vecs = dict(vec) + vecs[LEX_VECTOR_NAME] = lex_vec + new_vec = vecs + else: + # Fallback: collections without named vectors - leave dense vector as-is + new_vec = vec + + new_points.append( + models.PointStruct( + id=rec.id, + vector=new_vec, + payload=payload, + ) + ) + processed += 1 + except Exception: + continue + + if new_points: + try: + upsert_points(client, collection, new_points) + except Exception: + # Best-effort: on failure, stop this tick + break + + if next_offset is None: + break + + return processed + + def ensure_collection_and_indexes_once( client: QdrantClient, collection: str, @@ -2055,14 +2236,19 @@ def index_single_file( *, dedupe: bool = True, skip_unchanged: bool = True, + pseudo_mode: str = "full", trust_cache: bool | None = None, ) -> bool: """Index a single file path. Returns True if indexed, False if skipped. 
When trust_cache is enabled (via argument or INDEX_TRUST_CACHE=1), rely solely on the local .codebase/cache.json for unchanged detection and skip Qdrant per-file hash checks. + This is a debug-only escape hatch and is unsafe for normal operation: enabling it may + hide index/cache drift, especially with git worktree reuse or collection rebuilds. """ - # Resolve trust_cache from env when not explicitly provided + + # Resolve trust_cache from env when not explicitly provided. INDEX_TRUST_CACHE is intended + # for debugging only and should not be enabled in normal indexing runs. if trust_cache is None: try: trust_cache = os.environ.get("INDEX_TRUST_CACHE", "").strip().lower() in { @@ -2074,6 +2260,23 @@ def index_single_file( except Exception: trust_cache = False + fast_fs = _env_truthy(os.environ.get("INDEX_FS_FASTPATH"), False) + if skip_unchanged and fast_fs and get_cached_file_meta is not None: + try: + repo_name_for_cache = _detect_repo_name_from_path(file_path) + meta = get_cached_file_meta(str(file_path), repo_name_for_cache) or {} + size = meta.get("size") + mtime = meta.get("mtime") + if size is not None and mtime is not None: + st = file_path.stat() + if int(getattr(st, "st_size", 0)) == int(size) and int( + getattr(st, "st_mtime", 0) + ) == int(mtime): + print(f"Skipping unchanged file (fs-meta): {file_path}") + return False + except Exception: + pass + try: text = file_path.read_text(encoding="utf-8", errors="ignore") except Exception as e: @@ -2136,6 +2339,12 @@ def index_single_file( if get_cached_file_hash: prev_local = get_cached_file_hash(str(file_path), repo_tag) if prev_local and file_hash and prev_local == file_hash: + # When fs fast-path is enabled, refresh cache entry with size/mtime + if fast_fs and set_cached_file_hash: + try: + set_cached_file_hash(str(file_path), file_hash, repo_tag) + except Exception: + pass print(f"Skipping unchanged file (cache): {file_path}") return False except Exception: @@ -2151,6 +2360,12 @@ def index_single_file( 
repo_rel_path=repo_rel_path, ) if prev and prev == file_hash: + # When fs fast-path is enabled, refresh cache entry with size/mtime + if fast_fs and set_cached_file_hash: + try: + set_cached_file_hash(str(file_path), file_hash, repo_tag) + except Exception: + pass print(f"Skipping unchanged file: {file_path}") return False @@ -2338,26 +2553,29 @@ def make_point(pid, dense_vec, lex_vec, payload): } # Optional LLM enrichment for lexical retrieval: pseudo + tags per micro-chunk # Use symbol-aware gating and cached pseudo/tags where possible - needs_pseudo, cached_pseudo, cached_tags = should_process_pseudo_for_chunk( - str(file_path), ch, changed_symbols - ) - pseudo, tags = cached_pseudo, cached_tags - if needs_pseudo: - try: - pseudo, tags = generate_pseudo_tags(ch.get("text") or "") - if pseudo or tags: - # Cache the pseudo data for this symbol - symbol_name = ch.get("symbol", "") - if symbol_name: - kind = ch.get("kind", "unknown") - start_line = ch.get("start", 0) - symbol_id = f"{kind}_{symbol_name}_{start_line}" - - if set_cached_pseudo: - set_cached_pseudo(str(file_path), symbol_id, pseudo, tags, file_hash) - except Exception: - # Fall back to cached values (if any) or empty pseudo/tags - pass + pseudo = "" + tags = [] + if pseudo_mode != "off": + needs_pseudo, cached_pseudo, cached_tags = should_process_pseudo_for_chunk( + str(file_path), ch, changed_symbols + ) + pseudo, tags = cached_pseudo, cached_tags + if pseudo_mode == "full" and needs_pseudo: + try: + pseudo, tags = generate_pseudo_tags(ch.get("text") or "") + if pseudo or tags: + # Cache the pseudo data for this symbol + symbol_name = ch.get("symbol", "") + if symbol_name: + kind = ch.get("kind", "unknown") + start_line = ch.get("start", 0) + symbol_id = f"{kind}_{symbol_name}_{start_line}" + + if set_cached_pseudo: + set_cached_pseudo(str(file_path), symbol_id, pseudo, tags, file_hash) + except Exception: + # Fall back to cached values (if any) or empty pseudo/tags + pass # Attach whichever 
pseudo/tags we ended up with (cached or freshly generated) if pseudo: payload["pseudo"] = pseudo @@ -2404,7 +2622,36 @@ def index_repo( *, dedupe: bool = True, skip_unchanged: bool = True, + pseudo_mode: str = "full", ): + # Optional fast no-change precheck: when INDEX_FS_FASTPATH is enabled, use + # fs metadata + cache.json to exit early before model/Qdrant setup when all + # files are unchanged. + fast_fs = _env_truthy(os.environ.get("INDEX_FS_FASTPATH"), False) + if skip_unchanged and not recreate and fast_fs and get_cached_file_meta is not None: + try: + all_unchanged = True + for file_path in iter_files(root): + per_file_repo_for_cache = _detect_repo_name_from_path(file_path) if _detect_repo_name_from_path else None + meta = get_cached_file_meta(str(file_path), per_file_repo_for_cache) or {} + size = meta.get("size") + mtime = meta.get("mtime") + if size is None or mtime is None: + all_unchanged = False + break + st = file_path.stat() + if int(getattr(st, "st_size", 0)) != int(size) or int(getattr(st, "st_mtime", 0)) != int(mtime): + all_unchanged = False + break + if all_unchanged: + try: + print("[fast_index] No changes detected via fs metadata; skipping model and Qdrant setup") + except Exception: + pass + return + except Exception: + pass + model = TextEmbedding(model_name=model_name) # Determine embedding dimension dim = len(next(model.embed(["dimension probe"]))) @@ -2494,6 +2741,9 @@ def index_repo( ) # Health check: detect cache/collection sync issues before indexing (single-collection mode only) + # TODO: In future, consider a dedicated "health-check-only" mode/command that runs these + # expensive Qdrant probes without doing a full index pass, so that "nothing changed" runs + # can stay as cheap as possible while still offering an explicit way to validate collections. 
# Skip with SKIP_HEALTH_CHECK=1 for large collections where scroll is slow _skip_health = os.environ.get("SKIP_HEALTH_CHECK", "").strip().lower() in {"1", "true", "yes"} if not _skip_health and not recreate and skip_unchanged and not use_per_repo_collections and collection: @@ -2583,6 +2833,8 @@ def make_point(pid, dense_vec, lex_vec, payload): # Track per-file hashes across the entire run for cache updates on any flush batch_file_hashes = {} + fast_fs = _env_truthy(os.environ.get("INDEX_FS_FASTPATH"), False) + # Collect files for progress bar (fast: just list paths, no I/O) all_files = list(iter_files(root)) total_files = len(all_files) @@ -2592,8 +2844,13 @@ def make_point(pid, dense_vec, lex_vec, payload): # When progress bar is active, suppress per-file skip messages _use_progress_bar = tqdm is not None if _use_progress_bar: - file_iter = tqdm(all_files, desc="Indexing", unit="file", ncols=100, - bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]") + file_iter = tqdm( + all_files, + desc="Indexing", + unit="file", + ncols=100, + bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]", + ) else: file_iter = all_files @@ -2610,6 +2867,23 @@ def make_point(pid, dense_vec, lex_vec, payload): else: current_collection = get_collection_name(ws_path) if get_collection_name else "default-collection" + # Optional fs-metadata fast-path: skip files whose size/mtime match cache + if skip_unchanged and fast_fs and get_cached_file_meta is not None: + try: + per_file_repo_for_cache = _detect_repo_name_from_path(file_path) + meta = get_cached_file_meta(str(file_path), per_file_repo_for_cache) or {} + size = meta.get("size") + mtime = meta.get("mtime") + if size is not None and mtime is not None: + st = file_path.stat() + if int(getattr(st, "st_size", 0)) == int(size) and int( + getattr(st, "st_mtime", 0) + ) == int(mtime): + print(f"Skipping unchanged file (fs-meta): {file_path}") + continue + except Exception: + pass + try: text = 
file_path.read_text(encoding="utf-8", errors="ignore") except Exception as e: @@ -2671,10 +2945,38 @@ def make_point(pid, dense_vec, lex_vec, payload): if get_cached_file_hash: prev_local = get_cached_file_hash(str(file_path), per_file_repo) if prev_local and file_hash and prev_local == file_hash: + # When fs fast-path is enabled, refresh cache entry with size/mtime + if fast_fs and set_cached_file_hash: + try: + set_cached_file_hash(str(file_path), file_hash, per_file_repo) + except Exception: + pass # Only print skip messages if no progress bar if not _use_progress_bar: if PROGRESS_EVERY <= 0 and files_seen % 50 == 0: print(f"... processed {files_seen} files (skipping unchanged, cache)") + try: + if update_indexing_status: + target_workspace = ( + ws_path if not use_per_repo_collections else str(file_path.parent) + ) + target_repo = ( + repo_tag if not use_per_repo_collections else per_file_repo + ) + update_indexing_status( + workspace_path=target_workspace, + status={ + "state": "indexing", + "progress": { + "files_processed": files_seen, + "total_files": None, + "current_file": str(file_path), + }, + }, + repo_name=target_repo, + ) + except Exception: + pass else: print(f"Skipping unchanged file (cache): {file_path}") continue @@ -2898,23 +3200,26 @@ def make_point(pid, dense_vec, lex_vec, payload): } # Optional LLM enrichment for lexical retrieval: pseudo + tags per micro-chunk # Use symbol-aware gating and cached pseudo/tags where possible - needs_pseudo, cached_pseudo, cached_tags = should_process_pseudo_for_chunk( - str(file_path), ch, changed_symbols - ) - pseudo, tags = cached_pseudo, cached_tags - if needs_pseudo: - try: - pseudo, tags = generate_pseudo_tags(ch.get("text") or "") - if pseudo or tags: - symbol_name = ch.get("symbol", "") - if symbol_name: - kind = ch.get("kind", "unknown") - start_line = ch.get("start", 0) - symbol_id = f"{kind}_{symbol_name}_{start_line}" - if set_cached_pseudo: - set_cached_pseudo(str(file_path), symbol_id, pseudo, 
tags, file_hash) - except Exception: - pass + pseudo = "" + tags: list[str] = [] + if pseudo_mode != "off": + needs_pseudo, cached_pseudo, cached_tags = should_process_pseudo_for_chunk( + str(file_path), ch, changed_symbols + ) + pseudo, tags = cached_pseudo, cached_tags + if pseudo_mode == "full" and needs_pseudo: + try: + pseudo, tags = generate_pseudo_tags(ch.get("text") or "") + if pseudo or tags: + symbol_name = ch.get("symbol", "") + if symbol_name: + kind = ch.get("kind", "unknown") + start_line = ch.get("start", 0) + symbol_id = f"{kind}_{symbol_name}_{start_line}" + if set_cached_pseudo: + set_cached_pseudo(str(file_path), symbol_id, pseudo, tags, file_hash) + except Exception: + pass if pseudo: payload["pseudo"] = pseudo if tags: @@ -3642,6 +3947,9 @@ def main(): collection = os.environ.get("COLLECTION_NAME", "codebase") print(f"[single_repo] Single-repo mode enabled - using collection: {collection}") + flag = (os.environ.get("PSEUDO_BACKFILL_ENABLED") or "").strip().lower() + pseudo_mode = "off" if flag in {"1", "true", "yes", "on"} else "full" + index_repo( Path(args.root).resolve(), qdrant_url, @@ -3651,6 +3959,9 @@ def main(): args.recreate, dedupe=(not args.no_dedupe), skip_unchanged=(not args.no_skip_unchanged), + # Pseudo/tags are inlined by default; when PSEUDO_BACKFILL_ENABLED=1 we run + # base-only and rely on the background backfill worker to add pseudo/tags. 
+ pseudo_mode=pseudo_mode, ) diff --git a/scripts/mcp_indexer_server.py b/scripts/mcp_indexer_server.py index c912c721..8f3d879d 100644 --- a/scripts/mcp_indexer_server.py +++ b/scripts/mcp_indexer_server.py @@ -1729,6 +1729,7 @@ async def repo_search( highlight_snippet: Any = None, collection: Any = None, workspace_path: Any = None, + mode: Any = None, session: Any = None, @@ -1899,6 +1900,11 @@ async def repo_search( case = _extra.get("case") if compact in (None, "") and _extra.get("compact") is not None: compact = _extra.get("compact") + # Optional mode hint: "code_first", "docs_first", "balanced" + if ( + mode is None or (isinstance(mode, str) and str(mode).strip() == "") + ) and _extra.get("mode") is not None: + mode = _extra.get("mode") except Exception: pass @@ -1953,6 +1959,9 @@ def _to_str(x, default=""): ) highlight_snippet = _to_bool(highlight_snippet, True) + # Optional mode knob: "code_first" (default for IDE), "docs_first", "balanced" + mode_str = _to_str(mode, "").strip().lower() + # Resolve collection precedence: explicit > per-connection defaults > token defaults > env default coll_hint = _to_str(collection, "").strip() @@ -2096,12 +2105,25 @@ def _to_str_list(x): model = _get_embedding_model(model_name) # Ensure hybrid_search uses the intended collection when running in-process prev_coll = os.environ.get("COLLECTION_NAME") + # Determine effective hybrid candidate limit: if rerank is enabled, search up to rerank_top_n + try: + base_limit = int(limit) + except Exception: + base_limit = 10 + eff_limit = base_limit + if rerank_enabled: + try: + rt = int(rerank_top_n) + except Exception: + rt = 0 + if rt > eff_limit: + eff_limit = rt try: os.environ["COLLECTION_NAME"] = collection # In-process path_glob/not_glob accept a single string; reduce list inputs safely items = run_hybrid_search( queries=queries, - limit=int(limit), + limit=eff_limit, per_path=( int(per_path) if (per_path is not None and str(per_path).strip() != "") @@ -2120,6 +2142,7 @@ def 
_to_str_list(x): expand=str(os.environ.get("HYBRID_EXPAND", "1")).strip().lower() in {"1", "true", "yes", "on"}, model=model, + mode=mode_str or None, repo=repo_filter, # Cross-codebase isolation ) finally: @@ -2138,11 +2161,23 @@ def _to_str_list(x): if not use_hybrid_inproc: # Try hybrid search via subprocess (JSONL output) + try: + base_limit = int(limit) + except Exception: + base_limit = 10 + eff_limit = base_limit + if rerank_enabled: + try: + rt = int(rerank_top_n) + except Exception: + rt = 0 + if rt > eff_limit: + eff_limit = rt cmd = [ "python", _work_script("hybrid_search.py"), "--limit", - str(int(limit)), + str(eff_limit), "--json", ] if per_path is not None and str(per_path).strip() != "": @@ -2210,6 +2245,7 @@ def _to_str_list(x): expand=str(os.environ.get("HYBRID_EXPAND", "0")).strip().lower() in {"1", "true", "yes", "on"}, model=model, + mode=mode_str or None, repo=repo_filter, # Cross-codebase isolation ) json_lines = items @@ -2238,24 +2274,62 @@ def _to_str_list(x): import concurrent.futures as _fut rq = queries[0] if queries else "" - # Prepare candidate docs from top-N hybrid hits (path+symbol + small snippet) + # Prepare candidate docs from top-N hybrid hits (path+symbol + pseudo/tags + small snippet) cand_objs = list(json_lines[: int(rerank_top_n)]) def _doc_for(obj: dict) -> str: path = str(obj.get("path") or "") symbol = str(obj.get("symbol") or "") header = f"{symbol} — {path}".strip() + + # Try to enrich with pseudo/tags from underlying payload when available. + # We expect hybrid to have preserved metadata in obj["components"] or + # direct fields; if not, we fall back to header+code only. 
+ meta_lines: list[str] = [header] if header else [] + try: + # Prefer explicit pseudo/tags fields on the top-level object when present + pseudo_val = obj.get("pseudo") + tags_val = obj.get("tags") + if pseudo_val is None or tags_val is None: + # Fallback: inspect a nested metadata view when present + md = obj.get("metadata") or {} + if pseudo_val is None: + pseudo_val = md.get("pseudo") + if tags_val is None: + tags_val = md.get("tags") + pseudo_s = str(pseudo_val).strip() if pseudo_val is not None else "" + if pseudo_s: + # Keep pseudo short to avoid bloating rerank input + meta_lines.append(f"Summary: {pseudo_s[:256]}") + if tags_val: + try: + if isinstance(tags_val, (list, tuple)): + tags_text = ", ".join( + str(x) for x in tags_val + )[:128] + if tags_text: + meta_lines.append(f"Tags: {tags_text}") + else: + tags_text = str(tags_val)[:128] + if tags_text: + meta_lines.append(f"Tags: {tags_text}") + except Exception: + pass + except Exception: + # If any of the above fails, we just keep header-only + pass + sl = int(obj.get("start_line") or 0) el = int(obj.get("end_line") or 0) if not path or not sl: - return header + return "\n".join(meta_lines) if meta_lines else header try: p = path if not os.path.isabs(p): p = os.path.join("/work", p) realp = os.path.realpath(p) if not (realp == "/work" or realp.startswith("/work/")): - return header + return "\n".join(meta_lines) if meta_lines else header with open( realp, "r", encoding="utf-8", errors="ignore" ) as f: @@ -2268,11 +2342,12 @@ def _doc_for(obj: dict) -> str: si = max(1, sl - ctx) ei = min(len(lines), max(sl, el) + ctx) snippet = "".join(lines[si - 1 : ei]).strip() - return ( - header + ("\n" + snippet if snippet else "") - ).strip() + if snippet: + meta = "\n".join(meta_lines) if meta_lines else header + return (meta + "\n\n" + snippet).strip() + return "\n".join(meta_lines) if meta_lines else header except Exception: - return header + return "\n".join(meta_lines) if meta_lines else header # Build docs 
concurrently max_workers = min(16, (os.cpu_count() or 4) * 4) @@ -2465,8 +2540,152 @@ def _doc_for(obj: dict) -> str: item["span_budgeted"] = bool(obj.get("span_budgeted")) if obj.get("budget_tokens_used") is not None: item["budget_tokens_used"] = int(obj.get("budget_tokens_used")) + # Pass-through index-time pseudo/tags metadata so downstream consumers + # (e.g., MCP clients, rerankers, IDEs) can optionally incorporate + # GLM/LLM labels into their own scoring or display logic. + if obj.get("pseudo") is not None: + item["pseudo"] = obj.get("pseudo") + if obj.get("tags") is not None: + item["tags"] = obj.get("tags") results.append(item) + # Mode-aware reordering: nudge core implementation code vs docs and non-core when requested + def _is_doc_path(p: str) -> bool: + pl = str(p or "").lower() + return ( + "readme" in pl + or "/docs/" in pl + or "/documentation/" in pl + or pl.endswith(".md") + or pl.endswith(".rst") + or pl.endswith(".txt") + ) + + def _is_core_code_item(item: dict) -> bool: + """Classify a result as core implementation code for mode-aware reordering. + + This intentionally reuses hybrid_search's notion of core/test/vendor files + instead of duplicating extension and path heuristics here. We only apply + lightweight checks on top (docs/config/tests components) and delegate the + rest to helpers from hybrid_search when available. + """ + try: + raw_path = item.get("path") or "" + p = str(raw_path) + except Exception: + return False + if not p: + return False + # Never treat docs as core code + if _is_doc_path(p): + return False + + # Prefer items that were not explicitly tagged as docs/config/tests in hybrid components + comps = item.get("components") or {} + try: + if comps: + if comps.get("config_penalty") or comps.get("test_penalty") or comps.get("doc_penalty"): + return False + except Exception: + pass + + # Defer to hybrid_search helpers when available to avoid duplicating + # extension and path-based logic. 
+ try: + from scripts.hybrid_search import ( # type: ignore + is_core_file as _hy_core_file, + is_test_file as _hy_is_test_file, + is_vendor_path as _hy_is_vendor_path, + ) + except Exception: + _hy_core_file = None + _hy_is_test_file = None + _hy_is_vendor_path = None + + if _hy_core_file: + try: + if not _hy_core_file(p): + return False + except Exception: + return False + if _hy_is_test_file: + try: + if _hy_is_test_file(p): + return False + except Exception: + pass + if _hy_is_vendor_path: + try: + if _hy_is_vendor_path(p): + return False + except Exception: + pass + + # If helper imports failed, fall back to a permissive classification: + # treat the item as core code (we already filtered obvious docs/config/tests). + return True + + if mode_str in {"code_first", "code-first", "code"}: + core_items: list[dict] = [] + other_code: list[dict] = [] + doc_items: list[dict] = [] + for it in results: + p = it.get("path") or "" + if p and _is_doc_path(p): + doc_items.append(it) + elif _is_core_code_item(it): + core_items.append(it) + else: + other_code.append(it) + results = core_items + other_code + doc_items + + try: + _min_core = int(os.environ.get("REPO_SEARCH_CODE_FIRST_MIN_CORE", "2") or 0) + except Exception: + _min_core = 2 + try: + _top_k = int(os.environ.get("REPO_SEARCH_CODE_FIRST_TOP_K", "8") or 8) + except Exception: + _top_k = 8 + if _min_core > 0 and results: + top_k = max(0, min(_top_k, len(results))) + if top_k > 0: + flags = [_is_core_code_item(it) for it in results] + cur_core = sum(1 for i in range(top_k) if flags[i]) + if cur_core < _min_core: + for src in range(top_k, len(results)): + if not flags[src]: + continue + for dst in range(top_k - 1, -1, -1): + if not flags[dst]: + results[dst], results[src] = results[src], results[dst] + flags[dst], flags[src] = flags[src], flags[dst] + cur_core += 1 + break + if cur_core >= _min_core: + break + elif mode_str in {"docs_first", "docs-first", "docs"}: + core_items = [] + other_code = [] + doc_items = [] 
+ for it in results: + p = it.get("path") or "" + if p and _is_doc_path(p): + doc_items.append(it) + elif _is_core_code_item(it): + core_items.append(it) + else: + other_code.append(it) + results = doc_items + core_items + other_code + + # Enforce user-requested limit on final result count + try: + _limit_n = int(limit) + except Exception: + _limit_n = 0 + if _limit_n > 0 and len(results) > _limit_n: + results = results[:_limit_n] + # Optionally add snippets (with highlighting) toks = _tokens_from_queries(queries) if include_snippet: @@ -2642,6 +2861,7 @@ async def repo_search_compat(**arguments) -> Dict[str, Any]: "not_": not_value, "case": args.get("case"), "compact": args.get("compact"), + "mode": args.get("mode"), "repo": args.get("repo"), # Cross-codebase isolation # Alias passthroughs captured by repo_search(**kwargs) "queries": queries, @@ -2932,6 +3152,11 @@ async def search_commits_for( mcap = int(max_points) if max_points not in (None, "") else 1000 except (ValueError, TypeError): mcap = 1000 + # When a textual query is present, enable scoring and allow scanning up to + # max_points commits so we can rank the best matches. Without a query, + # preserve the previous behavior (early exit after "limit" unique commits). + use_scoring = bool(q_terms) + max_ids_for_scan = mcap if use_scoring else lim try: from qdrant_client import QdrantClient # type: ignore @@ -2955,11 +3180,93 @@ async def search_commits_for( ] ) + # Optional vector-augmented scoring: use commit embeddings produced by + # ingest_history.py to compute a semantic score for the query and blend + # it with the lexical/lineage score. Gated by COMMIT_VECTOR_SEARCH and + # only active when a textual query is present. + # + # Empirically, the lineage-aware lexical scoring (goal/tags/symbols + + # message/information) already performs very well for behavior-phrase + # commit search across this repository. 
We have not yet found a clear + # "sweet spot" where enabling COMMIT_VECTOR_SEARCH by default yields a + # consistently better ranking, especially given the extra latency and + # dependency surface. Keep this as an opt-in experimental boost so + # callers can toggle it via env without relying on vector similarity + # for core behavior. + vector_scores: Dict[str, float] = {} + use_vectors = use_scoring and str( + os.environ.get("COMMIT_VECTOR_SEARCH", "0") or "0" + ).strip().lower() in {"1", "true", "yes", "on"} + if use_vectors: + try: + # Import lazily so environments without fastembed/sanitize helper + # still work with pure lexical search. + try: + from scripts.utils import ( # type: ignore + sanitize_vector_name as _sanitize_vector_name, + ) + except Exception: + _sanitize_vector_name = None # type: ignore + + from fastembed import TextEmbedding # type: ignore + + model_name = os.environ.get( + "MODEL_NAME", "BAAI/bge-base-en-v1.5" + ) + vec_name: Optional[str] + if _sanitize_vector_name is not None: + try: + vec_name = _sanitize_vector_name(model_name) + except Exception: + vec_name = None + else: + vec_name = None + + if vec_name: + # Embed the behavior phrase query once and run a vector + # search over the commit collection. We keep the result set + # modest and later blend its scores into the lexical + # scoring for matching commits. 
+ embed_model = TextEmbedding(model_name=model_name) + qtext = " ".join(q_terms) if q_terms else "" + if qtext.strip(): + qvec = next(embed_model.embed([qtext])).tolist() + + def _vec_search(): + return client.search( + collection_name=coll, + query_vector={vec_name: qvec}, + query_filter=filt, + limit=min(mcap, 128), + with_payload=True, + with_vectors=False, + ) + + v_hits = await asyncio.to_thread(_vec_search) + for sp in v_hits or []: + payload_v = getattr(sp, "payload", {}) or {} + md_v = payload_v.get("metadata") or {} + cid_v = md_v.get("commit_id") or md_v.get("symbol") + scid_v = str(cid_v) if cid_v is not None else "" + if not scid_v: + continue + try: + vs = float(getattr(sp, "score", 0.0) or 0.0) + except Exception: + vs = 0.0 + if vs <= 0.0: + continue + # Keep the best vector score per commit id + if scid_v not in vector_scores or vs > vector_scores[scid_v]: + vector_scores[scid_v] = vs + except Exception: + vector_scores = {} + page = None scanned = 0 out: list[dict[str, Any]] = [] seen_ids: set[str] = set() - while scanned < mcap and len(seen_ids) < lim: + while scanned < mcap and len(seen_ids) < max_ids_for_scan: sc, page = await asyncio.to_thread( lambda: client.scroll( collection_name=coll, @@ -3005,17 +3312,43 @@ async def search_commits_for( ][:6] else: lineage_tags = [] - # Build a composite lowercase text blob for simple lexical matching - lineage_text_parts = [] - if lineage_goal: - lineage_text_parts.append(lineage_goal) - if lineage_symbols: - lineage_text_parts.extend(lineage_symbols) - if lineage_tags: - lineage_text_parts.extend(lineage_tags) - text_l = (msg + "\n" + info + "\n" + " ".join(lineage_text_parts)).lower() - if q_terms and not all(t in text_l for t in q_terms): - continue + + # Field-aware lexical scoring using commit message + optional + # lineage metadata. This replaces the previous strict + # "all tokens must appear" filter, so we can rank commits by + # how well they match the behavior phrase. 
+ score = 0.0 + if q_terms: + # Precompute lowercase field views once per commit + msg_l = msg.lower() + info_l = info.lower() + goal_l = lineage_goal.lower() if lineage_goal else "" + sym_l = " ".join(lineage_symbols).lower() if lineage_symbols else "" + tags_l = " ".join(lineage_tags).lower() if lineage_tags else "" + hits = 0 + for t in q_terms: + term_hit = False + if goal_l and t in goal_l: + score += 3.0 + term_hit = True + if tags_l and t in tags_l: + score += 2.0 + term_hit = True + if sym_l and t in sym_l: + score += 1.5 + term_hit = True + if msg_l and t in msg_l: + score += 1.0 + term_hit = True + if info_l and t in info_l: + score += 0.5 + term_hit = True + if term_hit: + hits += 1 + # Require at least one token match across any field when a + # query is provided; otherwise treat as non-match. + if hits == 0: + continue if p: # Require the path substring to appear in at least one touched file if not any(p in f for f in files_list): @@ -3024,6 +3357,22 @@ async def search_commits_for( scid = str(cid) if cid is not None else "" if not scid or scid in seen_ids: continue + # Blend in vector similarity score when available. Qdrant's + # search returns higher scores for closer vectors (e.g., cosine + # similarity), so we can treat it as a positive boost on top of + # the lexical/lineage score. + try: + if use_scoring and vector_scores and scid in vector_scores: + vec_score = float(vector_scores.get(scid, 0.0) or 0.0) + if vec_score > 0.0: + weight = float( + os.environ.get("COMMIT_VECTOR_WEIGHT", "2.0") or 2.0 + ) + score += weight * vec_score + except Exception: + # On any unexpected issue with vector blending, fall back to + # the lexical score we already computed. 
+ pass seen_ids.add(scid) out.append( { @@ -3035,11 +3384,29 @@ async def search_commits_for( "lineage_goal": lineage_goal, "lineage_symbols": lineage_symbols, "lineage_tags": lineage_tags, + "_score": score, } ) - if len(seen_ids) >= lim: + if len(seen_ids) >= max_ids_for_scan: break - return {"ok": True, "results": out, "scanned": scanned, "collection": coll} + results = out + # When scoring is enabled (query provided), sort by score descending + # and trim to the requested limit. When no query is provided, preserve + # the original behavior (scroll order, up to "lim" unique commits). + if use_scoring and results: + try: + results = sorted( + results, + key=lambda c: float(c.get("_score", 0.0)), + reverse=True, + ) + except Exception: + # On any unexpected error, fall back to unsorted results + pass + results = results[:lim] + for c in results: + c.pop("_score", None) + return {"ok": True, "results": results, "scanned": scanned, "collection": coll} except Exception as e: return {"ok": False, "error": str(e), "collection": coll} diff --git a/scripts/watch_index.py b/scripts/watch_index.py index ba373d82..151fda42 100644 --- a/scripts/watch_index.py +++ b/scripts/watch_index.py @@ -35,6 +35,9 @@ ensure_logical_repo_id, find_collection_for_logical_repo, logical_repo_reuse_enabled, + _get_repo_state_dir, + _cross_process_lock, + get_collection_mappings, ) import hashlib from datetime import datetime @@ -234,7 +237,12 @@ def __init__( self.client = client resolved_collection = collection if collection is not None else default_collection self.default_collection = resolved_collection - self.collection = resolved_collection + # In multi-repo mode, per-file collections are resolved via _get_collection_for_file. + # Avoid using a root-level default collection (e.g., "/work-") for data ops. 
+ if is_multi_repo_mode(): + self.collection = None + else: + self.collection = resolved_collection self.excl = idx._Excluder(root) # Track ignore file for live reloads try: @@ -339,7 +347,10 @@ def on_deleted(self, event): return if self.client is not None: try: - collection = self.collection or _get_collection_for_file(p) + if is_multi_repo_mode(): + collection = _get_collection_for_file(p) + else: + collection = self.collection or _get_collection_for_file(p) idx.delete_points_by_path(self.client, collection, str(p)) print(f"[deleted] {p} -> {collection}") except Exception: @@ -408,8 +419,12 @@ def on_moved(self, event): if self.excl.exclude_dir(rel_dir): if src.suffix.lower() in idx.CODE_EXTS: try: + if is_multi_repo_mode(): + coll = _get_collection_for_file(src) + else: + coll = self.collection or _get_collection_for_file(src) idx.delete_points_by_path( - self.client, self.collection, str(src) + self.client, coll, str(src) ) print(f"[moved:ignored_dest_deleted_src] {src} -> {dest}") try: @@ -488,11 +503,15 @@ def on_moved(self, event): try: idx.delete_points_by_path(self.client, src_collection, str(src)) except Exception: - idx.delete_points_by_path( - self.client, - self.collection or src_collection, - str(src), - ) + # In multi-repo mode, avoid falling back to any root-level collection. 
+ if (not is_multi_repo_mode()) and self.collection: + idx.delete_points_by_path( + self.client, + self.collection, + str(src), + ) + else: + raise print(f"[moved:deleted_src] {src}") except Exception: pass @@ -702,6 +721,75 @@ def _rename_in_store( return -1, None +def _start_pseudo_backfill_worker(client: QdrantClient, default_collection: str) -> None: + flag = (os.environ.get("PSEUDO_BACKFILL_ENABLED") or "").strip().lower() + if flag not in {"1", "true", "yes", "on"}: + return + + try: + interval = float(os.environ.get("PSEUDO_BACKFILL_TICK_SECS", "60") or 60.0) + except Exception: + interval = 60.0 + if interval <= 0: + return + try: + max_points = int(os.environ.get("PSEUDO_BACKFILL_MAX_POINTS", "256") or 256) + except Exception: + max_points = 256 + if max_points <= 0: + max_points = 1 + + def _worker() -> None: + while True: + try: + try: + mappings = get_collection_mappings(search_root=str(ROOT)) + except Exception: + mappings = [] + if not mappings: + mappings = [ + {"repo_name": None, "collection_name": default_collection}, + ] + for mapping in mappings: + coll = mapping.get("collection_name") or default_collection + repo_name = mapping.get("repo_name") + if not coll: + continue + try: + if is_multi_repo_mode() and repo_name: + state_dir = _get_repo_state_dir(repo_name) + else: + state_dir = _get_global_state_dir(str(ROOT)) + lock_path = state_dir / "pseudo.lock" + with _cross_process_lock(lock_path): + processed = idx.pseudo_backfill_tick( + client, + coll, + repo_name=repo_name, + max_points=max_points, + ) + if processed: + try: + print( + f"[pseudo_backfill] repo={repo_name or 'default'} collection={coll} processed={processed}" + ) + except Exception: + pass + except Exception as e: + try: + print( + f"[pseudo_backfill] error repo={repo_name or 'default'} collection={coll}: {e}" + ) + except Exception: + pass + except Exception: + pass + time.sleep(interval) + + thread = threading.Thread(target=_worker, name="pseudo-backfill", daemon=True) + 
thread.start() + + def main(): # Resolve collection name from workspace state before any client/state ops try: @@ -716,7 +804,9 @@ def main(): multi_repo_enabled = False default_collection = os.environ.get("COLLECTION_NAME", "my-collection") - if _get_coll: + # In multi-repo mode, per-repo collections are resolved via _get_collection_for_file + # and workspace_state; avoid deriving a root-level collection like "/work-". + if _get_coll and not multi_repo_enabled: try: resolved = _get_coll(str(ROOT)) if resolved: @@ -788,6 +878,8 @@ def main(): except Exception: pass + _start_pseudo_backfill_worker(client, default_collection) + try: if multi_repo_enabled: root_repo_name = _extract_repo_name_from_path(str(ROOT)) @@ -1043,8 +1135,11 @@ def _process_paths(paths, client, model, vector_name: str, model_dim: int, works except Exception: pass - # Fallback: full single-file reindex + # Fallback: full single-file reindex. Pseudo/tags are inlined by default; + # when PSEUDO_BACKFILL_ENABLED=1 we run base-only and rely on backfill. 
if not ok: + flag = (os.environ.get("PSEUDO_BACKFILL_ENABLED") or "").strip().lower() + pseudo_mode = "off" if flag in {"1", "true", "yes", "on"} else "full" ok = idx.index_single_file( client, model, @@ -1053,6 +1148,7 @@ def _process_paths(paths, client, model, vector_name: str, model_dim: int, works p, dedupe=True, skip_unchanged=False, + pseudo_mode=pseudo_mode, ) except Exception as e: try: diff --git a/scripts/workspace_state.py b/scripts/workspace_state.py index df48059e..0093f5f3 100644 --- a/scripts/workspace_state.py +++ b/scripts/workspace_state.py @@ -717,12 +717,20 @@ def get_cached_file_hash(file_path: str, repo_name: Optional[str] = None) -> str with open(cache_path, 'r', encoding='utf-8') as f: cache = json.load(f) file_hashes = cache.get("file_hashes", {}) - return file_hashes.get(str(Path(file_path).resolve()), "") + fp = str(Path(file_path).resolve()) + val = file_hashes.get(fp, "") + if isinstance(val, dict): + return str(val.get("hash") or "") + return str(val or "") except Exception: pass else: cache = _read_cache(_resolve_workspace_root()) - return cache.get("file_hashes", {}).get(str(Path(file_path).resolve()), "") + fp = str(Path(file_path).resolve()) + val = cache.get("file_hashes", {}).get(fp, "") + if isinstance(val, dict): + return str(val.get("hash") or "") + return str(val or "") return "" @@ -748,7 +756,18 @@ def set_cached_file_hash(file_path: str, file_hash: str, repo_name: Optional[str else: cache = {"file_hashes": {}, "created_at": datetime.now().isoformat()} - cache.setdefault("file_hashes", {})[fp] = file_hash + entry: Any = file_hash + try: + st = Path(file_path).stat() + entry = { + "hash": file_hash, + "size": int(getattr(st, "st_size", 0)), + "mtime": int(getattr(st, "st_mtime", 0)), + } + except Exception: + pass + + cache.setdefault("file_hashes", {})[fp] = entry cache["updated_at"] = datetime.now().isoformat() _atomic_write_state(cache_path, cache) # reuse atomic writer for files @@ -757,11 +776,52 @@ def 
set_cached_file_hash(file_path: str, file_hash: str, repo_name: Optional[str return cache = _read_cache(_resolve_workspace_root()) - cache.setdefault("file_hashes", {})[fp] = file_hash + entry: Any = file_hash + try: + st = Path(file_path).stat() + entry = { + "hash": file_hash, + "size": int(getattr(st, "st_size", 0)), + "mtime": int(getattr(st, "st_mtime", 0)), + } + except Exception: + pass + cache.setdefault("file_hashes", {})[fp] = entry cache["updated_at"] = datetime.now().isoformat() _write_cache(_resolve_workspace_root(), cache) +def get_cached_file_meta(file_path: str, repo_name: Optional[str] = None) -> Dict[str, Any]: + fp = str(Path(file_path).resolve()) + if is_multi_repo_mode() and repo_name: + state_dir = _get_repo_state_dir(repo_name) + cache_path = state_dir / CACHE_FILENAME + + if cache_path.exists(): + try: + with open(cache_path, 'r', encoding='utf-8') as f: + cache = json.load(f) + file_hashes = cache.get("file_hashes", {}) + val = file_hashes.get(fp) + except Exception: + val = None + else: + val = None + else: + cache = _read_cache(_resolve_workspace_root()) + val = cache.get("file_hashes", {}).get(fp) + + if isinstance(val, dict): + return { + "hash": str(val.get("hash") or ""), + "size": val.get("size"), + "mtime": val.get("mtime"), + } + if isinstance(val, str): + return {"hash": val} + return {} + + def remove_cached_file(file_path: str, repo_name: Optional[str] = None) -> None: """Remove file entry from cache.""" if is_multi_repo_mode() and repo_name: