From 728b78975944b6bf17eef63ddf3b2eb5eda19ca8 Mon Sep 17 00:00:00 2001 From: John Donalson <11264689+m1rl0k@users.noreply.github.com> Date: Sat, 1 Nov 2025 11:40:36 -0400 Subject: [PATCH 1/2] more --- README.md | 24 +- scripts/mcp_indexer_server.py | 106 ++++++++- scripts/mcp_router.py | 357 +++++++++++++++++++++++++++- tests/test_router_batching.py | 146 ++++++++++++ tests/test_router_batching_demux.py | 103 ++++++++ 5 files changed, 733 insertions(+), 3 deletions(-) create mode 100644 tests/test_router_batching.py create mode 100644 tests/test_router_batching_demux.py diff --git a/README.md b/README.md index 1915e823..c58faafd 100644 --- a/README.md +++ b/README.md @@ -117,6 +117,7 @@ Notes: - Roo (SSE/RMCP): supports both SSE and RMCP connections; see config examples below - Cline (SSE/RMCP): supports both SSE and RMCP connections; see config examples below - Windsurf (SSE/RMCP): supports both SSE and RMCP connections; see config examples below +- Zed (SSE): uses mcp-remote bridge via command/args; see config below - Kiro (SSE): uses mcp-remote bridge via command/args; see config below - Qodo (RMCP): connects directly to HTTP endpoints; add each tool individually - OpenAI Codex (RMCP): TOML config for memory/indexer URLs @@ -270,8 +271,29 @@ Create `.kiro/settings/mcp.json` in your workspace: } } ```` + +Zed (SSE): +Add to your Zed `settings.json` (accessed via Command Palette → "Settings: Open Settings (JSON)"): +````json +{ + "mcp": { + "servers": { + "qdrant-indexer": { + "command": "npx", + "args": [ + "mcp-remote", + "http://localhost:8001/sse", + "--transport", + "sse-only" + ], + "env": {} + } + } + } +} +```` Notes: -- Kiro expects command/args (stdio). mcp-remote bridges to remote SSE endpoints. +- Zed expects command/args (stdio). mcp-remote bridges to remote SSE endpoints. - If npx prompts, add -y right after npx. - For Qodo (RMCP) clients, see "Qodo Integration (RMCP config)" below for the direct `url`-based snippet. diff --git a/scripts/mcp_indexer_server.py b/scripts/mcp_indexer_server.py index f0959eca..5fe5644e 100644 --- a/scripts/mcp_indexer_server.py +++ b/scripts/mcp_indexer_server.py @@ -5489,12 +5489,116 @@ def _k(s: Dict[str, Any]): except Exception: pass - return { + # Optional: provide per-query answers/citations for pack mode to enable exact router demux + answers_by_query = None + try: + if len(queries) > 1 and str(_cfg.get("mode") or "").strip().lower() == "pack": + answers_by_query = [] + for q in queries: + try: + # Per-query retrieval + _retr_i = _ca_prepare_filters_and_retrieve( + queries=[q], + lim=lim, + ppath=ppath, + filters=_cfg["filters"], + model=model, + did_local_expand=did_local_expand, + kwargs={ + "language": _cfg["filters"].get("language"), + "under": _cfg["filters"].get("under"), + "path_glob": _cfg["filters"].get("path_glob"), + "not_glob": _cfg["filters"].get("not_glob"), + "path_regex": _cfg["filters"].get("path_regex"), + "ext": _cfg["filters"].get("ext"), + "kind": _cfg["filters"].get("kind"), + "case": _cfg["filters"].get("case"), + "symbol": _cfg["filters"].get("symbol"), + }, + ) + items_i = _retr_i["items"] + eff_language_i = _retr_i["eff_language"] + eff_path_glob_i = _retr_i["eff_path_glob"] + eff_not_glob_i = _retr_i["eff_not_glob"] + override_under_i = _retr_i["override_under"] + sym_arg_i = _retr_i["sym_arg"] + cwd_root_i = _retr_i["cwd_root"] + path_regex_i = _retr_i["path_regex"] + ext_i = _retr_i["ext"] + kind_i = _retr_i["kind"] + case_i = _retr_i["case"] + + fallback_kwargs_i = dict(kwargs or {}) + for key in ("path_glob", "language", "under"): + fallback_kwargs_i.pop(key, None) + + spans_i = _ca_fallback_and_budget( + items=items_i, + queries=[q], + lim=lim, + ppath=ppath, + eff_language=eff_language_i, + eff_path_glob=eff_path_glob_i, + eff_not_glob=eff_not_glob_i, + path_regex=path_regex_i, + sym_arg=sym_arg_i, + ext=ext_i, + kind=kind_i, + override_under=override_under_i, + did_local_expand=did_local_expand, + model=model, + req_language=eff_language_i, + not_=not_, + case=case_i, + cwd_root=cwd_root_i, + include_snippet=bool(include_snippet), + kwargs=fallback_kwargs_i, + ) + + cits_i, ctx_blocks_i, snippets_by_id_i, asked_ident_i, def_line_i, def_id_i, usage_id_i = _ca_build_citations_and_context( + spans=spans_i, + include_snippet=bool(include_snippet), + queries=[q], + ) + + # Decode per-query + prompt_i = _ca_build_prompt(ctx_blocks_i, cits_i, [q]) + ans_raw_i = _ca_decode(prompt_i, mtok=mtok, temp=temp, top_k=top_k, top_p=top_p, stops=stops) + ans_i = _ca_postprocess_answer( + ans_raw_i, + cits_i, + asked_ident=asked_ident_i, + def_line_exact=def_line_i, + def_id=def_id_i, + usage_id=usage_id_i, + snippets_by_id=snippets_by_id_i, + ) + + answers_by_query.append({ + "query": q, + "answer": ans_i, + "citations": cits_i, + }) + except Exception as _e: + # Best-effort: if per-query path fails, include empty stub to preserve alignment + answers_by_query.append({ + "query": q, + "answer": "", + "citations": [], + "error": str(_e), + }) + except Exception: + answers_by_query = None + + out = { "answer": answer.strip(), "citations": citations, "query": queries, "used": {"gate_first": True, "refrag": True}, } + if answers_by_query: + out["answers_by_query"] = answers_by_query + return out @mcp.tool() diff --git a/scripts/mcp_router.py b/scripts/mcp_router.py index ba635f53..319a4420 100644 --- a/scripts/mcp_router.py +++ b/scripts/mcp_router.py @@ -729,6 +729,358 @@ def _post_raw_retry(url: str, payload: Dict[str, Any], headers: Dict[str, str], raise last_exc + +# ----------------------------- +# Context Answer Batching (per-agent, per-filter) +# ----------------------------- +import threading + +class BatchingContextAnswerClient: + """Lightweight in-memory batching for context_answer calls. + + - Queues short-lived requests keyed by (base_url, collection, filters_fingerprint) + - Flushes after a small window or when batch size cap is hit + - For multi-item batches, sends query=[...] with mode="pack" + - Shares the same response with all enqueued requests + """ + + def __init__(self, call_func=None, enable: bool | None = None, window_ms: int | None = None, + max_batch: int | None = None, budget_ms: int | None = None): + self._call = call_func or call_tool_http + # Enabled flag (default off for back-compat) + if enable is None: + env_enabled = os.environ.get("ROUTER_BATCH_ENABLED") + if env_enabled is None: + env_enabled = os.environ.get("ROUTER_BATCH_ENABLE", "0") + self.enabled = str(env_enabled).strip().lower() in {"1","true","yes","on"} + else: + self.enabled = bool(enable) + # Window and caps + self.window_ms = int(os.environ.get("ROUTER_BATCH_WINDOW_MS", str(window_ms if window_ms is not None else 100)) or 100) + env_max = os.environ.get("ROUTER_BATCH_MAX_SIZE") + if env_max is None: + env_max = os.environ.get("ROUTER_BATCH_MAX") + self.max_batch = int(env_max or (max_batch if max_batch is not None else 8)) + env_budget = os.environ.get("ROUTER_BATCH_LATENCY_BUDGET_MS") + if env_budget is None: + env_budget = os.environ.get("ROUTER_BATCH_BUDGET_MS") + self.budget_ms = int(env_budget or (budget_ms if budget_ms is not None else 2000)) + self._lock = threading.RLock() + self._groups: dict[str, dict[str, any]] = {} + + def _should_bypass(self, args: Dict[str, Any]) -> bool: + # 1) explicit flag in args + try: + if isinstance(args, dict): + v = args.get("immediate") + if v is not None and str(v).strip().lower() in {"1","true","yes","on"}: + return True + except Exception: + pass + # 2) env-level bypass + if str(os.environ.get("ROUTER_BATCH_BYPASS", "0")).strip().lower() in {"1","true","yes","on"}: + return True + # 3) free-text hint in the query + try: + q = str((args or {}).get("query") or "") + if "immediate answer" in q.lower(): + return True + except Exception: + pass + return False + + def _norm_query(self, q: str) -> str: + try: + return re.sub(r"\s+", " ", str(q or "").strip()) + except Exception: + return str(q) + + def _filters_fingerprint(self, args: Dict[str, Any]) -> str: + keep = { + "collection", "language", "under", "kind", "symbol", "ext", + "path_regex", "path_glob", "not_glob", "not_", "case", + "limit", "per_path", "include_snippet", + } + try: + filt = {k: args.get(k) for k in keep if k in args} + # Normalize list-like values + def _norm(v): + if v is None: + return None + if isinstance(v, (list, tuple)): + return [str(x) for x in v] + return v + clean = {k: _norm(v) for k, v in filt.items()} + return json.dumps(clean, sort_keys=True, ensure_ascii=False) + except Exception: + return "{}" + + def _group_key(self, base_url: str, args: Dict[str, Any]) -> str: + coll = str(args.get("collection") or "") + fp = self._filters_fingerprint(args) + repo = os.getcwd() + return f"{base_url}|{coll}|answer|{fp}|{repo}" + + def call_or_enqueue(self, base_url: str, tool: str, args: Dict[str, Any], timeout: float = 120.0) -> Dict[str, Any]: + # Passthrough conditions + if not self.enabled: + return self._call(base_url, tool, args, timeout=timeout) + if self._should_bypass(args): + return self._call(base_url, tool, args, timeout=timeout) + + start_ts = time.time() + key = self._group_key(base_url, args or {}) + norm_q = self._norm_query((args or {}).get("query") or "") + ev = threading.Event() + slot = {"event": ev, "result": None, "error": None, "query": norm_q, "args": dict(args or {})} + + with self._lock: + g = self._groups.get(key) + if not g: + g = { + "created": time.time(), + "items": [], # list of slots + "timer": None, + } + self._groups[key] = g + # Dedup within the window: if same normalized query already present, just add another waiter + g["items"].append(slot) + # Start timer if not set + if g["timer"] is None: + delay = max(0.0, float(self.window_ms) / 1000.0) + t = threading.Timer(delay, self._flush, args=(key,)) + g["timer"] = t + t.daemon = True + t.start() + # Flush early if cap reached + if len(g["items"]) >= self.max_batch: + t = g.get("timer") + if t: + try: + t.cancel() + except Exception: + pass + g["timer"] = None + # flush outside lock + threading.Thread(target=self._flush, args=(key,), daemon=True).start() + + # Wait with a hard budget + remain = max(0.05, self.budget_ms / 1000.0) + ev.wait(timeout=min(timeout, remain)) + # If not signaled yet (e.g., server slow), fall back to direct call + if not ev.is_set(): + try: + res = self._call(base_url, tool, args, timeout=timeout) + slot["result"] = res + ev.set() + # Remove this slot from the batch group to prevent duplicate flush calls + try: + with self._lock: + gg = self._groups.get(key) + if gg: + lst = gg.get("items") or [] + if slot in lst: + try: + lst.remove(slot) + except Exception: + pass + if not lst: + # If group is empty, cancel timer and cleanup + t2 = gg.get("timer") + if t2: + try: + t2.cancel() + except Exception: + pass + self._groups.pop(key, None) + except Exception: + pass + # Metrics: bypass due to budget + try: + print(json.dumps({"router": {"batch_fallback": True, "elapsed_ms": int((time.time()-start_ts)*1000)}}), file=sys.stderr) + except Exception: + pass + return res + except Exception as e: + slot["error"] = e + ev.set() + raise + + # Return shared result + if slot.get("error") is not None: + raise slot["error"] + return slot.get("result") or {} + + def _flush(self, key: str) -> None: + with self._lock: + g = self._groups.get(key) + if not g: + return + items = g.get("items") or [] + g["items"] = [] + g["timer"] = None + if not items: + self._groups.pop(key, None) + return + # Build queries and merged args + unique_q: list[str] = [] + seen_q = set() + for it in items: + q = it.get("query") or "" + if q not in seen_q: + seen_q.add(q) + unique_q.append(q) + first_args = dict(items[0].get("args") or {}) + forward = {k: v for k, v in first_args.items() if k not in {"query", "queries"}} + base_url = None + try: + # Rebuild base_url from key prefix + base_url = key.split("|")[0] + except Exception: + base_url = HTTP_URL_INDEXER + + started = time.time() + results_by_q: Dict[str, Any] = {} + errors_by_q: Dict[str, Exception] = {} + calls = 0 + try: + import copy as _copy + except Exception: + _copy = None # type: ignore + + # Single aggregated call when multiple unique queries + if len(unique_q) > 1: + args_all = dict(forward) + args_all["query"] = list(unique_q) + args_all["mode"] = args_all.get("mode") or "pack" + try: + agg_res = self._call(base_url, "context_answer", args_all, timeout=120.0) + calls = 1 + # Demultiplex: carve per-query replies + try: + payload = ((agg_res or {}).get("result") or {}).get("structuredContent") or {} + body = (payload.get("result") or {}) + ans = str(body.get("answer") or "") + cits = body.get("citations") or [] + except Exception: + payload, body, ans, cits = {}, {}, "", [] + + # Prefer exact demux from indexer if available + abq = None + try: + abq = body.get("answers_by_query") + except Exception: + abq = None + if isinstance(abq, list) and abq: + _map: Dict[str, Any] = {} + # Build by query string when present, else by index + by_idx = (len(abq) >= len(unique_q)) + for i, entry in enumerate(abq): + try: + qv = entry.get("query") + qk = None + if isinstance(qv, list) and qv: + qk = str(qv[0]) + elif isinstance(qv, str): + qk = qv + except Exception: + qk = None + # Fallback to index if query key not present + key = qk if qk else (unique_q[i] if by_idx and i < len(unique_q) else None) + if not key: + continue + # Build per result by overriding answer/citations/query + per = _copy.deepcopy(agg_res) if _copy else json.loads(json.dumps(agg_res)) + try: + per_body = (per.get("result") or {}).get("structuredContent", {}).get("result", {}) + except Exception: + per_body = None + try: + ans_i = str(entry.get("answer") or "") + cits_i = entry.get("citations") or [] + if per_body is not None: + per_body["answer"] = ans_i + per_body["citations"] = cits_i + per_body["query"] = [key] + else: + if "result" in per and isinstance(per["result"], dict): + sc = per["result"].get("structuredContent") or {} + if "result" in sc and isinstance(sc["result"], dict): + sc["result"]["answer"] = ans_i + sc["result"]["citations"] = cits_i + sc["result"]["query"] = [key] + except Exception: + pass + _map[str(key)] = per + for uq in unique_q: + if str(uq) in _map: + results_by_q[uq] = _map[str(uq)] + # Fill missing via heuristics below if necessary + remaining = [uq for uq in unique_q if uq not in results_by_q] + else: + remaining = list(unique_q) + + # No structured per-query answers from server: fall back to one call per query for correctness + if remaining: + for uq in remaining: + args_i = dict(forward) + args_i["query"] = uq + try: + results_by_q[uq] = self._call(base_url, "context_answer", args_i, timeout=120.0) + except Exception as e: + errors_by_q[uq] = e + calls += len(remaining) + except Exception as e: + # If aggregated call fails, assign error to every query + for uq in unique_q: + errors_by_q[uq] = e + calls = 1 + else: + # Single query passthrough + args1 = dict(forward) + args1["query"] = unique_q[0] if unique_q else "" + try: + results_by_q[args1["query"]] = self._call(base_url, "context_answer", args1, timeout=120.0) + except Exception as e: + errors_by_q[args1["query"]] = e + calls = 1 + + elapsed_ms = int((time.time() - started) * 1000) + try: + print(json.dumps({ + "router": { + "batch_flushed": True, + "n_items": len(items), + "unique_q": len(unique_q), + "calls": int(calls), + "elapsed_ms": elapsed_ms, + "ok": (len(errors_by_q) == 0), + } + }), file=sys.stderr) + except Exception: + pass + + # Fan out per-query results/errors to all waiters + for it in items: + q = it.get("query") or "" + it["result"] = results_by_q.get(q) + it["error"] = errors_by_q.get(q) + ev = it.get("event") + try: + if hasattr(ev, "set"): + ev.set() + except Exception: + pass + with self._lock: + # Cleanup empty group + gg = self._groups.get(key) + if gg and not gg.get("items"): + self._groups.pop(key, None) + + +# Global client +_BATCH_CLIENT = BatchingContextAnswerClient() + def _mcp_tools_list(base_url: str, timeout: float = 30.0) -> List[str]: try: headers = _mcp_handshake(base_url, timeout=min(timeout, 15.0)) @@ -1344,7 +1696,10 @@ def main(argv: List[str]) -> int: except Exception: pass try: - res = call_tool_http(base_url, tool, targs, timeout=args.timeout) + if tool in {"context_answer", "context_answer_compat"}: + res = _BATCH_CLIENT.call_or_enqueue(base_url, tool, targs, timeout=args.timeout) + else: + res = call_tool_http(base_url, tool, targs, timeout=args.timeout) print(json.dumps({"tool": tool, "result": res}, indent=2)) last = res # If this was a memory.find step, capture snippets for later augmentation diff --git a/tests/test_router_batching.py b/tests/test_router_batching.py new file mode 100644 index 00000000..6795caec --- /dev/null +++ b/tests/test_router_batching.py @@ -0,0 +1,146 @@ +import threading +import time + +import pytest + +from scripts.mcp_router import BatchingContextAnswerClient + + +class _Counter: + def __init__(self): + self.n = 0 + self.lock = threading.Lock() + + def inc(self): + with self.lock: + self.n += 1 + return self.n + + +def _fake_call_factory(counter: _Counter): + def _fake_call(base_url: str, tool: str, args: dict, timeout: float = 1.0): + # Simulate a tiny network call and count invocations + counter.inc() + time.sleep(0.01) + q = args.get("query") + queries = args.get("queries") or ([q] if q else ([] if q is None else ([q] if not isinstance(q, list) else q))) + # When multiple queries are provided (aggregated call), return structured per-query answers + answers_by_query = None + if isinstance(q, list) and len(q) > 1: + answers_by_query = [ + {"query": str(qi), "answer": "ok", "citations": []} for qi in q + ] + return { + "result": { + "structuredContent": { + "result": { + "answer": "ok", + "citations": [], + "query": queries, + **({"answers_by_query": answers_by_query} if answers_by_query else {}), + } + } + } + } + + return _fake_call + + +def test_batching_merges_identical_queries(): + counter = _Counter() + client = BatchingContextAnswerClient( + call_func=_fake_call_factory(counter), + enable=True, + window_ms=120, + max_batch=8, + budget_ms=2000, + ) + + results: list[dict] = [] + barrier = threading.Barrier(3) + + def worker(): + barrier.wait() + res = client.call_or_enqueue( + "http://localhost:8003/mcp", + "context_answer", + {"query": "What is batching?", "limit": 5}, + timeout=1.0, + ) + results.append(res) + + t1 = threading.Thread(target=worker) + t2 = threading.Thread(target=worker) + t1.start(); t2.start() + barrier.wait() # release both workers + t1.join(); t2.join() + + # Exactly one underlying call, two client results + assert counter.n == 1 + assert len(results) == 2 + for r in results: + assert r.get("result", {}).get("structuredContent", {}).get("result", {}).get("answer") == "ok" + + +def test_batching_cap_flushes_early(): + counter = _Counter() + client = BatchingContextAnswerClient( + call_func=_fake_call_factory(counter), + enable=True, + window_ms=5000, # long window, but cap will force immediate flush + max_batch=2, + budget_ms=2000, + ) + + results: list[dict] = [] + barrier = threading.Barrier(3) + + def worker(q): + barrier.wait() + res = client.call_or_enqueue( + "http://localhost:8003/mcp", + "context_answer", + {"query": q, "limit": 5}, + timeout=1.0, + ) + results.append(res) + + t1 = threading.Thread(target=worker, args=("A",)) + t2 = threading.Thread(target=worker, args=("B",)) + t1.start(); t2.start() + barrier.wait() + t1.join(); t2.join() + + # Cap reached: we flush once and make a single aggregated call + assert counter.n == 1 + assert len(results) == 2 + + +def test_bypass_immediate_flag_calls_direct(): + counter = _Counter() + client = BatchingContextAnswerClient( + call_func=_fake_call_factory(counter), + enable=True, + window_ms=200, + max_batch=8, + budget_ms=2000, + ) + + # Two direct calls because of immediate flag; they should not be batched + r1 = client.call_or_enqueue( + "http://localhost:8003/mcp", + "context_answer", + {"query": "Q1", "limit": 5, "immediate": True}, + timeout=1.0, + ) + r2 = client.call_or_enqueue( + "http://localhost:8003/mcp", + "context_answer", + {"query": "Q2", "limit": 5, "immediate": True}, + timeout=1.0, + ) + + assert counter.n == 2 + assert r1.get("result", {}).get("structuredContent", {}).get("result", {}).get("answer") == "ok" + assert r2.get("result", {}).get("structuredContent", {}).get("result", {}).get("answer") == "ok" + diff --git a/tests/test_router_batching_demux.py b/tests/test_router_batching_demux.py new file mode 100644 index 00000000..7cae1ede --- /dev/null +++ b/tests/test_router_batching_demux.py @@ -0,0 +1,103 @@ +import threading +import time + +from scripts.mcp_router import BatchingContextAnswerClient + + +class _Counter: + def __init__(self): + self.n = 0 + self.lock = threading.Lock() + + def inc(self): + with self.lock: + self.n += 1 + return self.n + + +def _fake_call_factory(counter: _Counter): + def _fake_call(base_url: str, tool: str, args: dict, timeout: float = 1.0): + counter.inc() + time.sleep(0.01) + q = args.get("query") + queries = args.get("queries") or ([q] if q else ([] if q is None else ([q] if not isinstance(q, list) else q))) + answers_by_query = None + if isinstance(q, list) and len(q) > 1: + answers_by_query = [ + {"query": str(qi), "answer": f"ok:{qi}", "citations": []} for qi in q + ] + return { + "result": { + "structuredContent": { + "result": { + "answer": f"ok:{q}", + "citations": [], + "query": queries, + **({"answers_by_query": answers_by_query} if answers_by_query else {}), + } + } + } + } + + return _fake_call + + +def test_demultiplex_different_queries_results_are_isolated(): + counter = _Counter() + client = BatchingContextAnswerClient( + call_func=_fake_call_factory(counter), + enable=True, + window_ms=120, + max_batch=8, + budget_ms=2000, + ) + + results: list[tuple[str, dict]] = [] + barrier = threading.Barrier(3) + + def worker(q: str): + barrier.wait() + res = client.call_or_enqueue( + "http://localhost:8003/mcp", + "context_answer", + {"query": q, "limit": 5}, + timeout=1.0, + ) + results.append((q, res)) + + t1 = threading.Thread(target=worker, args=("Q1",)) + t2 = threading.Thread(target=worker, args=("Q2",)) + t1.start(); t2.start() + barrier.wait() + t1.join(); t2.join() + + # Aggregated call once, demux per-query reply + assert counter.n == 1 + assert len(results) == 2 + for q, r in results: + rq = r.get("result", {}).get("structuredContent", {}).get("result", {}).get("query") + # Each result should reflect only its own query + assert rq == [q] + + +def test_budget_fallback_does_not_double_call(): + counter = _Counter() + client = BatchingContextAnswerClient( + call_func=_fake_call_factory(counter), + enable=True, + window_ms=500, # long window so timer would fire later + max_batch=8, + budget_ms=10, # tiny budget to force immediate fallback + ) + + res = client.call_or_enqueue( + "http://localhost:8003/mcp", + "context_answer", + {"query": "late", "limit": 5}, + timeout=1.0, + ) + assert res + # Wait beyond the window; if slot was not removed, we'd see a second call when timer flushes + time.sleep(0.6) + assert counter.n == 1 + From 9761908d04e48f1f66b04171101c37d41618ba60 Mon Sep 17 00:00:00 2001 From: John Donalson <11264689+m1rl0k@users.noreply.github.com> Date: Sat, 1 Nov 2025 11:59:18 -0400 Subject: [PATCH 2/2] fixes --- README.md | 40 ++++++++----- scripts/mcp_indexer_server.py | 107 ++++++++++++---------------------- 2 files changed, 62 insertions(+), 85 deletions(-) diff --git a/README.md b/README.md index c58faafd..1de0f3b9 100644 --- a/README.md +++ b/README.md @@ -276,25 +276,35 @@ Zed (SSE): Add to your Zed `settings.json` (accessed via Command Palette → "Settings: Open Settings (JSON)"): ````json { - "mcp": { - "servers": { - "qdrant-indexer": { - "command": "npx", - "args": [ - "mcp-remote", - "http://localhost:8001/sse", - "--transport", - "sse-only" - ], - "env": {} - } - } + /// The name of your MCP server + "qdrant-indexer": { + /// The command which runs the MCP server + "command": "npx", + /// The arguments to pass to the MCP server + "args": [ + "mcp-remote", + "http://localhost:8001/sse", + "--transport", + "sse-only" + ], + /// The environment variables to set + "env": {} } } ```` Notes: -- Zed expects command/args (stdio). mcp-remote bridges to remote SSE endpoints. -- If npx prompts, add -y right after npx. +- Zed expects MCP servers at the root level of settings.json +- Uses command/args (stdio). mcp-remote bridges to remote SSE endpoints +- If npx prompts, add `-y` right after npx: `"command": "npx", "args": ["-y", "mcp-remote", ...]` +- Alternative: Use direct HTTP connection if mcp-remote has issues: + ```json + { + "qdrant-indexer": { + "type": "http", + "url": "http://localhost:8001/sse" + } + } + ``` - For Qodo (RMCP) clients, see "Qodo Integration (RMCP config)" below for the direct `url`-based snippet. 6) Common troubleshooting diff --git a/scripts/mcp_indexer_server.py b/scripts/mcp_indexer_server.py index 5fe5644e..bf251dcc 100644 --- a/scripts/mcp_indexer_server.py +++ b/scripts/mcp_indexer_server.py @@ -5489,89 +5489,57 @@ def _k(s: Dict[str, Any]): except Exception: pass - # Optional: provide per-query answers/citations for pack mode to enable exact router demux + # Optional: provide per-query answers/citations for pack mode by reusing the combined retrieval answers_by_query = None try: if len(queries) > 1 and str(_cfg.get("mode") or "").strip().lower() == "pack": + import re as _re + + def _tok2(s: str) -> list[str]: + try: + return [w.lower() for w in _re.split(r"[^A-Za-z0-9_]+", str(s or "")) if len(w) >= 3] + except Exception: + return [] + + # Build quick lookups from the combined retrieval we already computed + id_to_cit = {int(c.get("id") or 0): c for c in (citations or []) if int(c.get("id") or 0) > 0} + id_to_block = {idx + 1: blk for idx, blk in enumerate(context_blocks or [])} + answers_by_query = [] for q in queries: try: - # Per-query retrieval - _retr_i = _ca_prepare_filters_and_retrieve( - queries=[q], - lim=lim, - ppath=ppath, - filters=_cfg["filters"], - model=model, - did_local_expand=did_local_expand, - kwargs={ - "language": _cfg["filters"].get("language"), - "under": _cfg["filters"].get("under"), - "path_glob": _cfg["filters"].get("path_glob"), - "not_glob": _cfg["filters"].get("not_glob"), - "path_regex": _cfg["filters"].get("path_regex"), - "ext": _cfg["filters"].get("ext"), - "kind": _cfg["filters"].get("kind"), - "case": _cfg["filters"].get("case"), - "symbol": _cfg["filters"].get("symbol"), - }, - ) - items_i = _retr_i["items"] - eff_language_i = _retr_i["eff_language"] - eff_path_glob_i = _retr_i["eff_path_glob"] - eff_not_glob_i = _retr_i["eff_not_glob"] - override_under_i = _retr_i["override_under"] - sym_arg_i = _retr_i["sym_arg"] - cwd_root_i = _retr_i["cwd_root"] - path_regex_i = _retr_i["path_regex"] - ext_i = _retr_i["ext"] - kind_i = _retr_i["kind"] - case_i = _retr_i["case"] - - fallback_kwargs_i = dict(kwargs or {}) - for key in ("path_glob", "language", "under"): - fallback_kwargs_i.pop(key, None) - - spans_i = _ca_fallback_and_budget( - items=items_i, - queries=[q], - lim=lim, - ppath=ppath, - eff_language=eff_language_i, - eff_path_glob=eff_path_glob_i, - eff_not_glob=eff_not_glob_i, - path_regex=path_regex_i, - sym_arg=sym_arg_i, - ext=ext_i, - kind=kind_i, - override_under=override_under_i, - did_local_expand=did_local_expand, - model=model, - req_language=eff_language_i, - not_=not_, - case=case_i, - cwd_root=cwd_root_i, - include_snippet=bool(include_snippet), - kwargs=fallback_kwargs_i, - ) + toks = set(_tok2(q)) + picked_ids: list[int] = [] + if toks: + for cid, c in id_to_cit.items(): + path_l = str(c.get("path") or "").lower() + sn = (snippets_by_id.get(cid) or "").lower() + if any(t in sn or t in path_l for t in toks): + picked_ids.append(cid) + if len(picked_ids) >= 6: # small cap per query to keep prompt compact + break + # Fallback if nothing matched: take the first 2 citations + if not picked_ids: + picked_ids = [c.get("id") for c in (citations or [])[:2] if c.get("id")] - cits_i, ctx_blocks_i, snippets_by_id_i, asked_ident_i, def_line_i, def_id_i, usage_id_i = _ca_build_citations_and_context( - spans=spans_i, - include_snippet=bool(include_snippet), - queries=[q], - ) + # Assemble per-query citations and context blocks using the shared retrieval + cits_i = [id_to_cit[cid] for cid in picked_ids if cid in id_to_cit] + ctx_blocks_i = [id_to_block[cid] for cid in picked_ids if cid in id_to_block] - # Decode per-query + # Decode per-query with the subset of shared context prompt_i = _ca_build_prompt(ctx_blocks_i, cits_i, [q]) ans_raw_i = _ca_decode(prompt_i, mtok=mtok, temp=temp, top_k=top_k, top_p=top_p, stops=stops) + + # Minimal post-processing with per-query identifier inference + asked_ident_i = _primary_identifier_from_queries([q]) ans_i = _ca_postprocess_answer( ans_raw_i, cits_i, asked_ident=asked_ident_i, - def_line_exact=def_line_i, - def_id=def_id_i, - usage_id=usage_id_i, - snippets_by_id=snippets_by_id_i, + def_line_exact=None, + def_id=None, + usage_id=None, + snippets_by_id={cid: snippets_by_id.get(cid, "") for cid in picked_ids}, ) answers_by_query.append({ @@ -5580,7 +5548,6 @@ def _k(s: Dict[str, Any]): "citations": cits_i, }) except Exception as _e: - # Best-effort: if per-query path fails, include empty stub to preserve alignment answers_by_query.append({ "query": q, "answer": "",