diff --git a/.env b/.env
index b26ac1a1..802b6ea8 100644
--- a/.env
+++ b/.env
@@ -29,9 +29,9 @@ TOOL_FIND_DESCRIPTION="Search for relevant code snippets using multiple phrasing
 
 # Optional: local cross-encoder reranker (ONNX)
-# Set these to enable make rerank-local inside the container
-RERANKER_ONNX_PATH=/work/models/model_qint8_avx512_vnni.onnx
-RERANKER_TOKENIZER_PATH=/work/models/tokenizer.json
+# Baked into Docker image at /app/models/ - no host mount needed
+RERANKER_ONNX_PATH=/app/models/reranker.onnx
+RERANKER_TOKENIZER_PATH=/app/models/tokenizer.json
 
 # Reranker toggles and tuning
 # Increased TOPN and RETURN_M for better final ranking
diff --git a/Dockerfile.mcp-indexer b/Dockerfile.mcp-indexer
index dcd2aa9f..5cfe1286 100644
--- a/Dockerfile.mcp-indexer
+++ b/Dockerfile.mcp-indexer
@@ -6,14 +6,26 @@ ENV PYTHONDONTWRITEBYTECODE=1 \
     PYTHONUNBUFFERED=1 \
     WORK_ROOTS="/work,/app"
 
-# OS deps (git for history if we extend later)
-RUN apt-get update && apt-get install -y --no-install-recommends git ca-certificates \
+# OS deps (git for history if we extend later, curl for model downloads)
+RUN apt-get update && apt-get install -y --no-install-recommends git ca-certificates curl \
     && rm -rf /var/lib/apt/lists/*
 
 # Python deps: reuse shared requirements (includes FastMCP + OpenAI SDK)
 COPY requirements.txt /tmp/requirements.txt
 RUN pip install --no-cache-dir --upgrade -r /tmp/requirements.txt
 
+# Download reranker model and tokenizer during build
+# Cross-encoder for reranking (ms-marco-MiniLM) + BGE tokenizer for micro-chunking
+ARG RERANKER_ONNX_URL=https://huggingface.co/cross-encoder/ms-marco-MiniLM-L-6-v2/resolve/main/onnx/model.onnx
+ARG TOKENIZER_URL=https://huggingface.co/BAAI/bge-base-en-v1.5/resolve/main/tokenizer.json
+RUN mkdir -p /app/models && \
+    curl -L --fail --retry 3 -o /app/models/reranker.onnx "${RERANKER_ONNX_URL}" && \
+    curl -L --fail --retry 3 -o /app/models/tokenizer.json "${TOKENIZER_URL}"
+
+# Set default paths for reranker (can be overridden via env)
+ENV RERANKER_ONNX_PATH=/app/models/reranker.onnx \
+    RERANKER_TOKENIZER_PATH=/app/models/tokenizer.json
+
 # Bake scripts into the image so entrypoints don't rely on /work
 COPY scripts /app/scripts
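Aside (not part of the patch): because the reranker artifacts are now baked into the image rather than host-mounted, a startup sanity check catches a bad build early. A minimal sketch, assuming the default paths set above; reranker_available is a hypothetical helper, not something this diff adds:

import os

def reranker_available() -> bool:
    # Both artifacts are expected at the paths baked in by the Dockerfile;
    # the env vars allow overriding them at run time.
    onnx_path = os.environ.get("RERANKER_ONNX_PATH", "/app/models/reranker.onnx")
    tokenizer_path = os.environ.get("RERANKER_TOKENIZER_PATH", "/app/models/tokenizer.json")
    return os.path.isfile(onnx_path) and os.path.isfile(tokenizer_path)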
diff --git a/scripts/hybrid_search.py b/scripts/hybrid_search.py
index 21046ac5..98f52588 100644
--- a/scripts/hybrid_search.py
+++ b/scripts/hybrid_search.py
@@ -308,6 +308,90 @@ def _embed_queries_cached(
 MICRO_BUDGET_TOKENS = _safe_int(os.environ.get("MICRO_BUDGET_TOKENS", "512"), 512)
 MICRO_TOKENS_PER_LINE = _safe_int(os.environ.get("MICRO_TOKENS_PER_LINE", "32"), 32)
 
+# === Large codebase scaling (default ON) ===
+# These parameters enable automatic scaling for collections with 10k+ points
+LARGE_COLLECTION_THRESHOLD = _safe_int(os.environ.get("HYBRID_LARGE_THRESHOLD", "10000"), 10000)
+MAX_RRF_K_SCALE = _safe_float(os.environ.get("HYBRID_MAX_RRF_K_SCALE", "3.0"), 3.0)
+SCORE_NORMALIZE_ENABLED = os.environ.get("HYBRID_SCORE_NORMALIZE", "1").lower() in {"1", "true", "yes", "on"}
+
+# Cache for collection stats (avoids repeated Qdrant calls)
+_COLL_STATS_CACHE: Dict[str, Tuple[float, Dict[str, Any]]] = {}
+_COLL_STATS_TTL = 300  # 5 minutes
+
+def _get_collection_stats(client: QdrantClient, coll_name: str) -> Dict[str, Any]:
+    """Get cached collection statistics for scaling decisions."""
+    import time
+    now = time.time()
+    cached = _COLL_STATS_CACHE.get(coll_name)
+    if cached and (now - cached[0]) < _COLL_STATS_TTL:
+        return cached[1]
+    try:
+        info = client.get_collection(coll_name)
+        stats = {"points_count": info.points_count or 0}
+        _COLL_STATS_CACHE[coll_name] = (now, stats)
+        return stats
+    except Exception:
+        return {"points_count": 0}
+
+def _scale_rrf_k(base_k: int, collection_size: int) -> int:
+    """Scale the RRF k parameter based on collection size.
+
+    For large collections, increase k to spread the score distribution.
+    Uses logarithmic scaling: k_scaled = k * (1 + log10(size / threshold)),
+    capped at MAX_RRF_K_SCALE * base_k.
+    """
+    if collection_size < LARGE_COLLECTION_THRESHOLD:
+        return base_k
+    ratio = collection_size / LARGE_COLLECTION_THRESHOLD
+    scale = 1.0 + math.log10(max(1, ratio))
+    scale = min(scale, MAX_RRF_K_SCALE)
+    return int(base_k * scale)
+
+def _adaptive_per_query(base_limit: int, collection_size: int, has_filters: bool) -> int:
+    """Increase candidate retrieval for larger collections.
+
+    Uses sublinear sqrt scaling to avoid excessive retrieval.
+    Filters reduce the need for extra candidates.
+    """
+    if collection_size < LARGE_COLLECTION_THRESHOLD:
+        return base_limit
+    ratio = collection_size / LARGE_COLLECTION_THRESHOLD
+    # sqrt scaling: doubles at 4x threshold, triples at 9x
+    scale = math.sqrt(ratio)
+    if has_filters:
+        scale = max(1.0, scale * 0.7)  # reduce scaling when filters are active
+    # Cap at 3x the base limit
+    scaled = int(base_limit * min(scale, 3.0))
+    return max(base_limit, min(scaled, 200))
+
+def _normalize_scores(score_map: Dict[str, Dict[str, Any]], collection_size: int) -> None:
+    """Normalize scores using z-score + sigmoid for large collections.
+
+    This spreads compressed score distributions to improve discrimination.
+    Only applies when HYBRID_SCORE_NORMALIZE is enabled and the collection is large.
+    """
+    if not SCORE_NORMALIZE_ENABLED:
+        return
+    if collection_size < LARGE_COLLECTION_THRESHOLD:
+        return
+    if len(score_map) < 3:
+        return
+
+    scores = [rec["s"] for rec in score_map.values()]
+    mean_s = sum(scores) / len(scores)
+    var_s = sum((s - mean_s) ** 2 for s in scores) / len(scores)
+    std_s = math.sqrt(var_s)
+
+    if std_s < 1e-6:
+        return  # All scores (near-)identical, nothing to normalize
+
+    # Z-score normalization + sigmoid to the [0, 1] range
+    for rec in score_map.values():
+        z = (rec["s"] - mean_s) / std_s
+        # Sigmoid with a scale factor for wider spread
+        normalized = 1.0 / (1.0 + math.exp(-z * 0.5))
+        rec["s"] = normalized
+
 def _merge_and_budget_spans(items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
     """Given ranked items with metadata path/start_line/end_line, merge nearby spans
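Aside (not part of the patch): to make the scaling curves concrete, here is a self-contained sketch of the same formulas with the default 10k threshold, assuming a base RRF k of 60 (a common default; the real value comes from RRF_K elsewhere in the module):

import math

THRESHOLD = 10_000   # LARGE_COLLECTION_THRESHOLD default
MAX_K_SCALE = 3.0    # MAX_RRF_K_SCALE default

def scale_rrf_k(base_k: int, size: int) -> int:
    # Logarithmic growth, capped at MAX_K_SCALE * base_k.
    if size < THRESHOLD:
        return base_k
    return int(base_k * min(1.0 + math.log10(size / THRESHOLD), MAX_K_SCALE))

def adaptive_per_query(base_limit: int, size: int, has_filters: bool) -> int:
    # Sublinear sqrt growth, damped 0.7x under filters, capped at 3x / 200.
    if size < THRESHOLD:
        return base_limit
    scale = math.sqrt(size / THRESHOLD)
    if has_filters:
        scale = max(1.0, scale * 0.7)
    return max(base_limit, min(int(base_limit * min(scale, 3.0)), 200))

for size in (5_000, 40_000, 90_000, 1_000_000):
    print(size, scale_rrf_k(60, size), adaptive_per_query(24, size, False))
# 5000    -> 60,  24  (below threshold: unchanged)
# 40000   -> 96,  48  (per_query doubles at 4x the threshold)
# 90000   -> 117, 72  (and triples at 9x)
# 1000000 -> 180, 72  (both capped at 3x the base values)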
@@ -1672,11 +1756,29 @@ def _bn(p: str) -> str:
     except Exception:
         pass
 
-    # Lexical vector query (keep original RRF scoring)
+    # === Large codebase scaling (automatic) ===
+    _coll_stats = _get_collection_stats(client, _collection(collection))
+    _coll_size = _coll_stats.get("points_count", 0)
+    _has_filters = bool(eff_language or eff_repo or eff_under or eff_kind or eff_symbol or eff_ext)
+
+    # Scale RRF k for better score discrimination at scale
+    _scaled_rrf_k = _scale_rrf_k(RRF_K, _coll_size)
+
+    # Adaptive per_query: retrieve more candidates from larger collections
+    _scaled_per_query = _adaptive_per_query(max(24, limit), _coll_size, _has_filters)
+
+    if os.environ.get("DEBUG_HYBRID_SEARCH") and _coll_size >= LARGE_COLLECTION_THRESHOLD:
+        logger.debug(f"Large collection scaling: size={_coll_size}, rrf_k={_scaled_rrf_k}, per_query={_scaled_per_query}")
+
+    # Local RRF function using the scaled k
+    def _scaled_rrf(rank: int) -> float:
+        return 1.0 / (_scaled_rrf_k + rank)
+
+    # Lexical vector query (with scaled retrieval)
     score_map: Dict[str, Dict[str, Any]] = {}
     try:
         lex_vec = lex_hash_vector(qlist)
-        lex_results = lex_query(client, lex_vec, flt, max(24, limit), collection)
+        lex_results = lex_query(client, lex_vec, flt, _scaled_per_query, collection)
     except Exception:
         lex_results = []
@@ -1708,7 +1810,7 @@ def _bn(p: str) -> str:
                 "test": 0.0,
             },
         )
-        lxs = (_AD_LEX_VEC_W * rrf(rank)) if _USE_ADAPT else (LEX_VECTOR_WEIGHT * rrf(rank))
+        lxs = (_AD_LEX_VEC_W * _scaled_rrf(rank)) if _USE_ADAPT else (LEX_VECTOR_WEIGHT * _scaled_rrf(rank))
         score_map[pid]["lx"] += lxs
         score_map[pid]["s"] += lxs
@@ -1829,7 +1931,7 @@ def _bn(p: str) -> str:
     flt_gated = _sanitize_filter_obj(flt_gated)
 
     result_sets: List[List[Any]] = [
-        dense_query(client, vec_name, v, flt_gated, max(24, limit), collection) for v in embedded
+        dense_query(client, vec_name, v, flt_gated, _scaled_per_query, collection) for v in embedded
     ]
     if os.environ.get("DEBUG_HYBRID_SEARCH"):
         total_dense_results = sum(len(rs) for rs in result_sets)
@@ -1846,7 +1948,7 @@ def _bn(p: str) -> str:
         try:
             mini_queries = [_project_mini(list(v), MINI_VEC_DIM) for v in embedded]
             mini_sets: List[List[Any]] = [
-                dense_query(client, MINI_VECTOR_NAME, mv, flt, max(24, limit), collection)
+                dense_query(client, MINI_VECTOR_NAME, mv, flt, _scaled_per_query, collection)
                 for mv in mini_queries
             ]
             for res in mini_sets:
@@ -1868,7 +1970,7 @@ def _bn(p: str) -> str:
                             "test": 0.0,
                         },
                     )
-                    dens = float(HYBRID_MINI_WEIGHT) * rrf(rank)
+                    dens = float(HYBRID_MINI_WEIGHT) * _scaled_rrf(rank)
                     score_map[pid]["d"] += dens
                     score_map[pid]["s"] += dens
         except Exception:
@@ -1934,7 +2036,7 @@ def _bn(p: str) -> str:
                         },
                     )
                     # Lower weight for semantic PRF to avoid over-diversification
-                    dens = 0.3 * prf_dw * rrf(rank)
+                    dens = 0.3 * prf_dw * _scaled_rrf(rank)
                     score_map[pid]["d"] += dens
                     score_map[pid]["s"] += dens
 
@@ -2023,14 +2125,15 @@ def _bn(p: str) -> str:
                 "test": 0.0,
             },
         )
-        lxs = prf_lw * rrf(rank)
+        lxs = prf_lw * _scaled_rrf(rank)
         score_map[pid]["lx"] += lxs
         score_map[pid]["s"] += lxs
 
     # Dense PRF pass
     try:
         embedded2 = _embed_queries_cached(_model, prf_qs)
+        _prf_per_query = max(12, _scaled_per_query // 2)
         result_sets2: List[List[Any]] = [
-            dense_query(client, vec_name, v, flt, max(12, limit // 2 or 6), collection)
+            dense_query(client, vec_name, v, flt, _prf_per_query, collection)
             for v in embedded2
         ]
         for res2 in result_sets2:
@@ -2052,13 +2155,13 @@ def _bn(p: str) -> str:
                         "test": 0.0,
                     },
                 )
-                dens = prf_dw * rrf(rank)
+                dens = prf_dw * _scaled_rrf(rank)
                 score_map[pid]["d"] += dens
                 score_map[pid]["s"] += dens
     except Exception:
         pass
 
-    # Add dense scores (keep original RRF scoring)
+    # Add dense scores (with scaled RRF)
     for res in result_sets:
         for rank, p in enumerate(res, 1):
             pid = str(p.id)
@@ -2078,7 +2181,7 @@ def _bn(p: str) -> str:
                     "test": 0.0,
                 },
            )
-            dens = (_AD_DENSE_W * rrf(rank)) if _USE_ADAPT else (DENSE_WEIGHT * rrf(rank))
+            dens = (_AD_DENSE_W * _scaled_rrf(rank)) if _USE_ADAPT else (DENSE_WEIGHT * _scaled_rrf(rank))
             score_map[pid]["d"] += dens
             score_map[pid]["s"] += dens
@@ -2190,6 +2293,10 @@ def _bn(p: str) -> str:
             rec["rec"] += rec_comp
             rec["s"] += rec_comp
 
+    # === Large codebase score normalization ===
+    # Spread compressed score distributions for better discrimination
+    _normalize_scores(score_map, _coll_size)
+
     def _tie_key(m: Dict[str, Any]):
         md = (m["pt"].payload or {}).get("metadata") or {}
         sp = str(md.get("symbol_path") or md.get("symbol") or "")
@@ -2198,7 +2305,7 @@ def _tie_key(m: Dict[str, Any]):
         return (-float(m["s"]), len(sp), path, start_line)
 
     if os.environ.get("DEBUG_HYBRID_SEARCH"):
-        logger.debug(f"score_map has {len(score_map)} items before ranking")
+        logger.debug(f"score_map has {len(score_map)} items before ranking (coll_size={_coll_size})")
     ranked = sorted(score_map.values(), key=_tie_key)
     if os.environ.get("DEBUG_HYBRID_SEARCH"):
         logger.debug(f"ranked has {len(ranked)} items after sorting")
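Aside (not part of the patch): the z-score + sigmoid transform is easy to sanity-check standalone. zscore_sigmoid below is a hypothetical mirror of _normalize_scores, applied to made-up scores:

import math

def zscore_sigmoid(scores: list[float]) -> list[float]:
    # Same math as _normalize_scores: z-score, then sigmoid with a 0.5 scale factor.
    mean = sum(scores) / len(scores)
    std = math.sqrt(sum((s - mean) ** 2 for s in scores) / len(scores))
    if std < 1e-6:
        return scores  # (near-)identical scores: leave untouched
    return [1.0 / (1.0 + math.exp(-((s - mean) / std) * 0.5)) for s in scores]

# Compressed RRF-style totals, as a large collection tends to produce:
print(zscore_sigmoid([0.0321, 0.0318, 0.0316, 0.0295]))
# -> approximately [0.602, 0.567, 0.543, 0.299]: the gaps between top hits
#    widen while the ordering is preserved (the sigmoid is monotonic).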
logger.debug(f"score_map has {len(score_map)} items before ranking") + logger.debug(f"score_map has {len(score_map)} items before ranking (coll_size={_coll_size})") ranked = sorted(score_map.values(), key=_tie_key) if os.environ.get("DEBUG_HYBRID_SEARCH"): logger.debug(f"ranked has {len(ranked)} items after sorting") @@ -2883,6 +2990,16 @@ def _norm_under(u: str | None) -> str | None: flt = models.Filter(must=must) if must else None flt = _sanitize_filter_obj(flt) + # === Large codebase scaling (CLI path) === + _cli_coll_stats = _get_collection_stats(client, eff_collection) + _cli_coll_size = _cli_coll_stats.get("points_count", 0) + _cli_has_filters = bool(eff_language or eff_repo or eff_under or eff_kind or eff_symbol or eff_ext) + _cli_scaled_rrf_k = _scale_rrf_k(RRF_K, _cli_coll_size) + _cli_scaled_per_query = _adaptive_per_query(args.per_query, _cli_coll_size, _cli_has_filters) + + def _cli_scaled_rrf(rank: int) -> float: + return 1.0 / (_cli_scaled_rrf_k + rank) + # Build query set (optionally expanded) queries = list(clean_queries) # Initialize score map early so we can accumulate from lex and dense @@ -2890,7 +3007,7 @@ def _norm_under(u: str | None) -> str | None: # Server-side lexical vector search (hashing) as an additional ranked list try: lex_vec = lex_hash_vector(queries) - lex_results = lex_query(client, lex_vec, flt, args.per_query, eff_collection) + lex_results = lex_query(client, lex_vec, flt, _cli_scaled_per_query, eff_collection) except Exception: lex_results = [] @@ -2917,7 +3034,7 @@ def _norm_under(u: str | None) -> str | None: else: queries = expand_queries(queries, eff_language) - # Add server-side lexical vector ranking into fusion + # Add server-side lexical vector ranking into fusion (with scaled RRF) for rank, p in enumerate(lex_results, 1): pid = str(p.id) score_map.setdefault( @@ -2936,16 +3053,16 @@ def _norm_under(u: str | None) -> str | None: "test": 0.0, }, ) - lxs = (_AD_LEX_VEC_W2 * rrf(rank)) if _USE_ADAPT2 else (LEX_VECTOR_WEIGHT * rrf(rank)) + lxs = (_AD_LEX_VEC_W2 * _cli_scaled_rrf(rank)) if _USE_ADAPT2 else (LEX_VECTOR_WEIGHT * _cli_scaled_rrf(rank)) score_map[pid]["lx"] += lxs score_map[pid]["s"] += lxs embedded = _embed_queries_cached(model, queries) result_sets: List[List[Any]] = [ - dense_query(client, vec_name, v, flt, args.per_query, eff_collection) for v in embedded + dense_query(client, vec_name, v, flt, _cli_scaled_per_query, eff_collection) for v in embedded ] - # RRF fusion (weighted) + # RRF fusion (weighted, with scaled RRF) for res in result_sets: for rank, p in enumerate(res, 1): pid = str(p.id) @@ -2965,7 +3082,7 @@ def _norm_under(u: str | None) -> str | None: "test": 0.0, }, ) - dens = (_AD_DENSE_W2 * rrf(rank)) if _USE_ADAPT2 else (DENSE_WEIGHT * rrf(rank)) + dens = (_AD_DENSE_W2 * _cli_scaled_rrf(rank)) if _USE_ADAPT2 else (DENSE_WEIGHT * _cli_scaled_rrf(rank)) score_map[pid]["d"] += dens score_map[pid]["s"] += dens @@ -3046,6 +3163,9 @@ def _norm_under(u: str | None) -> str | None: rec["rec"] += rec_comp rec["s"] += rec_comp + # === Large codebase score normalization (CLI path) === + _normalize_scores(score_map, _cli_coll_size) + # Rank with deterministic tie-breakers def _tie_key(m: Dict[str, Any]): md = (m["pt"].payload or {}).get("metadata") or {}