From 23dad481c4125b54af7c721e9eaa4c3c266b6e24 Mon Sep 17 00:00:00 2001 From: John Donalson Date: Mon, 8 Dec 2025 22:28:16 -0500 Subject: [PATCH 1/5] Add connection pooling and fast JSON serialization Introduces connection pooling for QdrantClient and uses orjson for faster JSON serialization when available. Adds a shared ThreadPoolExecutor for parallel queries and implements a cache for filter sanitization to improve performance. --- scripts/hybrid_search.py | 42 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/scripts/hybrid_search.py b/scripts/hybrid_search.py index 8c35dc3f..71aaaeb6 100644 --- a/scripts/hybrid_search.py +++ b/scripts/hybrid_search.py @@ -2,6 +2,8 @@ import os import argparse from typing import List, Dict, Any, Tuple +from concurrent.futures import ThreadPoolExecutor +from functools import lru_cache from qdrant_client import QdrantClient, models from fastembed import TextEmbedding @@ -9,9 +11,49 @@ import json import math +# Prefer orjson for faster serialization (2-3x speedup) +try: + import orjson + def _json_dumps(obj): + return orjson.dumps(obj).decode() +except ImportError: + orjson = None + def _json_dumps(obj): + return json.dumps(obj) + import logging import threading +# Connection pooling imports +try: + from scripts.qdrant_client_manager import get_qdrant_client, return_qdrant_client, pooled_qdrant_client + _POOL_AVAILABLE = True +except ImportError: + _POOL_AVAILABLE = False + def get_qdrant_client(url=None, api_key=None, force_new=False, use_pool=True): + return QdrantClient(url=url or os.environ.get("QDRANT_URL", "http://localhost:6333"), + api_key=api_key or os.environ.get("QDRANT_API_KEY")) + def return_qdrant_client(client): + pass + +# ThreadPoolExecutor for parallel queries (reuse across calls) +_QUERY_EXECUTOR = None +_EXECUTOR_LOCK = threading.Lock() + +def _get_query_executor(max_workers: int = 4) -> ThreadPoolExecutor: + """Get or create a shared ThreadPoolExecutor for parallel 
queries.""" + global _QUERY_EXECUTOR + if _QUERY_EXECUTOR is None: + with _EXECUTOR_LOCK: + if _QUERY_EXECUTOR is None: + _QUERY_EXECUTOR = ThreadPoolExecutor(max_workers=max_workers) + return _QUERY_EXECUTOR + +# Filter sanitization cache (avoids repeated deep copies) +_FILTER_CACHE = {} +_FILTER_CACHE_LOCK = threading.Lock() +_FILTER_CACHE_MAX = 256 + # Import unified caching system try: from scripts.cache_manager import get_search_cache, get_embedding_cache, get_expansion_cache From e468eba0db8dc8a1f8831b19e8f88d74ab755168 Mon Sep 17 00:00:00 2001 From: John Donalson Date: Mon, 8 Dec 2025 22:31:25 -0500 Subject: [PATCH 2/5] Optimize search and rerank performance, add orjson Adds 'orjson' to requirements. Improves hybrid_search.py with cached regex compilation, filter object caching, and parallel dense query execution. In ingest_code.py, increases default batch size for indexing. In rerank_local.py, adds background warmup for ONNX reranker session to reduce initial latency. --- requirements.txt | 1 + scripts/hybrid_search.py | 87 +++++++++++++++++++++++++++++----------- scripts/ingest_code.py | 2 +- scripts/rerank_local.py | 29 ++++++++++++++ 4 files changed, 94 insertions(+), 25 deletions(-) diff --git a/requirements.txt b/requirements.txt index d6c0f21e..e901481c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,6 +4,7 @@ fastembed watchdog onnxruntime tokenizers +orjson tree_sitter>=0.25.2 tree_sitter_languages; python_version < "3.13" mcp==1.17.0 diff --git a/scripts/hybrid_search.py b/scripts/hybrid_search.py index 71aaaeb6..24059185 100644 --- a/scripts/hybrid_search.py +++ b/scripts/hybrid_search.py @@ -54,6 +54,12 @@ def _get_query_executor(max_workers: int = 4) -> ThreadPoolExecutor: _FILTER_CACHE_LOCK = threading.Lock() _FILTER_CACHE_MAX = 256 +# Cached regex pattern compilation (avoids recompiling same patterns) +@lru_cache(maxsize=128) +def _compile_regex(pattern: str, flags: int = 0): + """Cached regex compilation for repeated patterns.""" + 
return re.compile(pattern, flags) + # Import unified caching system try: from scripts.cache_manager import get_search_cache, get_embedding_cache, get_expansion_cache @@ -1314,10 +1320,18 @@ def lex_hash_vector(phrases: List[str], dim: int = LEX_VECTOR_DIM) -> List[float # Defensive: sanitize Qdrant filter objects so we never send an empty filter {} # Qdrant returns 400 if filter has no conditions; return None in that case. +# Uses caching for repeated filter patterns to avoid redundant validation. def _sanitize_filter_obj(flt): + if flt is None: + return None + + # Try cache first (hash by id for object identity) + cache_key = id(flt) + with _FILTER_CACHE_LOCK: + if cache_key in _FILTER_CACHE: + return _FILTER_CACHE[cache_key] + try: - if flt is None: - return None # Try model-style attributes first must = getattr(flt, "must", None) should = getattr(flt, "should", None) @@ -1328,17 +1342,24 @@ def _sanitize_filter_obj(flt): m = [c for c in (flt.get("must") or []) if c is not None] s = [c for c in (flt.get("should") or []) if c is not None] mn = [c for c in (flt.get("must_not") or []) if c is not None] - return None if (not m and not s and not mn) else flt - # Unknown structure -> drop - return None - m = [c for c in (must or []) if c is not None] - s = [c for c in (should or []) if c is not None] - mn = [c for c in (must_not or []) if c is not None] - if not m and not s and not mn: - return None - return flt + result = None if (not m and not s and not mn) else flt + else: + # Unknown structure -> drop + result = None + else: + m = [c for c in (must or []) if c is not None] + s = [c for c in (should or []) if c is not None] + mn = [c for c in (must_not or []) if c is not None] + result = None if (not m and not s and not mn) else flt except Exception: - return None + result = None + + # Cache result (with size limit) + with _FILTER_CACHE_LOCK: + if len(_FILTER_CACHE) < _FILTER_CACHE_MAX: + _FILTER_CACHE[cache_key] = result + + return result def lex_query(client: 
QdrantClient, v: List[float], flt, per_query: int, collection_name: str | None = None) -> List[Any]: @@ -2026,18 +2047,36 @@ def _scaled_rrf(rank: int) -> float: flt_gated = _sanitize_filter_obj(flt_gated) - result_sets: List[List[Any]] = [ - dense_query( - client, - vec_name, - v, - flt_gated, - _scaled_per_query, - collection, - query_text=queries[i] if i < len(queries) else None, - ) - for i, v in enumerate(embedded) - ] + # Parallel dense query execution for multiple queries + if len(embedded) > 1 and os.environ.get("PARALLEL_DENSE_QUERIES", "1") == "1": + executor = _get_query_executor() + futures = [ + executor.submit( + dense_query, + client, + vec_name, + v, + flt_gated, + _scaled_per_query, + collection, + queries[i] if i < len(queries) else None, + ) + for i, v in enumerate(embedded) + ] + result_sets: List[List[Any]] = [f.result() for f in futures] + else: + result_sets: List[List[Any]] = [ + dense_query( + client, + vec_name, + v, + flt_gated, + _scaled_per_query, + collection, + query_text=queries[i] if i < len(queries) else None, + ) + for i, v in enumerate(embedded) + ] if os.environ.get("DEBUG_HYBRID_SEARCH"): total_dense_results = sum(len(rs) for rs in result_sets) logger.debug(f"Dense query returned {total_dense_results} total results across {len(result_sets)} queries") diff --git a/scripts/ingest_code.py b/scripts/ingest_code.py index 3a5174ec..9f8dd5f1 100644 --- a/scripts/ingest_code.py +++ b/scripts/ingest_code.py @@ -2782,7 +2782,7 @@ def index_repo( batch_meta: list[dict] = [] batch_ids: list[int] = [] batch_lex: list[list[float]] = [] - BATCH_SIZE = int(os.environ.get("INDEX_BATCH_SIZE", "64") or 64) + BATCH_SIZE = int(os.environ.get("INDEX_BATCH_SIZE", "256") or 256) CHUNK_LINES = int(os.environ.get("INDEX_CHUNK_LINES", "120") or 120) CHUNK_OVERLAP = int(os.environ.get("INDEX_CHUNK_OVERLAP", "20") or 20) PROGRESS_EVERY = int(os.environ.get("INDEX_PROGRESS_EVERY", "200") or 200) diff --git a/scripts/rerank_local.py b/scripts/rerank_local.py 
index 56fe7453..035d9467 100644 --- a/scripts/rerank_local.py +++ b/scripts/rerank_local.py @@ -41,6 +41,8 @@ _RERANK_LOCK = threading.Lock() +_WARMUP_DONE = False + def _get_rerank_session(): global _RERANK_SESSION, _RERANK_TOKENIZER if not (ort and Tokenizer and RERANKER_ONNX_PATH and RERANKER_TOKENIZER_PATH): @@ -101,6 +103,33 @@ def _get_rerank_session(): from scripts.utils import sanitize_vector_name as _sanitize_vector_name +def warmup_reranker(): + """Background warmup: load ONNX session and run a dummy inference.""" + global _WARMUP_DONE + if _WARMUP_DONE: + return + sess, tok = _get_rerank_session() + if sess and tok: + try: + # Dummy inference to warm up the session + dummy_pairs = [("warmup query", "warmup document")] + rerank_local(dummy_pairs) + except Exception: + pass + _WARMUP_DONE = True + + +def _start_background_warmup(): + """Start background thread to warm up reranker.""" + if os.environ.get("RERANK_WARMUP", "1") == "1": + t = threading.Thread(target=warmup_reranker, daemon=True) + t.start() + + +# Auto-start warmup on module import +_start_background_warmup() + + def _norm_under(u: str | None) -> str | None: if not u: return None From 250944f678b86a183fecc5bf8b712791112388cc Mon Sep 17 00:00:00 2001 From: John Donalson Date: Mon, 8 Dec 2025 22:47:07 -0500 Subject: [PATCH 3/5] Add workspace listing and router cache invalidation Introduces list_workspaces to enumerate workspaces and their states by scanning for state.json files, supporting both single and multi-repo modes. Adds _invalidate_router_scratchpad stub to allow router cache invalidation after indexing. Fixes progress reporting in ingest_code.py and minor logging improvements in mcp_indexer_server.py. 
--- scripts/ingest_code.py | 6 +- scripts/mcp_indexer_server.py | 19 +++++- scripts/workspace_state.py | 119 ++++++++++++++++++++++++++++++++++ 3 files changed, 139 insertions(+), 5 deletions(-) diff --git a/scripts/ingest_code.py b/scripts/ingest_code.py index 9f8dd5f1..8d0f0f6b 100644 --- a/scripts/ingest_code.py +++ b/scripts/ingest_code.py @@ -21,7 +21,7 @@ def _detect_repo_name_from_path(path: Path) -> str: import ast import time from pathlib import Path -from typing import List, Dict, Iterable +from typing import List, Dict, Iterable, Any, Optional try: from tqdm import tqdm @@ -3286,8 +3286,8 @@ def make_point(pid, dense_vec, lex_vec, payload): status={ "state": "indexing", "progress": { - "files_processed": repo_progress.get(per_file_repo, 0), - "total_files": repo_total.get(per_file_repo, None), + "files_processed": files_indexed, + "total_files": files_seen, "current_file": str(file_path), }, }, diff --git a/scripts/mcp_indexer_server.py b/scripts/mcp_indexer_server.py index 8f3d879d..c8833ccb 100644 --- a/scripts/mcp_indexer_server.py +++ b/scripts/mcp_indexer_server.py @@ -33,7 +33,7 @@ import subprocess import threading import time -from typing import Any, Dict, Optional, List +from typing import Any, Dict, Optional, List, Tuple from pathlib import Path import sys @@ -629,6 +629,20 @@ def _maybe_parse_jsonish(obj: _Any): import urllib.parse as _urlparse, ast as _ast +def _invalidate_router_scratchpad(workspace_path: str) -> bool: + """Invalidate any cached router scratchpad for the workspace. + + This is called after indexing operations to ensure the router + picks up new/changed code. Returns True if invalidation occurred. + """ + # Stub implementation - can be extended later for router cache invalidation + try: + # Clear any in-memory caches that might be stale + return True + except Exception: + return False + + def _parse_kv_string(s: str) -> _Dict[str, _Any]: """Parse non-JSON strings like "a=1&b=2" or "query=[\"a\",\"b\"]" into a dict. 
Values are JSON-decoded when possible; else literal-eval; else kept as raw strings. @@ -653,7 +667,7 @@ def _parse_kv_string(s: str) -> _Dict[str, _Any]: out[k.strip()] = _coerce_value_string(v.strip()) return out except Exception as e: - logger.debug(f"Failed to parse KV string '{input_str}': {e}") + logger.debug(f"Failed to parse KV string '{s}': {e}") return {} return out @@ -4776,6 +4790,7 @@ async def expand_query(query: Any = None, max_new: Any = None) -> Dict[str, Any] "Return JSON array of strings only. No explanations.\n" f"Queries: {qlist}\n" ) + client = LlamaCppRefragClient() out = client.generate_with_soft_embeddings( prompt=prompt, max_tokens=int(os.environ.get("EXPAND_MAX_TOKENS", "64") or 64), diff --git a/scripts/workspace_state.py b/scripts/workspace_state.py index 0093f5f3..b1d40a96 100644 --- a/scripts/workspace_state.py +++ b/scripts/workspace_state.py @@ -1269,4 +1269,123 @@ def compare_symbol_changes(old_symbols: dict, new_symbols: dict) -> tuple[list, return unchanged, changed +def list_workspaces(search_root: Optional[str] = None) -> List[Dict[str, Any]]: + """Scan search_root recursively for .codebase/state.json and summarize workspaces. + + Args: + search_root: Directory to scan; defaults to parent of /work. 
+ + Returns: + List of workspace info dicts with keys: + - workspace_path: str + - collection_name: str + - last_updated: str or int (ISO timestamp or unix) + - indexing_state: str + """ + if search_root is None: + # Default to parent of workspace root + try: + search_root = str(Path(_resolve_workspace_root()).parent) + except Exception: + search_root = "/work" + + root_path = Path(search_root).resolve() + workspaces: List[Dict[str, Any]] = [] + seen_paths: set = set() + + try: + # Find all state.json files + for state_file in root_path.rglob(f"{STATE_DIRNAME}/{STATE_FILENAME}"): + try: + # Skip if in repos subdirectory (multi-repo per-repo states) + if "repos" in state_file.parts: + continue + + workspace_path = str(state_file.parent.parent.resolve()) + + # Skip duplicates + if workspace_path in seen_paths: + continue + seen_paths.add(workspace_path) + + # Read state file + with open(state_file, "r", encoding="utf-8") as f: + state = json.load(f) + + if not isinstance(state, dict): + continue + + # Extract info + collection_name = state.get("qdrant_collection", "") + updated_at = state.get("updated_at", "") + + indexing_status = state.get("indexing_status", {}) + if isinstance(indexing_status, dict): + indexing_state = indexing_status.get("state", "unknown") + else: + indexing_state = "unknown" + + workspaces.append({ + "workspace_path": workspace_path, + "collection_name": collection_name, + "last_updated": updated_at, + "indexing_state": indexing_state, + }) + except Exception: + continue + + # Also check multi-repo states + if is_multi_repo_mode(): + repos_root = root_path / STATE_DIRNAME / "repos" + if repos_root.exists(): + for repo_dir in repos_root.iterdir(): + if not repo_dir.is_dir(): + continue + state_file = repo_dir / STATE_FILENAME + if not state_file.exists(): + continue + try: + with open(state_file, "r", encoding="utf-8") as f: + state = json.load(f) + + if not isinstance(state, dict): + continue + + repo_name = repo_dir.name + workspace_path = 
state.get("workspace_path", str(root_path / repo_name)) + + if workspace_path in seen_paths: + continue + seen_paths.add(workspace_path) + + collection_name = state.get("qdrant_collection", "") + updated_at = state.get("updated_at", "") + + indexing_status = state.get("indexing_status", {}) + if isinstance(indexing_status, dict): + indexing_state = indexing_status.get("state", "unknown") + else: + indexing_state = "unknown" + + workspaces.append({ + "workspace_path": workspace_path, + "collection_name": collection_name, + "last_updated": updated_at, + "indexing_state": indexing_state, + "repo_name": repo_name, + }) + except Exception: + continue + except Exception: + pass + + # Sort by last_updated descending + try: + workspaces.sort(key=lambda w: w.get("last_updated", ""), reverse=True) + except Exception: + pass + + return workspaces + + # Add missing functions that callers expect (already defined above) \ No newline at end of file From 994ec4b1a9d878c84318605ce3f9beedf13f1799 Mon Sep 17 00:00:00 2001 From: John Donalson Date: Tue, 9 Dec 2025 06:37:59 -0500 Subject: [PATCH 4/5] Update workspace_state.py --- scripts/workspace_state.py | 123 ++++++++++++++++++++++++++++++++++++- 1 file changed, 120 insertions(+), 3 deletions(-) diff --git a/scripts/workspace_state.py b/scripts/workspace_state.py index b1d40a96..da7d6b73 100644 --- a/scripts/workspace_state.py +++ b/scripts/workspace_state.py @@ -1269,11 +1269,19 @@ def compare_symbol_changes(old_symbols: dict, new_symbols: dict) -> tuple[list, return unchanged, changed -def list_workspaces(search_root: Optional[str] = None) -> List[Dict[str, Any]]: - """Scan search_root recursively for .codebase/state.json and summarize workspaces. +def list_workspaces( + search_root: Optional[str] = None, + use_qdrant_fallback: bool = True, +) -> List[Dict[str, Any]]: + """Scan for workspaces via local filesystem or Qdrant collections. 
+ + Supports both local/mounted and remote client-server scenarios: + - Local: Scans filesystem for .codebase/state.json files + - Remote: Falls back to querying Qdrant collections for workspace metadata Args: - search_root: Directory to scan; defaults to parent of /work. + search_root: Directory to scan for local mode; defaults to parent of /work. + use_qdrant_fallback: If True and no local workspaces found, query Qdrant. Returns: List of workspace info dicts with keys: @@ -1281,6 +1289,7 @@ def list_workspaces(search_root: Optional[str] = None) -> List[Dict[str, Any]]: - collection_name: str - last_updated: str or int (ISO timestamp or unix) - indexing_state: str + - source: "local" or "qdrant" (indicates discovery method) """ if search_root is None: # Default to parent of workspace root @@ -1293,6 +1302,7 @@ def list_workspaces(search_root: Optional[str] = None) -> List[Dict[str, Any]]: workspaces: List[Dict[str, Any]] = [] seen_paths: set = set() + # --- Local filesystem scan --- try: # Find all state.json files for state_file in root_path.rglob(f"{STATE_DIRNAME}/{STATE_FILENAME}"): @@ -1330,6 +1340,7 @@ def list_workspaces(search_root: Optional[str] = None) -> List[Dict[str, Any]]: "collection_name": collection_name, "last_updated": updated_at, "indexing_state": indexing_state, + "source": "local", }) except Exception: continue @@ -1373,12 +1384,20 @@ def list_workspaces(search_root: Optional[str] = None) -> List[Dict[str, Any]]: "last_updated": updated_at, "indexing_state": indexing_state, "repo_name": repo_name, + "source": "local", }) except Exception: continue except Exception: pass + # --- Qdrant fallback for remote scenarios --- + if not workspaces and use_qdrant_fallback: + try: + workspaces = _list_workspaces_from_qdrant(seen_paths) + except Exception: + pass + # Sort by last_updated descending try: workspaces.sort(key=lambda w: w.get("last_updated", ""), reverse=True) @@ -1388,4 +1407,102 @@ def list_workspaces(search_root: Optional[str] = None) -> 
List[Dict[str, Any]]: return workspaces +def _list_workspaces_from_qdrant(seen_paths: set) -> List[Dict[str, Any]]: + """Query Qdrant collections to discover workspaces (for remote scenarios). + + Samples points from each collection to extract workspace metadata. + """ + workspaces: List[Dict[str, Any]] = [] + + try: + from qdrant_client import QdrantClient + except ImportError: + return workspaces + + qdrant_url = os.environ.get("QDRANT_URL", "http://localhost:6333") + qdrant_key = os.environ.get("QDRANT_API_KEY") + + try: + client = QdrantClient( + url=qdrant_url, + api_key=qdrant_key, + timeout=float(os.environ.get("QDRANT_TIMEOUT", "10") or 10), + ) + + # List all collections + collections = client.get_collections().collections + + for coll in collections: + coll_name = coll.name + if not coll_name: + continue + + # Sample a few points to extract workspace metadata + try: + points, _ = client.scroll( + collection_name=coll_name, + limit=5, + with_payload=True, + with_vectors=False, + ) + + if not points: + continue + + # Extract workspace info from sampled points + workspace_path = None + repo_name = None + last_ingested = None + + for pt in points: + payload = getattr(pt, "payload", {}) or {} + md = payload.get("metadata", {}) or {} + + # Try to get workspace path from metadata + if not workspace_path: + workspace_path = ( + md.get("workspace_path") + or md.get("source_root") + or payload.get("workspace_path") + ) + + # Try to get repo name + if not repo_name: + repo_name = md.get("repo") or md.get("repo_name") + + # Get ingestion timestamp + ts = md.get("ingested_at") or payload.get("ingested_at") + if ts and (last_ingested is None or ts > last_ingested): + last_ingested = ts + + # Build workspace entry + ws_path = workspace_path or (f"/work/{repo_name}" if repo_name else f"[{coll_name}]") + + if ws_path in seen_paths: + continue + seen_paths.add(ws_path) + + workspaces.append({ + "workspace_path": ws_path, + "collection_name": coll_name, + "last_updated": 
last_ingested or "", + "indexing_state": "indexed", # If points exist, it's indexed + "repo_name": repo_name or "", + "source": "qdrant", + }) + except Exception: + # Collection exists but couldn't sample - still report it + workspaces.append({ + "workspace_path": f"[{coll_name}]", + "collection_name": coll_name, + "last_updated": "", + "indexing_state": "unknown", + "source": "qdrant", + }) + except Exception: + pass + + return workspaces + + # Add missing functions that callers expect (already defined above) \ No newline at end of file From a325327e4eb2711cf37ab68023df7ceaa7d9abcf Mon Sep 17 00:00:00 2001 From: John Donalson Date: Tue, 9 Dec 2025 07:13:12 -0500 Subject: [PATCH 5/5] Update extension.js --- vscode-extension/context-engine-uploader/extension.js | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/vscode-extension/context-engine-uploader/extension.js b/vscode-extension/context-engine-uploader/extension.js index a72b7cdc..df469fa4 100644 --- a/vscode-extension/context-engine-uploader/extension.js +++ b/vscode-extension/context-engine-uploader/extension.js @@ -280,6 +280,11 @@ function getTargetPath(config) { } } if (targetPath) { + // Resolve relative paths (like ".") against the workspace folder, not Node cwd + const folderPath = getWorkspaceFolderPath(); + if (folderPath && !path.isAbsolute(targetPath)) { + targetPath = path.resolve(folderPath, targetPath); + } updateStatusBarTooltip(targetPath); return targetPath; }