Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ TOOL_FIND_DESCRIPTION="Search for relevant code snippets using multiple phrasing


# Optional: local cross-encoder reranker (ONNX)
# Set these to enable make rerank-local inside the container
RERANKER_ONNX_PATH=/work/models/model_qint8_avx512_vnni.onnx
RERANKER_TOKENIZER_PATH=/work/models/tokenizer.json
# Baked into Docker image at /app/models/ - no host mount needed
RERANKER_ONNX_PATH=/app/models/reranker.onnx
RERANKER_TOKENIZER_PATH=/app/models/tokenizer.json

# Reranker toggles and tuning
# Increased TOPN and RETURN_M for better final ranking
Expand Down
16 changes: 14 additions & 2 deletions Dockerfile.mcp-indexer
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,26 @@ ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
WORK_ROOTS="/work,/app"

# OS deps (git for history if we extend later)
RUN apt-get update && apt-get install -y --no-install-recommends git ca-certificates \
# OS deps (git for history if we extend later, curl for model downloads)
RUN apt-get update && apt-get install -y --no-install-recommends git ca-certificates curl \
&& rm -rf /var/lib/apt/lists/*

# Python deps: reuse shared requirements (includes FastMCP + OpenAI SDK)
COPY requirements.txt /tmp/requirements.txt
RUN pip install --no-cache-dir --upgrade -r /tmp/requirements.txt

# Download reranker model and tokenizer during build
# Cross-encoder for reranking (ms-marco-MiniLM) + BGE tokenizer for micro-chunking
ARG RERANKER_ONNX_URL=https://huggingface.co/cross-encoder/ms-marco-MiniLM-L-6-v2/resolve/main/onnx/model.onnx
ARG TOKENIZER_URL=https://huggingface.co/BAAI/bge-base-en-v1.5/resolve/main/tokenizer.json
RUN mkdir -p /app/models && \
curl -L --fail --retry 3 -o /app/models/reranker.onnx "${RERANKER_ONNX_URL}" && \
curl -L --fail --retry 3 -o /app/models/tokenizer.json "${TOKENIZER_URL}"

# Set default paths for reranker (can be overridden via env)
ENV RERANKER_ONNX_PATH=/app/models/reranker.onnx \
RERANKER_TOKENIZER_PATH=/app/models/tokenizer.json

# Bake scripts into the image so entrypoints don't rely on /work
COPY scripts /app/scripts

Expand Down
158 changes: 139 additions & 19 deletions scripts/hybrid_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,90 @@ def _embed_queries_cached(
MICRO_BUDGET_TOKENS = _safe_int(os.environ.get("MICRO_BUDGET_TOKENS", "512"), 512)
MICRO_TOKENS_PER_LINE = _safe_int(os.environ.get("MICRO_TOKENS_PER_LINE", "32"), 32)

# === Large codebase scaling (default ON) ===
# These parameters enable automatic scaling for collections with 10k+ points.
# Point count at or above which the scaling heuristics below kick in.
LARGE_COLLECTION_THRESHOLD = _safe_int(os.environ.get("HYBRID_LARGE_THRESHOLD", "10000"), 10000)
# Upper bound on the multiplier applied to the base RRF k (see _scale_rrf_k).
MAX_RRF_K_SCALE = _safe_float(os.environ.get("HYBRID_MAX_RRF_K_SCALE", "3.0"), 3.0)
# Master switch for z-score + sigmoid score normalization (see _normalize_scores).
SCORE_NORMALIZE_ENABLED = os.environ.get("HYBRID_SCORE_NORMALIZE", "1").lower() in {"1", "true", "yes", "on"}

# Cache for collection stats (avoid repeated Qdrant calls).
# Maps collection name -> (fetch timestamp, stats dict); see _get_collection_stats.
_COLL_STATS_CACHE: Dict[str, Tuple[float, Dict[str, Any]]] = {}
_COLL_STATS_TTL = 300  # cache lifetime in seconds (5 minutes)

def _get_collection_stats(client: QdrantClient, coll_name: str) -> Dict[str, Any]:
    """Return collection statistics for scaling decisions, memoized per name.

    Entries live in _COLL_STATS_CACHE for _COLL_STATS_TTL seconds. On any
    Qdrant error a zero-count stub is returned and deliberately NOT cached,
    so the next call retries the lookup.
    """
    import time

    fetched_at = time.time()
    entry = _COLL_STATS_CACHE.get(coll_name)
    if entry is not None:
        cached_at, cached_stats = entry
        if (fetched_at - cached_at) < _COLL_STATS_TTL:
            return cached_stats
    try:
        info = client.get_collection(coll_name)
        stats: Dict[str, Any] = {"points_count": info.points_count or 0}
        _COLL_STATS_CACHE[coll_name] = (fetched_at, stats)
        return stats
    except Exception:
        return {"points_count": 0}

def _scale_rrf_k(base_k: int, collection_size: int) -> int:
    """Return an RRF k parameter adapted to collection size.

    Collections below LARGE_COLLECTION_THRESHOLD keep base_k unchanged.
    Larger ones grow k logarithmically — k * (1 + log10(size / threshold))
    — to spread the score distribution, capped at MAX_RRF_K_SCALE * base_k.
    """
    if collection_size >= LARGE_COLLECTION_THRESHOLD:
        size_ratio = collection_size / LARGE_COLLECTION_THRESHOLD
        growth = min(1.0 + math.log10(max(1, size_ratio)), MAX_RRF_K_SCALE)
        return int(base_k * growth)
    return base_k

def _adaptive_per_query(base_limit: int, collection_size: int, has_filters: bool) -> int:
    """Return a per-query candidate retrieval count scaled to collection size.

    Below LARGE_COLLECTION_THRESHOLD the base limit is returned unchanged.
    Above it, the limit grows as sqrt(size / threshold) — sublinear, so it
    doubles at 4x the threshold and triples at 9x. Active payload filters
    damp the growth by 0.7 since they already narrow the candidate pool.
    Growth is capped at 3x the base, and the result never drops below
    base_limit nor exceeds 200 (unless base_limit itself exceeds 200).
    """
    if collection_size < LARGE_COLLECTION_THRESHOLD:
        return base_limit
    growth = math.sqrt(collection_size / LARGE_COLLECTION_THRESHOLD)
    if has_filters:
        growth = max(1.0, 0.7 * growth)
    candidate = int(base_limit * min(3.0, growth))
    return max(base_limit, min(candidate, 200))

def _normalize_scores(score_map: Dict[str, Dict[str, Any]], collection_size: int) -> None:
    """Normalize fused scores in place using z-score + sigmoid.

    For large collections the fused score distribution tends to be
    compressed; mapping each score through sigmoid(z * 0.5) spreads it
    over (0, 1) for better discrimination between candidates. Mutates the
    "s" field of each record in score_map; component fields are untouched.

    No-ops when:
      * SCORE_NORMALIZE_ENABLED is false,
      * collection_size is below LARGE_COLLECTION_THRESHOLD,
      * fewer than 3 candidates are present (statistics are meaningless),
      * scores are (near-)identical — normalizing would collapse every
        score to 0.5 and discard their common magnitude.
    """
    if not SCORE_NORMALIZE_ENABLED:
        return
    if collection_size < LARGE_COLLECTION_THRESHOLD:
        return
    if len(score_map) < 3:
        return

    scores = [rec["s"] for rec in score_map.values()]
    mean_s = sum(scores) / len(scores)
    var_s = sum((s - mean_s) ** 2 for s in scores) / len(scores)
    # BUGFIX: previously `std_s = sqrt(var_s) if var_s > 0 else 1.0` mapped a
    # zero variance to std 1.0, which bypassed the identical-scores guard
    # below and rewrote every identical score to sigmoid(0) == 0.5.
    std_s = math.sqrt(var_s)

    if std_s < 1e-6:
        return  # All scores (near-)identical, nothing to normalize

    # Z-score normalization + sigmoid to (0, 1); the 0.5 factor widens
    # the spread relative to a plain sigmoid.
    for rec in score_map.values():
        z = (rec["s"] - mean_s) / std_s
        rec["s"] = 1.0 / (1.0 + math.exp(-z * 0.5))


def _merge_and_budget_spans(items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Given ranked items with metadata path/start_line/end_line, merge nearby spans
Expand Down Expand Up @@ -1672,11 +1756,29 @@ def _bn(p: str) -> str:
except Exception:
pass

# Lexical vector query (keep original RRF scoring)
# === Large codebase scaling (automatic) ===
_coll_stats = _get_collection_stats(client, _collection(collection))
_coll_size = _coll_stats.get("points_count", 0)
_has_filters = bool(eff_language or eff_repo or eff_under or eff_kind or eff_symbol or eff_ext)

# Scale RRF k for better score discrimination at scale
_scaled_rrf_k = _scale_rrf_k(RRF_K, _coll_size)

# Adaptive per_query: retrieve more candidates from larger collections
_scaled_per_query = _adaptive_per_query(max(24, limit), _coll_size, _has_filters)

if os.environ.get("DEBUG_HYBRID_SEARCH") and _coll_size >= LARGE_COLLECTION_THRESHOLD:
logger.debug(f"Large collection scaling: size={_coll_size}, rrf_k={_scaled_rrf_k}, per_query={_scaled_per_query}")

# Local RRF function using scaled k
def _scaled_rrf(rank: int) -> float:
return 1.0 / (_scaled_rrf_k + rank)

# Lexical vector query (with scaled retrieval)
score_map: Dict[str, Dict[str, Any]] = {}
try:
lex_vec = lex_hash_vector(qlist)
lex_results = lex_query(client, lex_vec, flt, max(24, limit), collection)
lex_results = lex_query(client, lex_vec, flt, _scaled_per_query, collection)
except Exception:
lex_results = []

Expand Down Expand Up @@ -1708,7 +1810,7 @@ def _bn(p: str) -> str:
"test": 0.0,
},
)
lxs = (_AD_LEX_VEC_W * rrf(rank)) if _USE_ADAPT else (LEX_VECTOR_WEIGHT * rrf(rank))
lxs = (_AD_LEX_VEC_W * _scaled_rrf(rank)) if _USE_ADAPT else (LEX_VECTOR_WEIGHT * _scaled_rrf(rank))
score_map[pid]["lx"] += lxs
score_map[pid]["s"] += lxs

Expand Down Expand Up @@ -1829,7 +1931,7 @@ def _bn(p: str) -> str:
flt_gated = _sanitize_filter_obj(flt_gated)

result_sets: List[List[Any]] = [
dense_query(client, vec_name, v, flt_gated, max(24, limit), collection) for v in embedded
dense_query(client, vec_name, v, flt_gated, _scaled_per_query, collection) for v in embedded
]
if os.environ.get("DEBUG_HYBRID_SEARCH"):
total_dense_results = sum(len(rs) for rs in result_sets)
Expand All @@ -1846,7 +1948,7 @@ def _bn(p: str) -> str:
try:
mini_queries = [_project_mini(list(v), MINI_VEC_DIM) for v in embedded]
mini_sets: List[List[Any]] = [
dense_query(client, MINI_VECTOR_NAME, mv, flt, max(24, limit), collection)
dense_query(client, MINI_VECTOR_NAME, mv, flt, _scaled_per_query, collection)
for mv in mini_queries
]
for res in mini_sets:
Expand All @@ -1868,7 +1970,7 @@ def _bn(p: str) -> str:
"test": 0.0,
},
)
dens = float(HYBRID_MINI_WEIGHT) * rrf(rank)
dens = float(HYBRID_MINI_WEIGHT) * _scaled_rrf(rank)
score_map[pid]["d"] += dens
score_map[pid]["s"] += dens
except Exception:
Expand Down Expand Up @@ -1934,7 +2036,7 @@ def _bn(p: str) -> str:
},
)
# Lower weight for semantic PRF to avoid over-diversification
dens = 0.3 * prf_dw * rrf(rank)
dens = 0.3 * prf_dw * _scaled_rrf(rank)
score_map[pid]["d"] += dens
score_map[pid]["s"] += dens

Expand Down Expand Up @@ -2023,14 +2125,15 @@ def _bn(p: str) -> str:
"test": 0.0,
},
)
lxs = prf_lw * rrf(rank)
lxs = prf_lw * _scaled_rrf(rank)
score_map[pid]["lx"] += lxs
score_map[pid]["s"] += lxs
# Dense PRF pass
try:
embedded2 = _embed_queries_cached(_model, prf_qs)
_prf_per_query = max(12, _scaled_per_query // 2)
result_sets2: List[List[Any]] = [
dense_query(client, vec_name, v, flt, max(12, limit // 2 or 6), collection)
dense_query(client, vec_name, v, flt, _prf_per_query, collection)
for v in embedded2
]
for res2 in result_sets2:
Expand All @@ -2052,13 +2155,13 @@ def _bn(p: str) -> str:
"test": 0.0,
},
)
dens = prf_dw * rrf(rank)
dens = prf_dw * _scaled_rrf(rank)
score_map[pid]["d"] += dens
score_map[pid]["s"] += dens
except Exception:
pass

# Add dense scores (keep original RRF scoring)
# Add dense scores (with scaled RRF)
for res in result_sets:
for rank, p in enumerate(res, 1):
pid = str(p.id)
Expand All @@ -2078,7 +2181,7 @@ def _bn(p: str) -> str:
"test": 0.0,
},
)
dens = (_AD_DENSE_W * rrf(rank)) if _USE_ADAPT else (DENSE_WEIGHT * rrf(rank))
dens = (_AD_DENSE_W * _scaled_rrf(rank)) if _USE_ADAPT else (DENSE_WEIGHT * _scaled_rrf(rank))
score_map[pid]["d"] += dens
score_map[pid]["s"] += dens

Expand Down Expand Up @@ -2190,6 +2293,10 @@ def _bn(p: str) -> str:
rec["rec"] += rec_comp
rec["s"] += rec_comp

# === Large codebase score normalization ===
# Spread compressed score distributions for better discrimination
_normalize_scores(score_map, _coll_size)

def _tie_key(m: Dict[str, Any]):
md = (m["pt"].payload or {}).get("metadata") or {}
sp = str(md.get("symbol_path") or md.get("symbol") or "")
Expand All @@ -2198,7 +2305,7 @@ def _tie_key(m: Dict[str, Any]):
return (-float(m["s"]), len(sp), path, start_line)

if os.environ.get("DEBUG_HYBRID_SEARCH"):
logger.debug(f"score_map has {len(score_map)} items before ranking")
logger.debug(f"score_map has {len(score_map)} items before ranking (coll_size={_coll_size})")
ranked = sorted(score_map.values(), key=_tie_key)
if os.environ.get("DEBUG_HYBRID_SEARCH"):
logger.debug(f"ranked has {len(ranked)} items after sorting")
Expand Down Expand Up @@ -2883,14 +2990,24 @@ def _norm_under(u: str | None) -> str | None:
flt = models.Filter(must=must) if must else None
flt = _sanitize_filter_obj(flt)

# === Large codebase scaling (CLI path) ===
_cli_coll_stats = _get_collection_stats(client, eff_collection)
_cli_coll_size = _cli_coll_stats.get("points_count", 0)
_cli_has_filters = bool(eff_language or eff_repo or eff_under or eff_kind or eff_symbol or eff_ext)
_cli_scaled_rrf_k = _scale_rrf_k(RRF_K, _cli_coll_size)
_cli_scaled_per_query = _adaptive_per_query(args.per_query, _cli_coll_size, _cli_has_filters)

def _cli_scaled_rrf(rank: int) -> float:
return 1.0 / (_cli_scaled_rrf_k + rank)

# Build query set (optionally expanded)
queries = list(clean_queries)
# Initialize score map early so we can accumulate from lex and dense
score_map: Dict[str, Dict[str, Any]] = {}
# Server-side lexical vector search (hashing) as an additional ranked list
try:
lex_vec = lex_hash_vector(queries)
lex_results = lex_query(client, lex_vec, flt, args.per_query, eff_collection)
lex_results = lex_query(client, lex_vec, flt, _cli_scaled_per_query, eff_collection)
except Exception:
lex_results = []

Expand All @@ -2917,7 +3034,7 @@ def _norm_under(u: str | None) -> str | None:
else:
queries = expand_queries(queries, eff_language)

# Add server-side lexical vector ranking into fusion
# Add server-side lexical vector ranking into fusion (with scaled RRF)
for rank, p in enumerate(lex_results, 1):
pid = str(p.id)
score_map.setdefault(
Expand All @@ -2936,16 +3053,16 @@ def _norm_under(u: str | None) -> str | None:
"test": 0.0,
},
)
lxs = (_AD_LEX_VEC_W2 * rrf(rank)) if _USE_ADAPT2 else (LEX_VECTOR_WEIGHT * rrf(rank))
lxs = (_AD_LEX_VEC_W2 * _cli_scaled_rrf(rank)) if _USE_ADAPT2 else (LEX_VECTOR_WEIGHT * _cli_scaled_rrf(rank))
score_map[pid]["lx"] += lxs
score_map[pid]["s"] += lxs

embedded = _embed_queries_cached(model, queries)
result_sets: List[List[Any]] = [
dense_query(client, vec_name, v, flt, args.per_query, eff_collection) for v in embedded
dense_query(client, vec_name, v, flt, _cli_scaled_per_query, eff_collection) for v in embedded
]

# RRF fusion (weighted)
# RRF fusion (weighted, with scaled RRF)
for res in result_sets:
for rank, p in enumerate(res, 1):
pid = str(p.id)
Expand All @@ -2965,7 +3082,7 @@ def _norm_under(u: str | None) -> str | None:
"test": 0.0,
},
)
dens = (_AD_DENSE_W2 * rrf(rank)) if _USE_ADAPT2 else (DENSE_WEIGHT * rrf(rank))
dens = (_AD_DENSE_W2 * _cli_scaled_rrf(rank)) if _USE_ADAPT2 else (DENSE_WEIGHT * _cli_scaled_rrf(rank))
score_map[pid]["d"] += dens
score_map[pid]["s"] += dens

Expand Down Expand Up @@ -3046,6 +3163,9 @@ def _norm_under(u: str | None) -> str | None:
rec["rec"] += rec_comp
rec["s"] += rec_comp

# === Large codebase score normalization (CLI path) ===
_normalize_scores(score_map, _cli_coll_size)

# Rank with deterministic tie-breakers
def _tie_key(m: Dict[str, Any]):
md = (m["pt"].payload or {}).get("metadata") or {}
Expand Down
Loading