From f81f186c012a33cfd4a7320ae90a08b8f2160aca Mon Sep 17 00:00:00 2001 From: john donalson Date: Mon, 26 Jan 2026 04:27:36 -0500 Subject: [PATCH 1/7] Add gc.collect to free memory after file indexing Invoke garbage collection after processing each file in index_repo to prevent memory accumulation. Also update FakeClient in tests to support on_disk_payload parameter in create_collection. --- scripts/ingest/pipeline.py | 4 ++++ tests/test_ingest_schema_mode.py | 3 ++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/scripts/ingest/pipeline.py b/scripts/ingest/pipeline.py index 0fe9e893..a99675e8 100644 --- a/scripts/ingest/pipeline.py +++ b/scripts/ingest/pipeline.py @@ -1508,6 +1508,7 @@ def _index_file_task(file_path: Path) -> tuple[Path, Optional[Exception], bool]: Returns: (path, error_or_none, was_skipped_due_to_lock) """ + import gc per_file_repo = ( root_repo_for_cache if root_repo_for_cache is not None @@ -1532,6 +1533,9 @@ def _index_file_task(file_path: Path) -> tuple[Path, Optional[Exception], bool]: return (file_path, None, True) except Exception as e: return (file_path, e, False) + finally: + # Free memory after each file to prevent accumulation + gc.collect() files_processed = 0 errors = [] diff --git a/tests/test_ingest_schema_mode.py b/tests/test_ingest_schema_mode.py index c766089b..593dce40 100644 --- a/tests/test_ingest_schema_mode.py +++ b/tests/test_ingest_schema_mode.py @@ -30,7 +30,7 @@ def get_collection(self, name): payload_schema=self.payload_schema, ) - def create_collection(self, collection_name, vectors_config, sparse_vectors_config=None, hnsw_config=None, quantization_config=None): + def create_collection(self, collection_name, vectors_config, sparse_vectors_config=None, hnsw_config=None, quantization_config=None, on_disk_payload=None): self.create_calls.append( { "collection_name": collection_name, @@ -38,6 +38,7 @@ def create_collection(self, collection_name, vectors_config, sparse_vectors_conf "sparse_vectors_config": 
sparse_vectors_config, "hnsw_config": hnsw_config, "quantization_config": quantization_config, + "on_disk_payload": on_disk_payload, } ) self.collection_exists = True From 41a83da307f8b5719d4d56fe6de1f82ef4e0d935 Mon Sep 17 00:00:00 2001 From: john donalson Date: Mon, 26 Jan 2026 05:04:41 -0500 Subject: [PATCH 2/7] Add shared embedding service with remote provider support Introduces a new embedding_service with Dockerfile, FastAPI server, and requirements for serving ONNX-based embeddings as a shared service. Updates docker-compose.yml to include the embedding service and its cache volume. Modifies reset.py to always include the embedding service in container management and updates help text. Enhances qdrant.py to support both local and remote embedding providers, with concurrency control and HTTP fallback. Adds an empty __init__.py for embedding_service package initialization. --- docker-compose.yml | 36 +++++++++ embedding_service/Dockerfile | 19 +++++ embedding_service/main.py | 103 +++++++++++++++++++++++++ embedding_service/requirements.txt | 5 ++ scripts/ctx_cli/commands/reset.py | 19 ++--- scripts/ingest/qdrant.py | 70 +++++++++++++---- services/embedding_service/__init__.py | 2 + 7 files changed, 232 insertions(+), 22 deletions(-) create mode 100644 embedding_service/Dockerfile create mode 100644 embedding_service/main.py create mode 100644 embedding_service/requirements.txt create mode 100644 services/embedding_service/__init__.py diff --git a/docker-compose.yml b/docker-compose.yml index e3513da5..c4886f61 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -24,6 +24,38 @@ services: networks: - dev-remote-network + # Shared Embedding Service - single ONNX model serves all indexers + # Memory: ~1-2 GB (vs 15 GB with per-indexer models) + # Enable with EMBEDDING_PROVIDER=remote in .env + embedding: + build: + context: ./embedding_service + dockerfile: Dockerfile + container_name: embedding-service + ports: + - "8100:8100" + environment: + - 
EMBEDDING_MODEL=${EMBEDDING_MODEL:-BAAI/bge-base-en-v1.5} + - EMBED_MAX_CONCURRENT=${EMBED_MAX_CONCURRENT:-2} + - EMBED_MAX_BATCH=${EMBED_MAX_BATCH:-256} + - HF_HOME=/tmp/huggingface + - FASTEMBED_CACHE_PATH=/tmp/fastembed + volumes: + - embedding_cache:/tmp/huggingface + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8100/health"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 60s + deploy: + resources: + limits: + memory: 4G + restart: unless-stopped + networks: + - dev-remote-network + # Qdrant vector database - same as base compose qdrant: image: qdrant/qdrant:latest @@ -744,6 +776,10 @@ volumes: redis_data: driver: local + # Embedding service model cache + embedding_cache: + driver: local + # Custom network for service discovery networks: dev-remote-network: diff --git a/embedding_service/Dockerfile b/embedding_service/Dockerfile new file mode 100644 index 00000000..97c70b43 --- /dev/null +++ b/embedding_service/Dockerfile @@ -0,0 +1,19 @@ +FROM python:3.11-slim + +WORKDIR /app + +# Install dependencies +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy service +COPY main.py . + +# Pre-download default model during build (optional, speeds up startup) +# RUN python -c "from fastembed import TextEmbedding; TextEmbedding('BAAI/bge-base-en-v1.5')" + +ENV EMBED_SERVICE_PORT=8100 +EXPOSE 8100 + +CMD ["python", "main.py"] + diff --git a/embedding_service/main.py b/embedding_service/main.py new file mode 100644 index 00000000..47da509f --- /dev/null +++ b/embedding_service/main.py @@ -0,0 +1,103 @@ +#!/usr/bin/env python3 +""" +Shared Embedding Service - One model instance serves all indexers. 
+ +Endpoints: + POST /embed - Embed texts, returns vectors + GET /health - Health check for k8s probes + +Memory: ~1-2 GB (single ONNX model) +Concurrency: Controlled via EMBED_MAX_CONCURRENT semaphore +""" +import asyncio +import logging +import os +import threading +from contextlib import asynccontextmanager +from typing import List, Optional + +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Config +MODEL_NAME = os.environ.get("EMBEDDING_MODEL", "BAAI/bge-base-en-v1.5") +MAX_CONCURRENT = int(os.environ.get("EMBED_MAX_CONCURRENT", "2") or 2) +MAX_BATCH_SIZE = int(os.environ.get("EMBED_MAX_BATCH", "256") or 256) + +# Global model and semaphore +_model = None +_semaphore = threading.Semaphore(MAX_CONCURRENT) +_model_dim = None + + +def _load_model(): + """Load embedding model once at startup.""" + global _model, _model_dim + from fastembed import TextEmbedding + + logger.info(f"Loading model: {MODEL_NAME}") + _model = TextEmbedding(model_name=MODEL_NAME) + + # Warmup and get dimension + warmup = list(_model.embed(["warmup"])) + _model_dim = len(warmup[0]) + logger.info(f"Model loaded: dim={_model_dim}, max_concurrent={MAX_CONCURRENT}") + + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Load model on startup.""" + _load_model() + yield + + +app = FastAPI(title="Embedding Service", lifespan=lifespan) + + +class EmbedRequest(BaseModel): + texts: List[str] + model: Optional[str] = None # Ignored for now, single model + + +class EmbedResponse(BaseModel): + vectors: List[List[float]] + dim: int + count: int + + +@app.post("/embed", response_model=EmbedResponse) +async def embed(request: EmbedRequest): + """Embed texts with concurrency control.""" + if not request.texts: + return EmbedResponse(vectors=[], dim=_model_dim or 768, count=0) + + if len(request.texts) > MAX_BATCH_SIZE: + raise HTTPException(400, f"Batch too large: {len(request.texts)} 
> {MAX_BATCH_SIZE}") + + # Run embedding in thread pool with semaphore + loop = asyncio.get_event_loop() + vectors = await loop.run_in_executor(None, _embed_sync, request.texts) + + return EmbedResponse(vectors=vectors, dim=_model_dim, count=len(vectors)) + + +def _embed_sync(texts: List[str]) -> List[List[float]]: + """Synchronous embedding with semaphore.""" + with _semaphore: + return [vec.tolist() for vec in _model.embed(texts)] + + +@app.get("/health") +async def health(): + """Health check for k8s probes.""" + return {"status": "ok", "model": MODEL_NAME, "dim": _model_dim} + + +if __name__ == "__main__": + import uvicorn + port = int(os.environ.get("EMBED_SERVICE_PORT", "8100")) + uvicorn.run(app, host="0.0.0.0", port=port) + diff --git a/embedding_service/requirements.txt b/embedding_service/requirements.txt new file mode 100644 index 00000000..efd79c41 --- /dev/null +++ b/embedding_service/requirements.txt @@ -0,0 +1,5 @@ +fastapi>=0.100.0 +uvicorn>=0.22.0 +fastembed>=0.2.0 +pydantic>=2.0.0 + diff --git a/scripts/ctx_cli/commands/reset.py b/scripts/ctx_cli/commands/reset.py index c09e3483..4737f6ea 100644 --- a/scripts/ctx_cli/commands/reset.py +++ b/scripts/ctx_cli/commands/reset.py @@ -202,20 +202,21 @@ def reset( # Determine which containers to build/start based on mode # Default is HTTP-only; SSE only starts when explicitly requested with --sse + # Embedding service is always included (shared ONNX model for all indexers) if mode == "sse": # SSE MCPs only (legacy, must be explicitly requested) - build_containers = ["indexer", "mcp", "mcp_indexer", "watcher"] - start_containers = ["mcp", "mcp_indexer", "watcher"] + build_containers = ["embedding", "indexer", "mcp", "mcp_indexer", "watcher"] + start_containers = ["embedding", "mcp", "mcp_indexer", "watcher"] mode_desc = "SSE MCPs only (legacy)" elif mode == "dual": # Dual mode (both SSE and HTTP) - build_containers = ["indexer", "mcp", "mcp_indexer", "mcp_http", "mcp_indexer_http", "watcher", 
"upload_service"] - start_containers = ["mcp", "mcp_indexer", "mcp_http", "mcp_indexer_http", "watcher", "upload_service"] + build_containers = ["embedding", "indexer", "mcp", "mcp_indexer", "mcp_http", "mcp_indexer_http", "watcher", "upload_service"] + start_containers = ["embedding", "mcp", "mcp_indexer", "mcp_http", "mcp_indexer_http", "watcher", "upload_service"] mode_desc = "Dual mode (SSE + HTTP)" else: # HTTP MCPs only (default, Codex compatible) + upload_service for remote sync - build_containers = ["indexer", "mcp_http", "mcp_indexer_http", "watcher", "upload_service"] - start_containers = ["mcp_http", "mcp_indexer_http", "watcher", "upload_service"] + build_containers = ["embedding", "indexer", "mcp_http", "mcp_indexer_http", "watcher", "upload_service"] + start_containers = ["embedding", "mcp_http", "mcp_indexer_http", "watcher", "upload_service"] mode_desc = "HTTP MCPs (streamable)" # Add learning_worker if rerank learning is enabled @@ -259,7 +260,7 @@ def reset( step += 1 _print(f"\n[bold][{step}/{steps_total}] Stopping services...[/bold]") if db_reset: - # Full reset including database volumes (qdrant, redis, neo4j) + # Full reset including database volumes (qdrant, redis, neo4j, embedding cache) _run_cmd(compose_cmd + ["down", "-v", "--remove-orphans"], "Stopping all containers and removing volumes", check=False) _print("[green]✓[/green] Services stopped and database volumes removed") else: @@ -448,7 +449,7 @@ def register_command(subparsers): ctx reset # Full reset with HTTP MCPs (default) ctx reset --dual # Both SSE and HTTP MCPs ctx reset --sse # SSE MCPs only (legacy) - ctx reset --db-reset # Reset database volumes (Qdrant, Redis, Neo4j) + ctx reset --db-reset # Reset database volumes (Qdrant, Redis, Neo4j, Embedding cache) ctx reset --skip-model # Skip llama model download ctx reset --skip-build # Skip container rebuild (faster) """ @@ -476,7 +477,7 @@ def register_command(subparsers): parser.add_argument( "--db-reset", action="store_true", - 
help="Reset database volumes (Qdrant, Redis, Neo4j)" + help="Reset database volumes (Qdrant, Redis, Neo4j, Embedding cache)" ) # Skip options diff --git a/scripts/ingest/qdrant.py b/scripts/ingest/qdrant.py index 73daf2f2..6d74b0c5 100644 --- a/scripts/ingest/qdrant.py +++ b/scripts/ingest/qdrant.py @@ -9,6 +9,7 @@ import logging import os +import threading import time import xxhash @@ -17,6 +18,17 @@ logger = logging.getLogger(__name__) +# --------------------------------------------------------------------------- +# ONNX concurrency control - prevents memory explosion with parallel workers +# --------------------------------------------------------------------------- +_EMBED_MAX_CONCURRENT = int(os.environ.get("EMBED_MAX_CONCURRENT", "2") or 2) +_EMBED_SEMAPHORE = threading.Semaphore(_EMBED_MAX_CONCURRENT) + +# Remote embedding service configuration +_EMBEDDING_PROVIDER = os.environ.get("EMBEDDING_PROVIDER", "local").strip().lower() +_EMBEDDING_SERVICE_URL = os.environ.get("EMBEDDING_SERVICE_URL", "http://embedding:8100") +_EMBEDDING_SERVICE_TIMEOUT = int(os.environ.get("EMBEDDING_SERVICE_TIMEOUT", "60") or 60) + from qdrant_client import QdrantClient, models from scripts.ingest.config import ( @@ -979,22 +991,54 @@ def hash_id(text: str, path: str, start: int, end: int) -> int: return int(h[:16], 16) -def embed_batch(model, texts: List[str]) -> List[List[float]]: - """Embed a batch of texts using the embedding model. - - When ASYMMETRIC_EMBEDDING=1, uses passage_embed for documents (if available). - This enables asymmetric retrieval with models like Jina v3 that have - separate query/passage adapters. 
- """ - # Check for asymmetric embedding mode +def _embed_local(model, texts: List[str]) -> List[List[float]]: + """Local ONNX embedding with concurrency control.""" asymmetric = ( str(os.environ.get("ASYMMETRIC_EMBEDDING", "0")).strip().lower() in {"1", "true", "yes", "on"} ) - if asymmetric and hasattr(model, "passage_embed"): - # Use passage_embed for documents (asymmetric models like Jina v3) - return [vec.tolist() for vec in model.passage_embed(texts)] + # Semaphore prevents ONNX memory explosion with parallel workers + with _EMBED_SEMAPHORE: + if asymmetric and hasattr(model, "passage_embed"): + return [vec.tolist() for vec in model.passage_embed(texts)] + else: + return [vec.tolist() for vec in model.embed(texts)] + + +def _embed_remote(texts: List[str], model_name: str = "default") -> List[List[float]]: + """Remote embedding via HTTP service.""" + import requests + + try: + resp = requests.post( + f"{_EMBEDDING_SERVICE_URL}/embed", + json={"texts": texts, "model": model_name}, + timeout=_EMBEDDING_SERVICE_TIMEOUT, + ) + resp.raise_for_status() + return resp.json()["vectors"] + except Exception as e: + logger.error(f"Remote embedding failed: {e}") + raise + + +def embed_batch(model, texts: List[str]) -> List[List[float]]: + """Embed a batch of texts using configured provider. + + Providers (set via EMBEDDING_PROVIDER env var): + - local: Use local ONNX model with concurrency control (default) + - remote: Use remote embedding service at EMBEDDING_SERVICE_URL + + When ASYMMETRIC_EMBEDDING=1, uses passage_embed for documents (if available). 
+ """ + if not texts: + return [] + + if _EMBEDDING_PROVIDER == "remote": + # Get model name for remote service + model_name = getattr(model, "model_name", None) or os.environ.get("EMBEDDING_MODEL", "default") + return _embed_remote(texts, model_name) else: - # Standard symmetric embedding - return [vec.tolist() for vec in model.embed(texts)] + # Local with semaphore-controlled concurrency + return _embed_local(model, texts) diff --git a/services/embedding_service/__init__.py b/services/embedding_service/__init__.py new file mode 100644 index 00000000..d6a3e920 --- /dev/null +++ b/services/embedding_service/__init__.py @@ -0,0 +1,2 @@ +# Embedding service package + From c1da2848464d1174c92d3114f762221c986a3f22 Mon Sep 17 00:00:00 2001 From: john donalson Date: Mon, 26 Jan 2026 05:16:23 -0500 Subject: [PATCH 3/7] Enable remote embedding service with replicas and memory tuning Updated docker-compose.yml to support 2 replicas of the embedding service with increased memory limits and improved healthcheck. Added remote embedding provider detection and stub in embedder.py to avoid loading ONNX models when using a remote service, saving memory. Requirements updated to include requests. Concurrency environment variable parsing hardened in main.py and qdrant.py. 
--- docker-compose.yml | 13 +++++++------ embedding_service/main.py | 2 +- requirements.txt | 1 + scripts/embedder.py | 40 ++++++++++++++++++++++++++++++++++++--- scripts/ingest/qdrant.py | 2 +- 5 files changed, 47 insertions(+), 11 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index c4886f61..f1945a98 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -24,16 +24,16 @@ services: networks: - dev-remote-network - # Shared Embedding Service - single ONNX model serves all indexers - # Memory: ~1-2 GB (vs 15 GB with per-indexer models) + # Shared Embedding Service - ONNX models serve all indexers + # Memory: 6GB × 2 replicas = 12GB total (load balanced) # Enable with EMBEDDING_PROVIDER=remote in .env embedding: build: context: ./embedding_service dockerfile: Dockerfile - container_name: embedding-service + # No container_name - allows replicas ports: - - "8100:8100" + - "8100-8101:8100" environment: - EMBEDDING_MODEL=${EMBEDDING_MODEL:-BAAI/bge-base-en-v1.5} - EMBED_MAX_CONCURRENT=${EMBED_MAX_CONCURRENT:-2} @@ -43,15 +43,16 @@ services: volumes: - embedding_cache:/tmp/huggingface healthcheck: - test: ["CMD", "curl", "-f", "http://localhost:8100/health"] + test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8100/health')"] interval: 30s timeout: 10s retries: 3 start_period: 60s deploy: + replicas: 2 resources: limits: - memory: 4G + memory: 6G restart: unless-stopped networks: - dev-remote-network diff --git a/embedding_service/main.py b/embedding_service/main.py index 47da509f..5a1cd7b9 100644 --- a/embedding_service/main.py +++ b/embedding_service/main.py @@ -24,7 +24,7 @@ # Config MODEL_NAME = os.environ.get("EMBEDDING_MODEL", "BAAI/bge-base-en-v1.5") -MAX_CONCURRENT = int(os.environ.get("EMBED_MAX_CONCURRENT", "2") or 2) +MAX_CONCURRENT = max(1, int(os.environ.get("EMBED_MAX_CONCURRENT", "2") or 2)) MAX_BATCH_SIZE = int(os.environ.get("EMBED_MAX_BATCH", "256") or 256) # Global model and semaphore 
diff --git a/requirements.txt b/requirements.txt index fd28cbab..d603d3c6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,6 +10,7 @@ orjson redis>=5.0.0 xxhash>=3.0.0 lz4>=4.0.0 +requests>=2.28.0 # Tree-sitter 0.25+ with individual language packages tree_sitter>=0.25.0 tree_sitter_python>=0.23.0 diff --git a/scripts/embedder.py b/scripts/embedder.py index 3ee631e8..4f139f02 100644 --- a/scripts/embedder.py +++ b/scripts/embedder.py @@ -76,6 +76,28 @@ _ARCTIC_V2_REGISTERED = False _ARCTIC_V2_REGISTER_LOCK = threading.Lock() +# Remote embedding provider detection +_EMBEDDING_PROVIDER = os.environ.get("EMBEDDING_PROVIDER", "local").strip().lower() + + +class RemoteEmbeddingStub: + """Lightweight stub for remote embedding mode - no ONNX loaded. + + When EMBEDDING_PROVIDER=remote, we don't need to load the actual ONNX model + since all embedding calls go to the remote service. This stub provides the + model_name attribute needed by embed_batch() for routing. + """ + + def __init__(self, model_name: str): + self.model_name = model_name + + def embed(self, texts: List[str]): + """Stub - should not be called in remote mode.""" + raise RuntimeError( + "RemoteEmbeddingStub.embed() called but EMBEDDING_PROVIDER=remote. " + "Use embed_batch() from scripts.ingest.qdrant which routes to remote service." + ) + def _register_qwen3_model() -> None: """Register Qwen3 ONNX model with FastEmbed (one-time, thread-safe).""" @@ -149,13 +171,25 @@ def get_embedding_model(model_name: Optional[str] = None) -> Any: model_name: Model name override. If None, uses EMBEDDING_MODEL env var. Returns: - TextEmbedding instance (cached per model name). + TextEmbedding instance (cached per model name), or RemoteEmbeddingStub + when EMBEDDING_PROVIDER=remote (no ONNX loaded, saves ~3-4 GB RAM). 
""" - from fastembed import TextEmbedding - if model_name is None: model_name = os.environ.get("EMBEDDING_MODEL", DEFAULT_MODEL) + # Remote mode: return lightweight stub (no ONNX loaded) + # This saves ~3-4 GB RAM per indexer since embed_batch() routes to remote service + if _EMBEDDING_PROVIDER == "remote": + cached = _EMBED_MODEL_CACHE.get(f"remote:{model_name}") + if cached is not None: + return cached + stub = RemoteEmbeddingStub(model_name) + _EMBED_MODEL_CACHE[f"remote:{model_name}"] = stub + logger.info(f"[embedder] Using remote embedding service for {model_name} (no local ONNX)") + return stub + + from fastembed import TextEmbedding + # Register Qwen3 if enabled and requested if QWEN3_ENABLED and "qwen3" in model_name.lower(): _register_qwen3_model() diff --git a/scripts/ingest/qdrant.py b/scripts/ingest/qdrant.py index 6d74b0c5..feb9950c 100644 --- a/scripts/ingest/qdrant.py +++ b/scripts/ingest/qdrant.py @@ -21,7 +21,7 @@ # --------------------------------------------------------------------------- # ONNX concurrency control - prevents memory explosion with parallel workers # --------------------------------------------------------------------------- -_EMBED_MAX_CONCURRENT = int(os.environ.get("EMBED_MAX_CONCURRENT", "2") or 2) +_EMBED_MAX_CONCURRENT = max(1, int(os.environ.get("EMBED_MAX_CONCURRENT", "2") or 2)) _EMBED_SEMAPHORE = threading.Semaphore(_EMBED_MAX_CONCURRENT) # Remote embedding service configuration From 9e4f5fccaaaf454672dc088d990f584408991dee Mon Sep 17 00:00:00 2001 From: john donalson Date: Mon, 26 Jan 2026 05:30:11 -0500 Subject: [PATCH 4/7] Enhance embedding service config and scaling Adds detailed ONNX and embedding model configuration options to .env.example and docker-compose.yml, including threading and batching controls. Embedding service now supports ONNX CPU optimizations and improved batching for efficiency. 
The reset CLI command starts multiple embedding service replicas and waits for readiness, improving reliability for indexing workflows. --- .env.example | 43 +++++++++++++++- docker-compose.yml | 12 ++++- embedding_service/main.py | 82 ++++++++++++++++++++++++++----- scripts/ctx_cli/commands/reset.py | 16 ++++-- 4 files changed, 134 insertions(+), 19 deletions(-) diff --git a/.env.example b/.env.example index 30ebc3eb..dbe27f3c 100644 --- a/.env.example +++ b/.env.example @@ -20,10 +20,51 @@ COLLECTION_NAME=codebase # Embeddings EMBEDDING_MODEL=BAAI/bge-base-en-v1.5 -EMBEDDING_PROVIDER=fastembed # Optional repo tag attached to each payload REPO_NAME=workspace +# --------------------------------------------------------------------------- +# Embedding Service Configuration (Shared ONNX for scale) +# --------------------------------------------------------------------------- +# EMBEDDING_PROVIDER: local | remote +# local = Use in-process ONNX (default, high memory per worker) +# remote = Use shared embedding service (recommended for scale) +EMBEDDING_PROVIDER=local + +# When EMBEDDING_PROVIDER=remote, calls this service +EMBEDDING_SERVICE_URL=http://embedding:8100 +EMBEDDING_SERVICE_TIMEOUT=60 + +# Max concurrent ONNX inferences (local mode or in embedding service) +# Prevents memory explosion with parallel workers +EMBED_MAX_CONCURRENT=2 + +# Max batch size per embed request +EMBED_MAX_BATCH=256 + +# --------------------------------------------------------------------------- +# ONNX CPU Optimizations (for embedding service) +# --------------------------------------------------------------------------- +# ONNX_THREADS: Number of threads for intra-op parallelism +# 0 = auto (1 per physical core), or set explicit count (e.g., 4-6) +ONNX_THREADS=0 + +# ONNX_DISABLE_SPINNING: Disable thread spin-wait (saves CPU cycles) +# 0 = spinning enabled (faster, burns CPU), 1 = disabled (power efficient) +ONNX_DISABLE_SPINNING=0 + +# EMBED_OPTIMAL_BATCH: Internal batch size for 
chunking large requests +# Sweet spot for CPU is 32-64. Too small = overhead, too large = memory pressure +EMBED_OPTIMAL_BATCH=32 + +# --------------------------------------------------------------------------- +# Embedding Model Options +# --------------------------------------------------------------------------- +# Model options (changing model requires re-indexing!): +# BAAI/bge-base-en-v1.5 - Default, solid quality (768 dim, 0.21 GB) +# nomic-ai/nomic-embed-text-v1.5 - Faster, outperforms BGE on MTEB (768 dim, 0.13 GB) +# BAAI/bge-large-en-v1.5 - Higher quality, slower (1024 dim, 0.67 GB) +# # Qwen3-Embedding Feature Flag (optional, experimental) # Enable to use Qwen3-Embedding-0.6B instead of BGE-base (requires reindex) # QWEN3_EMBEDDING_ENABLED=0 diff --git a/docker-compose.yml b/docker-compose.yml index f1945a98..5c5ab27e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -35,9 +35,15 @@ services: ports: - "8100-8101:8100" environment: - - EMBEDDING_MODEL=${EMBEDDING_MODEL:-BAAI/bge-base-en-v1.5} + # Model: use nomic-Q for faster + better quality, or override via EMBEDDING_MODEL + - EMBEDDING_MODEL=${EMBEDDING_MODEL:-nomic-ai/nomic-embed-text-v1.5} - EMBED_MAX_CONCURRENT=${EMBED_MAX_CONCURRENT:-2} - EMBED_MAX_BATCH=${EMBED_MAX_BATCH:-256} + # ONNX CPU optimizations + - ONNX_THREADS=${ONNX_THREADS:-0} + - ONNX_DISABLE_SPINNING=${ONNX_DISABLE_SPINNING:-0} + - EMBED_OPTIMAL_BATCH=${EMBED_OPTIMAL_BATCH:-32} + # Cache paths - HF_HOME=/tmp/huggingface - FASTEMBED_CACHE_PATH=/tmp/fastembed volumes: @@ -260,7 +266,9 @@ services: - TRANSFORMERS_CACHE=/tmp/huggingface/transformers - FASTEMBED_CACHE_PATH=/tmp/huggingface/fastembed - EMBEDDING_MODEL=${EMBEDDING_MODEL} - - EMBEDDING_PROVIDER=${EMBEDDING_PROVIDER} + # Override: learning worker calls embedder.embed() directly via RecursiveReranker._encode_raw() + # which doesn't route through embed_batch(). Needs local ONNX, not RemoteEmbeddingStub. 
+ - EMBEDDING_PROVIDER=local # Cross-encoder reranker (used as teacher for learning) - RERANKER_MODEL=${RERANKER_MODEL:-} - RERANKER_ONNX_PATH=${RERANKER_ONNX_PATH:-} diff --git a/embedding_service/main.py b/embedding_service/main.py index 5a1cd7b9..fdce6ad0 100644 --- a/embedding_service/main.py +++ b/embedding_service/main.py @@ -8,6 +8,16 @@ Memory: ~1-2 GB (single ONNX model) Concurrency: Controlled via EMBED_MAX_CONCURRENT semaphore + +ONNX CPU Optimizations: + ONNX_THREADS: Number of threads for intra-op parallelism (default: 0 = auto) + ONNX_DISABLE_SPINNING: Set to 1 to disable thread spinning (saves CPU cycles) + EMBED_OPTIMAL_BATCH: Internal batch size for chunking large requests (default: 32) + +Model Options (via EMBEDDING_MODEL env var): + BAAI/bge-base-en-v1.5 - Default, solid quality (768 dim, 0.21 GB) + nomic-ai/nomic-embed-text-v1.5-Q - Quantized, faster, outperforms BGE on MTEB (768 dim, 0.13 GB) + BAAI/bge-large-en-v1.5 - Higher quality, slower (1024 dim, 0.67 GB) """ import asyncio import logging @@ -22,28 +32,59 @@ logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -# Config +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- MODEL_NAME = os.environ.get("EMBEDDING_MODEL", "BAAI/bge-base-en-v1.5") MAX_CONCURRENT = max(1, int(os.environ.get("EMBED_MAX_CONCURRENT", "2") or 2)) MAX_BATCH_SIZE = int(os.environ.get("EMBED_MAX_BATCH", "256") or 256) +# ONNX runtime optimizations +ONNX_THREADS = int(os.environ.get("ONNX_THREADS", "0") or 0) # 0 = auto (1 per physical core) +ONNX_DISABLE_SPINNING = os.environ.get("ONNX_DISABLE_SPINNING", "0").strip().lower() in {"1", "true", "yes"} +EMBED_OPTIMAL_BATCH = int(os.environ.get("EMBED_OPTIMAL_BATCH", "32") or 32) # Sweet spot for CPU + # Global model and semaphore _model = None _semaphore = threading.Semaphore(MAX_CONCURRENT) _model_dim = None +_model_info = {} 
def _load_model(): - """Load embedding model once at startup.""" - global _model, _model_dim + """Load embedding model with ONNX optimizations.""" + global _model, _model_dim, _model_info from fastembed import TextEmbedding - + logger.info(f"Loading model: {MODEL_NAME}") - _model = TextEmbedding(model_name=MODEL_NAME) - - # Warmup and get dimension - warmup = list(_model.embed(["warmup"])) + logger.info(f"ONNX config: threads={ONNX_THREADS or 'auto'}, disable_spinning={ONNX_DISABLE_SPINNING}, optimal_batch={EMBED_OPTIMAL_BATCH}") + + # Build kwargs for TextEmbedding + # FastEmbed accepts 'threads' parameter for ONNX session + model_kwargs = {} + if ONNX_THREADS > 0: + model_kwargs["threads"] = ONNX_THREADS + + _model = TextEmbedding(model_name=MODEL_NAME, **model_kwargs) + + # Apply additional ONNX session options if we can access the session + # Note: FastEmbed may not expose all session options, but threads is the main one + + # Warmup and get dimension (also warms up ONNX runtime) + logger.info("Warming up model...") + warmup = list(_model.embed(["warmup query for initialization"])) _model_dim = len(warmup[0]) + + # Store model info for health endpoint + _model_info = { + "model": MODEL_NAME, + "dim": _model_dim, + "threads": ONNX_THREADS or "auto", + "disable_spinning": ONNX_DISABLE_SPINNING, + "optimal_batch": EMBED_OPTIMAL_BATCH, + "max_concurrent": MAX_CONCURRENT, + } + logger.info(f"Model loaded: dim={_model_dim}, max_concurrent={MAX_CONCURRENT}") @@ -85,15 +126,32 @@ async def embed(request: EmbedRequest): def _embed_sync(texts: List[str]) -> List[List[float]]: - """Synchronous embedding with semaphore.""" + """Synchronous embedding with semaphore and optimal batching. + + Chunks large requests into EMBED_OPTIMAL_BATCH sized pieces for better + CPU cache utilization and memory efficiency. 
+ """ with _semaphore: - return [vec.tolist() for vec in _model.embed(texts)] + # Chunk into optimal batch sizes for CPU efficiency + if len(texts) <= EMBED_OPTIMAL_BATCH: + return [vec.tolist() for vec in _model.embed(texts)] + + # Process in chunks + all_vectors = [] + for i in range(0, len(texts), EMBED_OPTIMAL_BATCH): + chunk = texts[i:i + EMBED_OPTIMAL_BATCH] + chunk_vectors = [vec.tolist() for vec in _model.embed(chunk)] + all_vectors.extend(chunk_vectors) + return all_vectors @app.get("/health") async def health(): - """Health check for k8s probes.""" - return {"status": "ok", "model": MODEL_NAME, "dim": _model_dim} + """Health check for k8s probes. Returns full model config.""" + return { + "status": "ok", + **_model_info, + } if __name__ == "__main__": diff --git a/scripts/ctx_cli/commands/reset.py b/scripts/ctx_cli/commands/reset.py index 4737f6ea..393eb797 100644 --- a/scripts/ctx_cli/commands/reset.py +++ b/scripts/ctx_cli/commands/reset.py @@ -278,21 +278,28 @@ def reset( else: _print(f"\n[bold][{step}/{steps_total}] Skipping container build[/bold]") - # Step 3: Start Qdrant, Redis (if enabled), and Neo4j (if enabled) and wait + # Step 3: Start Qdrant, Redis (if enabled), Neo4j (if enabled), and Embedding service step += 1 db_services = ["qdrant"] if redis_enabled: db_services.append("redis") if neo4j_enabled: db_services.append("neo4j") + # Start embedding service early (indexer needs it) + db_services.append("embedding") _print(f"\n[bold][{step}/{steps_total}] Starting {', '.join(db_services)}...[/bold]") - _run_cmd(compose_cmd + ["up", "-d"] + db_services, f"Starting {', '.join(db_services)}") + # Use --scale for embedding to get 2 replicas (deploy.replicas is Swarm-only) + _run_cmd(compose_cmd + ["up", "-d", "--scale", "embedding=2"] + db_services, f"Starting {', '.join(db_services)} (embedding×2)") # Use helper that normalizes Docker hostname to localhost for host CLI qdrant_url = get_qdrant_url_for_host() if not _wait_for_qdrant(qdrant_url): 
return 1 + # Wait for embedding service to be ready (indexer needs it) + if not _wait_for_embedding("http://localhost:8100"): + _print("[yellow]Warning:[/yellow] Embedding service not ready, indexer may have errors") + # Step 4: Initialize payload indexes step += 1 _print(f"\n[bold][{step}/{steps_total}] Initializing payload indexes...[/bold]") @@ -407,8 +414,9 @@ def reset( _print(f"\n[bold][{step}/{steps_total}] Starting services...[/bold]") # Start services - cmd = compose_cmd + ["up", "-d"] + start_containers - _run_cmd(cmd, f"Starting: {', '.join(start_containers)}") + # Use --scale for embedding service to get multiple replicas (deploy.replicas is Swarm-only) + cmd = compose_cmd + ["up", "-d", "--scale", "embedding=2"] + start_containers + _run_cmd(cmd, f"Starting: {', '.join(start_containers)} (embedding×2)") _print("[green]✓[/green] Services started") _print_panel( From abb3cd46b502aa5bc25005e9a5ac59aa84a8e122 Mon Sep 17 00:00:00 2001 From: john donalson Date: Mon, 26 Jan 2026 05:32:15 -0500 Subject: [PATCH 5/7] Add wait function for embedding service readiness Introduces the _wait_for_embedding function to check if the embedding service is available before proceeding. This improves reliability by ensuring dependent services are ready before use. 
--- scripts/ctx_cli/commands/reset.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/scripts/ctx_cli/commands/reset.py b/scripts/ctx_cli/commands/reset.py index 393eb797..ab3b34df 100644 --- a/scripts/ctx_cli/commands/reset.py +++ b/scripts/ctx_cli/commands/reset.py @@ -116,6 +116,26 @@ def _wait_for_qdrant(url: str = "http://localhost:6333", timeout: int = 60) -> b return False +def _wait_for_embedding(url: str = "http://localhost:8100", timeout: int = 90) -> bool: + """Wait for embedding service to be ready.""" + _print(f"[dim]Waiting for embedding service at {url}...[/dim]") + start = time.time() + + while time.time() - start < timeout: + try: + health_url = f"{url.rstrip('/')}/health" + with urllib.request.urlopen(health_url, timeout=5) as r: + if getattr(r, "status", 200) < 500: + _print("[green]✓[/green] Embedding service is ready") + return True + except Exception as e: + logger.debug(f"Suppressed exception: {e}") + time.sleep(2) + + _print(f"[red]Error:[/red] Embedding service not ready after {timeout}s", error=True) + return False + + def _download_file(url: str, dest: Path, description: str) -> bool: """Download a file with progress display.""" _print(f"[dim]Downloading {description}...[/dim]") From 8d1fa15be97059c22a27854e406aaa9dc1fbb91c Mon Sep 17 00:00:00 2001 From: john donalson Date: Mon, 26 Jan 2026 06:27:05 -0500 Subject: [PATCH 6/7] Add model selection and tokenizer override support Updated Dockerfile to allow embedding model selection via build argument, defaulting to nomic-ai/nomic-embed-text-v1.5 for improved performance. Modified reset.py to support overriding the default tokenizer URL using the TOKENIZER_URL environment variable. 
--- .../context-engine/templates/configmap.yaml | 2 + .../templates/embedding-service.yaml | 91 +++++++++++++++ deploy/helm/context-engine/values.yaml | 43 ++++++- deploy/kubernetes/configmap.yaml | 6 +- deploy/kubernetes/embedding-service.yaml | 106 ++++++++++++++++++ deploy/kubernetes/indexer-services.yaml | 30 +++++ deploy/kubernetes/kustomization.yaml | 6 + .../kubernetes/learning-reranker-worker.yaml | 4 + deploy/kubernetes/mcp-http.yaml | 25 +++++ deploy/kubernetes/mcp-indexer.yaml | 15 +++ docker-compose.yml | 12 +- embedding_service/Dockerfile | 7 +- scripts/ctx_cli/commands/reset.py | 7 +- scripts/ingest/qdrant.py | 26 ++++- scripts/warm_start.py | 55 +++++++++ 15 files changed, 420 insertions(+), 15 deletions(-) create mode 100644 deploy/helm/context-engine/templates/embedding-service.yaml create mode 100644 deploy/kubernetes/embedding-service.yaml diff --git a/deploy/helm/context-engine/templates/configmap.yaml b/deploy/helm/context-engine/templates/configmap.yaml index 2a0f1f85..3e472967 100644 --- a/deploy/helm/context-engine/templates/configmap.yaml +++ b/deploy/helm/context-engine/templates/configmap.yaml @@ -11,7 +11,9 @@ data: QDRANT_URL: {{ include "context-engine.qdrantUrl" . 
| quote }} EMBEDDING_MODEL: {{ .Values.config.embeddingModel | quote }} EMBEDDING_PROVIDER: {{ .Values.config.embeddingProvider | quote }} + EMBEDDING_SERVICE_URL: {{ .Values.config.embeddingServiceUrl | quote }} EMBEDDING_WARMUP: "0" + INDEX_WORKERS: {{ .Values.config.indexWorkers | default "4" | quote }} FASTMCP_HOST: {{ .Values.config.fastmcp.host | quote }} FASTMCP_PORT: {{ .Values.config.fastmcp.port | quote }} diff --git a/deploy/helm/context-engine/templates/embedding-service.yaml b/deploy/helm/context-engine/templates/embedding-service.yaml new file mode 100644 index 00000000..7a7dee39 --- /dev/null +++ b/deploy/helm/context-engine/templates/embedding-service.yaml @@ -0,0 +1,91 @@ +{{- if eq .Values.config.embeddingProvider "remote" }} +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ include "context-engine.fullname" . }}-embedding + labels: + {{- include "context-engine.labels" . | nindent 4 }} + app.kubernetes.io/component: embedding +spec: + replicas: {{ .Values.embedding.replicas | default 2 }} + selector: + matchLabels: + {{- include "context-engine.selectorLabels" . | nindent 6 }} + app.kubernetes.io/component: embedding + template: + metadata: + labels: + {{- include "context-engine.selectorLabels" . 
| nindent 8 }} + app.kubernetes.io/component: embedding + spec: + containers: + - name: embedding + image: "{{ .Values.embedding.image.repository | default "context-engine-embedding" }}:{{ .Values.embedding.image.tag | default "latest" }}" + ports: + - containerPort: 8100 + env: + - name: EMBEDDING_MODEL + value: {{ .Values.config.embeddingModel | quote }} + - name: EMBED_MAX_CONCURRENT + value: {{ .Values.embedding.maxConcurrent | default "2" | quote }} + - name: EMBED_OPTIMAL_BATCH + value: {{ .Values.embedding.optimalBatch | default "32" | quote }} + - name: ONNX_THREADS + value: {{ .Values.embedding.onnxThreads | default "4" | quote }} + - name: ONNX_DISABLE_SPINNING + value: "1" + - name: OMP_NUM_THREADS + value: {{ .Values.embedding.onnxThreads | default "4" | quote }} + - name: MKL_NUM_THREADS + value: {{ .Values.embedding.onnxThreads | default "4" | quote }} + resources: + {{- toYaml .Values.embedding.resources | nindent 12 }} + readinessProbe: + httpGet: + path: /health + port: 8100 + initialDelaySeconds: 30 + periodSeconds: 10 + livenessProbe: + httpGet: + path: /health + port: 8100 + initialDelaySeconds: 60 + periodSeconds: 30 +--- +apiVersion: v1 +kind: Service +metadata: + name: embedding + labels: + {{- include "context-engine.labels" . | nindent 4 }} +spec: + selector: + {{- include "context-engine.selectorLabels" . | nindent 4 }} + app.kubernetes.io/component: embedding + ports: + - port: 8100 + targetPort: 8100 +--- +{{- if .Values.embedding.autoscaling.enabled }} +apiVersion: autoscaling/v2 +kind: HorizontalPodAutoscaler +metadata: + name: {{ include "context-engine.fullname" . }}-embedding +spec: + scaleTargetRef: + apiVersion: apps/v1 + kind: Deployment + name: {{ include "context-engine.fullname" . 
}}-embedding + minReplicas: {{ .Values.embedding.autoscaling.minReplicas | default 2 }} + maxReplicas: {{ .Values.embedding.autoscaling.maxReplicas | default 10 }} + metrics: + - type: Resource + resource: + name: cpu + target: + type: Utilization + averageUtilization: {{ .Values.embedding.autoscaling.targetCPU | default 70 }} +{{- end }} +{{- end }} + diff --git a/deploy/helm/context-engine/values.yaml b/deploy/helm/context-engine/values.yaml index bdfc11be..7010c812 100644 --- a/deploy/helm/context-engine/values.yaml +++ b/deploy/helm/context-engine/values.yaml @@ -110,6 +110,37 @@ qdrant: initialDelaySeconds: 5 periodSeconds: 5 +# ----------------------------------------------------------------------------- +# Embedding Service Configuration (shared ONNX model) +# ----------------------------------------------------------------------------- +embedding: + # -- Number of replicas + replicas: 2 + # -- Image configuration + image: + repository: context-engine-embedding + tag: latest + # -- Max concurrent embeddings per replica + maxConcurrent: 2 + # -- Optimal batch size for CPU cache + optimalBatch: 32 + # -- ONNX threads per replica + onnxThreads: 4 + # -- Resource requests and limits + resources: + requests: + cpu: "2" + memory: 4Gi + limits: + cpu: "4" + memory: 6Gi + # -- Autoscaling configuration + autoscaling: + enabled: true + minReplicas: 2 + maxReplicas: 10 + targetCPU: 70 + # ----------------------------------------------------------------------------- # MCP Indexer HTTP Configuration # ----------------------------------------------------------------------------- @@ -463,10 +494,14 @@ config: # -- Qdrant URL (auto-generated if not set) qdrantUrl: "" # -- Embedding model - embeddingModel: BAAI/bge-base-en-v1.5 - # -- Embedding provider - embeddingProvider: fastembed - + embeddingModel: nomic-ai/nomic-embed-text-v1.5 + # -- Embedding provider (remote = shared service, fastembed = local) + embeddingProvider: remote + # -- Embedding service URL (when 
provider=remote) + embeddingServiceUrl: http://embedding:8100 + # -- Index workers (parallel file processing) + indexWorkers: 4 + # -- FastMCP settings fastmcp: host: "0.0.0.0" diff --git a/deploy/kubernetes/configmap.yaml b/deploy/kubernetes/configmap.yaml index caf3e4c3..9d55fc2c 100644 --- a/deploy/kubernetes/configmap.yaml +++ b/deploy/kubernetes/configmap.yaml @@ -15,8 +15,10 @@ data: CTX_SUMMARY_CHARS: '0' CURRENT_REPO: '' DECODER_MAX_TOKENS: '4000' - EMBEDDING_MODEL: BAAI/bge-base-en-v1.5 - EMBEDDING_PROVIDER: fastembed + EMBEDDING_MODEL: nomic-ai/nomic-embed-text-v1.5 + EMBEDDING_PROVIDER: remote + EMBEDDING_SERVICE_URL: http://embedding:8100 + INDEX_WORKERS: "4" EMBEDDING_WARMUP: '0' FASTMCP_HOST: 0.0.0.0 FASTMCP_HTTP_HEALTH_PORT: '18002' diff --git a/deploy/kubernetes/embedding-service.yaml b/deploy/kubernetes/embedding-service.yaml new file mode 100644 index 00000000..e82439e3 --- /dev/null +++ b/deploy/kubernetes/embedding-service.yaml @@ -0,0 +1,106 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: embedding-service + namespace: context-engine + labels: + app: embedding-service +spec: + replicas: 2 + selector: + matchLabels: + app: embedding-service + template: + metadata: + labels: + app: embedding-service + spec: + containers: + - name: embedding + image: context-engine-embedding:latest + ports: + - containerPort: 8100 + env: + - name: EMBEDDING_MODEL + valueFrom: + configMapKeyRef: + name: context-engine-config + key: EMBEDDING_MODEL + - name: EMBED_MAX_CONCURRENT + value: "2" + - name: EMBED_MAX_BATCH + value: "256" + - name: EMBED_OPTIMAL_BATCH + value: "32" + - name: ONNX_THREADS + value: "4" + - name: ONNX_DISABLE_SPINNING + value: "1" + - name: OMP_NUM_THREADS + value: "4" + - name: MKL_NUM_THREADS + value: "4" + - name: HF_HOME + value: /tmp/huggingface + - name: FASTEMBED_CACHE_PATH + value: /tmp/fastembed + resources: + requests: + memory: "4Gi" + cpu: "2" + limits: + memory: "6Gi" + cpu: "4" + readinessProbe: + httpGet: + path: 
/health + port: 8100 + initialDelaySeconds: 30 + periodSeconds: 10 + livenessProbe: + httpGet: + path: /health + port: 8100 + initialDelaySeconds: 60 + periodSeconds: 30 + volumeMounts: + - name: embedding-cache + mountPath: /tmp/huggingface + volumes: + - name: embedding-cache + emptyDir: + sizeLimit: 2Gi +--- +apiVersion: v1 +kind: Service +metadata: + name: embedding + namespace: context-engine +spec: + selector: + app: embedding-service + ports: + - port: 8100 + targetPort: 8100 + type: ClusterIP +--- +apiVersion: autoscaling/v2 +kind: HorizontalPodAutoscaler +metadata: + name: embedding-service-hpa + namespace: context-engine +spec: + scaleTargetRef: + apiVersion: apps/v1 + kind: Deployment + name: embedding-service + minReplicas: 2 + maxReplicas: 10 + metrics: + - type: Resource + resource: + name: cpu + target: + type: Utilization + averageUtilization: 70 + diff --git a/deploy/kubernetes/indexer-services.yaml b/deploy/kubernetes/indexer-services.yaml index 123582c6..26eeed17 100644 --- a/deploy/kubernetes/indexer-services.yaml +++ b/deploy/kubernetes/indexer-services.yaml @@ -71,6 +71,21 @@ spec: configMapKeyRef: name: context-engine-config key: EMBEDDING_MODEL + - name: EMBEDDING_PROVIDER + valueFrom: + configMapKeyRef: + name: context-engine-config + key: EMBEDDING_PROVIDER + - name: EMBEDDING_SERVICE_URL + valueFrom: + configMapKeyRef: + name: context-engine-config + key: EMBEDDING_SERVICE_URL + - name: INDEX_WORKERS + valueFrom: + configMapKeyRef: + name: context-engine-config + key: INDEX_WORKERS - name: HF_HOME value: /work/models/hf-cache - name: XDG_CACHE_HOME @@ -209,6 +224,21 @@ spec: configMapKeyRef: name: context-engine-config key: EMBEDDING_MODEL + - name: EMBEDDING_PROVIDER + valueFrom: + configMapKeyRef: + name: context-engine-config + key: EMBEDDING_PROVIDER + - name: EMBEDDING_SERVICE_URL + valueFrom: + configMapKeyRef: + name: context-engine-config + key: EMBEDDING_SERVICE_URL + - name: INDEX_WORKERS + valueFrom: + configMapKeyRef: + name: 
context-engine-config + key: INDEX_WORKERS - name: HF_HOME value: /work/models/hf-cache - name: XDG_CACHE_HOME diff --git a/deploy/kubernetes/kustomization.yaml b/deploy/kubernetes/kustomization.yaml index 233ff860..b10fd98b 100644 --- a/deploy/kubernetes/kustomization.yaml +++ b/deploy/kubernetes/kustomization.yaml @@ -13,6 +13,7 @@ resources: # Core services - qdrant.yaml - redis.yaml + - embedding-service.yaml - code-models-pvc.yaml - upload-pvc.yaml @@ -57,6 +58,9 @@ images: newTag: latest # newTag: v1.0.0 # newName: your-registry/context-engine + - name: context-engine-embedding + newTag: latest + # newName: your-registry/context-engine-embedding # Namespace override namespace: context-engine @@ -68,6 +72,8 @@ replicas: count: 1 # Set to 2+ for production - name: mcp-indexer count: 1 # Set to 2+ for production + - name: embedding-service + count: 2 # Shared embedding service replicas # Resource patches patches: diff --git a/deploy/kubernetes/learning-reranker-worker.yaml b/deploy/kubernetes/learning-reranker-worker.yaml index c1431c22..2148a783 100644 --- a/deploy/kubernetes/learning-reranker-worker.yaml +++ b/deploy/kubernetes/learning-reranker-worker.yaml @@ -56,6 +56,10 @@ spec: - name: metadata-volume mountPath: /tmp/rerank_events subPath: rerank_events + env: + # Learning worker needs local embeddings for reranker training + - name: EMBEDDING_PROVIDER + value: "local" envFrom: - configMapRef: name: context-engine-config diff --git a/deploy/kubernetes/mcp-http.yaml b/deploy/kubernetes/mcp-http.yaml index c8ace6ed..b6c2f5fb 100644 --- a/deploy/kubernetes/mcp-http.yaml +++ b/deploy/kubernetes/mcp-http.yaml @@ -73,6 +73,16 @@ spec: configMapKeyRef: name: context-engine-config key: EMBEDDING_PROVIDER + - name: EMBEDDING_SERVICE_URL + valueFrom: + configMapKeyRef: + name: context-engine-config + key: EMBEDDING_SERVICE_URL + - name: INDEX_WORKERS + valueFrom: + configMapKeyRef: + name: context-engine-config + key: INDEX_WORKERS - name: TOOL_STORE_DESCRIPTION 
valueFrom: configMapKeyRef: @@ -253,6 +263,21 @@ spec: configMapKeyRef: name: context-engine-config key: EMBEDDING_MODEL + - name: EMBEDDING_PROVIDER + valueFrom: + configMapKeyRef: + name: context-engine-config + key: EMBEDDING_PROVIDER + - name: EMBEDDING_SERVICE_URL + valueFrom: + configMapKeyRef: + name: context-engine-config + key: EMBEDDING_SERVICE_URL + - name: INDEX_WORKERS + valueFrom: + configMapKeyRef: + name: context-engine-config + key: INDEX_WORKERS - name: HF_HOME value: /work/models/hf-cache - name: XDG_CACHE_HOME diff --git a/deploy/kubernetes/mcp-indexer.yaml b/deploy/kubernetes/mcp-indexer.yaml index 2fbec1a1..26b51a2d 100644 --- a/deploy/kubernetes/mcp-indexer.yaml +++ b/deploy/kubernetes/mcp-indexer.yaml @@ -78,6 +78,21 @@ spec: configMapKeyRef: name: context-engine-config key: EMBEDDING_MODEL + - name: EMBEDDING_PROVIDER + valueFrom: + configMapKeyRef: + name: context-engine-config + key: EMBEDDING_PROVIDER + - name: EMBEDDING_SERVICE_URL + valueFrom: + configMapKeyRef: + name: context-engine-config + key: EMBEDDING_SERVICE_URL + - name: INDEX_WORKERS + valueFrom: + configMapKeyRef: + name: context-engine-config + key: INDEX_WORKERS - name: HF_HOME value: /work/models/hf-cache - name: XDG_CACHE_HOME diff --git a/docker-compose.yml b/docker-compose.yml index 5c5ab27e..9b983a63 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -35,14 +35,19 @@ services: ports: - "8100-8101:8100" environment: - # Model: use nomic-Q for faster + better quality, or override via EMBEDDING_MODEL + # Model: use nomic for faster + better quality, or override via EMBEDDING_MODEL - EMBEDDING_MODEL=${EMBEDDING_MODEL:-nomic-ai/nomic-embed-text-v1.5} - EMBED_MAX_CONCURRENT=${EMBED_MAX_CONCURRENT:-2} - EMBED_MAX_BATCH=${EMBED_MAX_BATCH:-256} # ONNX CPU optimizations - - ONNX_THREADS=${ONNX_THREADS:-0} - - ONNX_DISABLE_SPINNING=${ONNX_DISABLE_SPINNING:-0} + # ONNX_THREADS: limit threads per replica (0=all cores, 4-6 recommended for multi-replica) + - 
ONNX_THREADS=${ONNX_THREADS:-4} + # Disable thread spinning to reduce CPU burn when idle + - ONNX_DISABLE_SPINNING=${ONNX_DISABLE_SPINNING:-1} - EMBED_OPTIMAL_BATCH=${EMBED_OPTIMAL_BATCH:-32} + # Limit OMP/MKL threads to match ONNX_THREADS + - OMP_NUM_THREADS=${ONNX_THREADS:-4} + - MKL_NUM_THREADS=${ONNX_THREADS:-4} # Cache paths - HF_HOME=/tmp/huggingface - FASTEMBED_CACHE_PATH=/tmp/fastembed @@ -59,6 +64,7 @@ services: resources: limits: memory: 6G + cpus: "4" restart: unless-stopped networks: - dev-remote-network diff --git a/embedding_service/Dockerfile b/embedding_service/Dockerfile index 97c70b43..44ce064c 100644 --- a/embedding_service/Dockerfile +++ b/embedding_service/Dockerfile @@ -9,8 +9,11 @@ RUN pip install --no-cache-dir -r requirements.txt # Copy service COPY main.py . -# Pre-download default model during build (optional, speeds up startup) -# RUN python -c "from fastembed import TextEmbedding; TextEmbedding('BAAI/bge-base-en-v1.5')" +# Pre-download embedding model during build for faster startup +# Default: nomic (faster + better quality than BGE) +# Override at build time: --build-arg EMBEDDING_MODEL=BAAI/bge-base-en-v1.5 +ARG EMBEDDING_MODEL=nomic-ai/nomic-embed-text-v1.5 +RUN python -c "from fastembed import TextEmbedding; TextEmbedding('${EMBEDDING_MODEL}')" ENV EMBED_SERVICE_PORT=8100 EXPOSE 8100 diff --git a/scripts/ctx_cli/commands/reset.py b/scripts/ctx_cli/commands/reset.py index ab3b34df..f6f5b95f 100644 --- a/scripts/ctx_cli/commands/reset.py +++ b/scripts/ctx_cli/commands/reset.py @@ -53,7 +53,12 @@ def _load_env_to_environ(): # Default model/tokenizer URLs (same as Makefile) DEFAULT_MODEL_URL = "https://huggingface.co/ibm-granite/granite-4.0-micro-GGUF/resolve/main/granite-4.0-micro-Q4_K_M.gguf" DEFAULT_MODEL_PATH = "models/model.gguf" -DEFAULT_TOKENIZER_URL = "https://huggingface.co/BAAI/bge-base-en-v1.5/resolve/main/tokenizer.json" +# Tokenizer for micro-chunking (token counting). BGE tokenizer works for any model. 
+# Override via TOKENIZER_URL env var if needed. +DEFAULT_TOKENIZER_URL = os.environ.get( + "TOKENIZER_URL", + "https://huggingface.co/BAAI/bge-base-en-v1.5/resolve/main/tokenizer.json" +) DEFAULT_TOKENIZER_PATH = "models/tokenizer.json" diff --git a/scripts/ingest/qdrant.py b/scripts/ingest/qdrant.py index feb9950c..79fbac6d 100644 --- a/scripts/ingest/qdrant.py +++ b/scripts/ingest/qdrant.py @@ -29,6 +29,13 @@ _EMBEDDING_SERVICE_URL = os.environ.get("EMBEDDING_SERVICE_URL", "http://embedding:8100") _EMBEDDING_SERVICE_TIMEOUT = int(os.environ.get("EMBEDDING_SERVICE_TIMEOUT", "60") or 60) +# Client-side load balancing for multiple replicas (Compose mode) +# Set EMBEDDING_SERVICE_URLS=http://host1:8100,http://host2:8100 for parallel processing +_EMBEDDING_SERVICE_URLS_RAW = os.environ.get("EMBEDDING_SERVICE_URLS", "").strip() +_EMBEDDING_SERVICE_URLS = [u.strip() for u in _EMBEDDING_SERVICE_URLS_RAW.split(",") if u.strip()] if _EMBEDDING_SERVICE_URLS_RAW else [] +_EMBED_REPLICA_INDEX = 0 +_EMBED_REPLICA_LOCK = threading.Lock() + from qdrant_client import QdrantClient, models from scripts.ingest.config import ( @@ -1007,19 +1014,32 @@ def _embed_local(model, texts: List[str]) -> List[List[float]]: def _embed_remote(texts: List[str], model_name: str = "default") -> List[List[float]]: - """Remote embedding via HTTP service.""" + """Remote embedding via HTTP service with client-side load balancing. + + If EMBEDDING_SERVICE_URLS is set (comma-separated), round-robins across replicas + for true parallel processing. Otherwise uses single EMBEDDING_SERVICE_URL. 
+ """ + global _EMBED_REPLICA_INDEX import requests + # Client-side load balancing across replicas + if _EMBEDDING_SERVICE_URLS: + with _EMBED_REPLICA_LOCK: + url = _EMBEDDING_SERVICE_URLS[_EMBED_REPLICA_INDEX % len(_EMBEDDING_SERVICE_URLS)] + _EMBED_REPLICA_INDEX += 1 + else: + url = _EMBEDDING_SERVICE_URL + try: resp = requests.post( - f"{_EMBEDDING_SERVICE_URL}/embed", + f"{url}/embed", json={"texts": texts, "model": model_name}, timeout=_EMBEDDING_SERVICE_TIMEOUT, ) resp.raise_for_status() return resp.json()["vectors"] except Exception as e: - logger.error(f"Remote embedding failed: {e}") + logger.error(f"Remote embedding failed ({url}): {e}") raise diff --git a/scripts/warm_start.py b/scripts/warm_start.py index c766ff46..9027d36b 100644 --- a/scripts/warm_start.py +++ b/scripts/warm_start.py @@ -101,9 +101,19 @@ async def warmup_reranker() -> float: async def warmup_embedding_model(model_name: str) -> float: """Warm up embedding model by loading and running dummy inference. + When EMBEDDING_PROVIDER=remote, checks the embedding service health instead + of loading a local ONNX model. + Returns: Latency in milliseconds """ + embedding_provider = os.environ.get("EMBEDDING_PROVIDER", "local").strip().lower() + + # Remote mode: check embedding service health instead of local model + if embedding_provider == "remote": + return await _warmup_remote_embedding() + + # Local mode: load and warm up local ONNX model try: start = time.perf_counter() model = await asyncio.to_thread(get_embedding_model, model_name) @@ -121,6 +131,51 @@ async def warmup_embedding_model(model_name: str) -> float: raise +async def _warmup_remote_embedding() -> float: + """Check remote embedding service health and warm it up with a test request. 
+ + Returns: + Latency in milliseconds + """ + import urllib.request + import json + + service_url = os.environ.get("EMBEDDING_SERVICE_URL", "http://embedding:8100") + timeout = int(os.environ.get("EMBEDDING_SERVICE_TIMEOUT", "30")) + + try: + start = time.perf_counter() + + # First check health + health_url = f"{service_url.rstrip('/')}/health" + with urllib.request.urlopen(health_url, timeout=timeout) as resp: + if resp.status != 200: + raise RuntimeError(f"Embedding service unhealthy: {resp.status}") + health_data = json.loads(resp.read().decode()) + logger.info(f"Remote embedding service healthy: {health_data.get('model', 'unknown')}") + + # Warm up with a test embedding request + embed_url = f"{service_url.rstrip('/')}/embed" + test_payload = json.dumps({"texts": ["warmup probe text"]}).encode() + req = urllib.request.Request( + embed_url, + data=test_payload, + headers={"Content-Type": "application/json"}, + method="POST" + ) + with urllib.request.urlopen(req, timeout=timeout) as resp: + if resp.status != 200: + raise RuntimeError(f"Embedding service embed failed: {resp.status}") + + elapsed = (time.perf_counter() - start) * 1000 + logger.info(f"Remote embedding warmup: {elapsed:.1f}ms") + return elapsed + + except Exception as e: + logger.warning(f"Remote embedding warmup failed: {e}") + raise + + async def warmup_all_models() -> dict: """Orchestrate parallel warmup of embedding + reranker models. From 4379f3b06fd93e0dfb7241630bbe5fe258fd35ed Mon Sep 17 00:00:00 2001 From: john donalson Date: Mon, 26 Jan 2026 08:02:27 -0500 Subject: [PATCH 7/7] Optimize embedding service memory and remote support Align embedding cache paths in Docker and compose files, default to quantized model for lower memory, and add aggressive GC and memory reporting in embedding_service. Refactor embedding provider and service URL logic for runtime flexibility and improved testability. Add remote embedding support and load balancing in hybrid/embed.py and ingest/qdrant.py. 
Update Neo4j status check to use HTTP API. Ensure remote embedding dimension probing in mcp_memory_server. These changes improve memory efficiency, remote deployment, and testability. --- docker-compose.yml | 8 +- embedding_service/Dockerfile | 12 ++- embedding_service/main.py | 93 +++++++++++++++++---- scripts/ctx_cli/commands/status.py | 83 +++++++++++------- scripts/embedder.py | 7 +- scripts/hybrid/embed.py | 130 +++++++++++++++++++---------- scripts/ingest/qdrant.py | 30 ++++--- scripts/mcp_memory_server.py | 21 ++++- 8 files changed, 272 insertions(+), 112 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 9b983a63..e20e6521 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -48,11 +48,11 @@ services: # Limit OMP/MKL threads to match ONNX_THREADS - OMP_NUM_THREADS=${ONNX_THREADS:-4} - MKL_NUM_THREADS=${ONNX_THREADS:-4} - # Cache paths - - HF_HOME=/tmp/huggingface - - FASTEMBED_CACHE_PATH=/tmp/fastembed + # Cache paths - aligned with Dockerfile and volume mount + - HF_HOME=/app/models + - FASTEMBED_CACHE_PATH=/app/models volumes: - - embedding_cache:/tmp/huggingface + - embedding_cache:/app/models healthcheck: test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8100/health')"] interval: 30s diff --git a/embedding_service/Dockerfile b/embedding_service/Dockerfile index 44ce064c..5801ed23 100644 --- a/embedding_service/Dockerfile +++ b/embedding_service/Dockerfile @@ -10,9 +10,15 @@ RUN pip install --no-cache-dir -r requirements.txt COPY main.py . 
# Pre-download embedding model during build for faster startup -# Default: nomic (faster + better quality than BGE) -# Override at build time: --build-arg EMBEDDING_MODEL=BAAI/bge-base-en-v1.5 -ARG EMBEDDING_MODEL=nomic-ai/nomic-embed-text-v1.5 +# Default: nomic-Q (quantized, 4x smaller, same quality) +# Override at build time: --build-arg EMBEDDING_MODEL=nomic-ai/nomic-embed-text-v1.5 +ARG EMBEDDING_MODEL=nomic-ai/nomic-embed-text-v1.5-Q + +# Set cache path BEFORE download so runtime uses same path +ENV FASTEMBED_CACHE_PATH=/app/models +ENV HF_HOME=/app/models + +# Download model to the same path used at runtime RUN python -c "from fastembed import TextEmbedding; TextEmbedding('${EMBEDDING_MODEL}')" ENV EMBED_SERVICE_PORT=8100 diff --git a/embedding_service/main.py b/embedding_service/main.py index fdce6ad0..a59021a4 100644 --- a/embedding_service/main.py +++ b/embedding_service/main.py @@ -6,20 +6,26 @@ POST /embed - Embed texts, returns vectors GET /health - Health check for k8s probes -Memory: ~1-2 GB (single ONNX model) +Memory: ~3-4 GB (single ONNX model + inference overhead) Concurrency: Controlled via EMBED_MAX_CONCURRENT semaphore ONNX CPU Optimizations: - ONNX_THREADS: Number of threads for intra-op parallelism (default: 0 = auto) + ONNX_THREADS: Number of threads for intra-op parallelism (default: 4) ONNX_DISABLE_SPINNING: Set to 1 to disable thread spinning (saves CPU cycles) - EMBED_OPTIMAL_BATCH: Internal batch size for chunking large requests (default: 32) + EMBED_OPTIMAL_BATCH: Internal batch size for chunking large requests (default: 16) + +Memory Optimizations: + EMBED_MAX_CONCURRENT: Max parallel inferences (default: 1 for memory safety) + EMBED_GC_INTERVAL: Force GC every N batches (default: 1 = every batch) + ONNX_ARENA_EXTEND_STRATEGY: Memory arena growth strategy (default: kSameAsRequested) Model Options (via EMBEDDING_MODEL env var): BAAI/bge-base-en-v1.5 - Default, solid quality (768 dim, 0.21 GB) - nomic-ai/nomic-embed-text-v1.5-Q - 
Quantized, faster, outperforms BGE on MTEB (768 dim, 0.13 GB) + nomic-ai/nomic-embed-text-v1.5 - Faster, outperforms BGE on MTEB (768 dim, ~0.5 GB) BAAI/bge-large-en-v1.5 - Higher quality, slower (1024 dim, 0.67 GB) """ import asyncio +import gc import logging import os import threading @@ -29,20 +35,41 @@ from fastapi import FastAPI, HTTPException from pydantic import BaseModel +# Memory optimization: limit numpy/MKL thread spawning +os.environ.setdefault("OMP_NUM_THREADS", "4") +os.environ.setdefault("MKL_NUM_THREADS", "4") +os.environ.setdefault("OPENBLAS_NUM_THREADS", "4") + +# ONNX Runtime memory optimization - prevent arena from growing unbounded +os.environ.setdefault("ORT_ARENA_EXTEND_STRATEGY", "kSameAsRequested") + +# Apply ONNX_DISABLE_SPINNING via OMP_WAIT_POLICY +# PASSIVE = threads sleep when idle (saves CPU), ACTIVE = threads spin (faster but burns CPU) +if os.environ.get("ONNX_DISABLE_SPINNING", "1").strip().lower() in {"1", "true", "yes"}: + os.environ.setdefault("OMP_WAIT_POLICY", "PASSIVE") +else: + os.environ.setdefault("OMP_WAIT_POLICY", "ACTIVE") + logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- -# Configuration +# Memory-Optimized Configuration (target: <6GB per replica, FAST) # --------------------------------------------------------------------------- -MODEL_NAME = os.environ.get("EMBEDDING_MODEL", "BAAI/bge-base-en-v1.5") +# Use quantized model by default - 4x smaller (0.13GB vs 0.52GB), same quality +MODEL_NAME = os.environ.get("EMBEDDING_MODEL", "nomic-ai/nomic-embed-text-v1.5-Q") +# Allow 2 concurrent for speed - quantized model uses less memory per inference MAX_CONCURRENT = max(1, int(os.environ.get("EMBED_MAX_CONCURRENT", "2") or 2)) -MAX_BATCH_SIZE = int(os.environ.get("EMBED_MAX_BATCH", "256") or 256) +MAX_BATCH_SIZE = int(os.environ.get("EMBED_MAX_BATCH", "128") or 128) # ONNX runtime optimizations -ONNX_THREADS = 
int(os.environ.get("ONNX_THREADS", "0") or 0) # 0 = auto (1 per physical core) -ONNX_DISABLE_SPINNING = os.environ.get("ONNX_DISABLE_SPINNING", "0").strip().lower() in {"1", "true", "yes"} -EMBED_OPTIMAL_BATCH = int(os.environ.get("EMBED_OPTIMAL_BATCH", "32") or 32) # Sweet spot for CPU +ONNX_THREADS = int(os.environ.get("ONNX_THREADS", "4") or 4) +ONNX_DISABLE_SPINNING = os.environ.get("ONNX_DISABLE_SPINNING", "1").strip().lower() in {"1", "true", "yes"} +# Batch size 32 is fine with quantized model +EMBED_OPTIMAL_BATCH = int(os.environ.get("EMBED_OPTIMAL_BATCH", "32") or 32) +# GC after every batch to prevent memory creep +GC_INTERVAL = int(os.environ.get("EMBED_GC_INTERVAL", "1") or 1) +_gc_counter = 0 # Global model and semaphore _model = None @@ -126,30 +153,60 @@ async def embed(request: EmbedRequest): def _embed_sync(texts: List[str]) -> List[List[float]]: - """Synchronous embedding with semaphore and optimal batching. + """Synchronous embedding with semaphore, batching, and aggressive GC. - Chunks large requests into EMBED_OPTIMAL_BATCH sized pieces for better - CPU cache utilization and memory efficiency. 
+    Memory optimizations:
+    - Chunks into batches of EMBED_OPTIMAL_BATCH (default 32) to reduce peak memory
+    - Forces GC after each batch to free tokenizer buffers immediately
+    - Semaphore caps concurrent inferences at EMBED_MAX_CONCURRENT to limit memory spikes
     """
+    global _gc_counter
     with _semaphore:
-        # Chunk into optimal batch sizes for CPU efficiency
+        # Small batch for memory efficiency
         if len(texts) <= EMBED_OPTIMAL_BATCH:
-            return [vec.tolist() for vec in _model.embed(texts)]
-
-        # Process in chunks
+            result = [vec.tolist() for vec in _model.embed(texts)]
+            # Aggressive GC to free tokenizer/intermediate buffers
+            _gc_counter += 1
+            if GC_INTERVAL > 0 and _gc_counter >= GC_INTERVAL:
+                gc.collect()
+                _gc_counter = 0
+            return result
+
+        # Process in small chunks with GC between
         all_vectors = []
         for i in range(0, len(texts), EMBED_OPTIMAL_BATCH):
             chunk = texts[i:i + EMBED_OPTIMAL_BATCH]
             chunk_vectors = [vec.tolist() for vec in _model.embed(chunk)]
             all_vectors.extend(chunk_vectors)
+            # GC after each chunk to prevent memory buildup
+            _gc_counter += 1
+            if GC_INTERVAL > 0 and _gc_counter >= GC_INTERVAL:
+                gc.collect()
+                _gc_counter = 0
         return all_vectors


+def _get_memory_mb() -> float:
+    """Get current process memory usage in MB."""
+    try:
+        import resource
+        # ru_maxrss units are platform-dependent (handled below)
+        rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
+        # macOS returns bytes, Linux returns KB
+        import sys
+        if sys.platform == "darwin":
+            return rss / (1024 * 1024)
+        return rss / 1024
+    except Exception:
+        return -1
+
+
 @app.get("/health")
 async def health():
-    """Health check for k8s probes. Returns full model config."""
+    """Health check for k8s probes. 
Returns full model config + memory.""" return { "status": "ok", + "memory_mb": round(_get_memory_mb(), 1), **_model_info, } diff --git a/scripts/ctx_cli/commands/status.py b/scripts/ctx_cli/commands/status.py index dd22dc59..1585d525 100644 --- a/scripts/ctx_cli/commands/status.py +++ b/scripts/ctx_cli/commands/status.py @@ -385,42 +385,67 @@ def get_graph_status(collection_name: str) -> Tuple[bool, Optional[Dict[str, Any logger.debug(f"Suppressed exception, continuing: {e}") continue # Try next URL - # Check Neo4j - always try to connect for status - # Use localhost (Docker exposes port) with common default password + # Check Neo4j via HTTP API (works without neo4j Python package) + # Neo4j exposes HTTP API on port 7474 try: - from neo4j import GraphDatabase - - uri = "bolt://localhost:7687" + import base64 user = os.environ.get("NEO4J_USER", "neo4j") password = os.environ.get("NEO4J_PASSWORD", "contextengine") + auth_str = base64.b64encode(f"{user}:{password}".encode()).decode() - driver = GraphDatabase.driver(uri, auth=(user, password)) - driver.verify_connectivity() - - with driver.session() as session: - node_result = session.run("MATCH (n:Symbol) RETURN count(n) as count") - node_count = node_result.single()["count"] - - edge_result = session.run("MATCH ()-[r]->() RETURN count(r) as count") - edge_count = edge_result.single()["count"] - - graph_info["neo4j"] = { - "connected": True, - "node_count": node_count, - "edge_count": edge_count, + # Query node count + node_query = {"statements": [{"statement": "MATCH (n:Symbol) RETURN count(n) as count"}]} + req = Request( + "http://localhost:7474/db/neo4j/tx/commit", + data=json.dumps(node_query).encode('utf-8'), + headers={ + "Content-Type": "application/json", + "Accept": "application/json", + "Authorization": f"Basic {auth_str}" } + ) + with urlopen(req, timeout=3) as response: + if response.status == 200: + data = json.loads(response.read().decode('utf-8')) + results = data.get("results", []) + if results and 
results[0].get("data"): + node_count = results[0]["data"][0]["row"][0] + else: + node_count = 0 + + # Query edge count + edge_query = {"statements": [{"statement": "MATCH ()-[r]->() RETURN count(r) as count"}]} + req = Request( + "http://localhost:7474/db/neo4j/tx/commit", + data=json.dumps(edge_query).encode('utf-8'), + headers={ + "Content-Type": "application/json", + "Accept": "application/json", + "Authorization": f"Basic {auth_str}" + } + ) + with urlopen(req, timeout=3) as response2: + if response2.status == 200: + data2 = json.loads(response2.read().decode('utf-8')) + results2 = data2.get("results", []) + if results2 and results2[0].get("data"): + edge_count = results2[0]["data"][0]["row"][0] + else: + edge_count = 0 + + graph_info["neo4j"] = { + "connected": True, + "node_count": node_count, + "edge_count": edge_count, + } - if graph_info["backend"] == "qdrant": - graph_info["backend"] = "both" - else: - graph_info["backend"] = "neo4j" - success = True - - driver.close() - except ImportError: - pass # neo4j package not installed + if graph_info["backend"] == "qdrant": + graph_info["backend"] = "both" + else: + graph_info["backend"] = "neo4j" + success = True except Exception as e: - logger.debug(f"Suppressed exception: {e}") # Neo4j not available + logger.debug(f"Neo4j HTTP check failed: {e}") # Neo4j not available return success, graph_info if success else None diff --git a/scripts/embedder.py b/scripts/embedder.py index 4f139f02..04ea6830 100644 --- a/scripts/embedder.py +++ b/scripts/embedder.py @@ -76,8 +76,9 @@ _ARCTIC_V2_REGISTERED = False _ARCTIC_V2_REGISTER_LOCK = threading.Lock() -# Remote embedding provider detection -_EMBEDDING_PROVIDER = os.environ.get("EMBEDDING_PROVIDER", "local").strip().lower() +def _get_embedding_provider() -> str: + """Get embedding provider at call time (not import time) for test flexibility.""" + return os.environ.get("EMBEDDING_PROVIDER", "local").strip().lower() class RemoteEmbeddingStub: @@ -179,7 +180,7 @@ def 
get_embedding_model(model_name: Optional[str] = None) -> Any: # Remote mode: return lightweight stub (no ONNX loaded) # This saves ~3-4 GB RAM per indexer since embed_batch() routes to remote service - if _EMBEDDING_PROVIDER == "remote": + if _get_embedding_provider() == "remote": cached = _EMBED_MODEL_CACHE.get(f"remote:{model_name}") if cached is not None: return cached diff --git a/scripts/hybrid/embed.py b/scripts/hybrid/embed.py index aa60d42c..3e1c8ae4 100644 --- a/scripts/hybrid/embed.py +++ b/scripts/hybrid/embed.py @@ -32,11 +32,12 @@ # Embedder factory setup # --------------------------------------------------------------------------- try: - from scripts.embedder import get_embedding_model as _get_embedding_model + from scripts.embedder import get_embedding_model as _get_embedding_model, RemoteEmbeddingStub _EMBEDDER_FACTORY = True except ImportError: _EMBEDDER_FACTORY = False _get_embedding_model = None # type: ignore + RemoteEmbeddingStub = None # type: ignore # Always try to import TextEmbedding for backward compatibility with tests try: @@ -44,6 +45,14 @@ except ImportError: TextEmbedding = None # type: ignore +# Remote embedding support: use embed_batch() when EMBEDDING_PROVIDER=remote +try: + from scripts.ingest.qdrant import embed_batch as _embed_batch_remote + _REMOTE_EMBED_AVAILABLE = True +except ImportError: + _embed_batch_remote = None # type: ignore + _REMOTE_EMBED_AVAILABLE = False + # Type alias for embedding model (TextEmbedding or compatible) EmbeddingModel = Any if TextEmbedding is None else TextEmbedding @@ -231,24 +240,38 @@ def _embed_with_unified_cache( # Batch-embed all missing queries in one call # Use query_embed if available AND ASYMMETRIC_EMBEDDING=1 (e.g., Jina v3) if missing_queries: - # Select embedding function: query_embed for asymmetric models, else regular embed - if ASYMMETRIC_EMBEDDING and hasattr(model, "query_embed"): - embed_fn = model.query_embed - else: - embed_fn = model.embed + # Check if model is 
RemoteEmbeddingStub - route to remote service + is_remote_stub = RemoteEmbeddingStub is not None and isinstance(model, RemoteEmbeddingStub) - try: - vecs = list(embed_fn(missing_queries)) - # Cache all new embeddings - for q, vec in zip(missing_queries, vecs): - key = (str(model_name), str(q)) - cache.set(key, vec.tolist()) - except Exception: - # Fallback to one-by-one if batch fails - for q in missing_queries: - key = (str(model_name), str(q)) - vec = next(embed_fn([q])).tolist() - cache.set(key, vec) + if is_remote_stub and _REMOTE_EMBED_AVAILABLE and _embed_batch_remote is not None: + # Use remote embedding service via embed_batch() + try: + vecs = _embed_batch_remote(model, missing_queries) + for q, vec in zip(missing_queries, vecs): + key = (str(model_name), str(q)) + cache.set(key, vec if isinstance(vec, list) else vec.tolist()) + except Exception as e: + logger.warning(f"Remote embedding failed: {e}") + raise + else: + # Local embedding: select function based on asymmetric mode + if ASYMMETRIC_EMBEDDING and hasattr(model, "query_embed"): + embed_fn = model.query_embed + else: + embed_fn = model.embed + + try: + vecs = list(embed_fn(missing_queries)) + # Cache all new embeddings + for q, vec in zip(missing_queries, vecs): + key = (str(model_name), str(q)) + cache.set(key, vec.tolist()) + except Exception: + # Fallback to one-by-one if batch fails + for q in missing_queries: + key = (str(model_name), str(q)) + vec = next(embed_fn([q])).tolist() + cache.set(key, vec) # Return embeddings in original order from cache out: List[List[float]] = [] @@ -280,34 +303,53 @@ def _embed_with_legacy_cache( # Batch-embed all missing queries in one call # Use query_embed if available AND ASYMMETRIC_EMBEDDING=1 (e.g., Jina v3) if missing_queries: - if ASYMMETRIC_EMBEDDING and hasattr(model, "query_embed"): - embed_fn = model.query_embed - else: - embed_fn = model.embed + # Check if model is RemoteEmbeddingStub - route to remote service + is_remote_stub = RemoteEmbeddingStub 
is not None and isinstance(model, RemoteEmbeddingStub) - try: - # Embed all missing queries at once - vecs = list(embed_fn(missing_queries)) - with _EMBED_LOCK: - # Cache all new embeddings - for q, vec in zip(missing_queries, vecs): - key = (str(model_name), str(q)) - if key not in _EMBED_QUERY_CACHE: - _EMBED_QUERY_CACHE[key] = vec.tolist() - # Evict oldest entries if cache exceeds limit - while len(_EMBED_QUERY_CACHE) > MAX_EMBED_CACHE: - _EMBED_QUERY_CACHE.popitem(last=False) - except Exception: - # Fallback to one-by-one if batch fails - for q in missing_queries: - key = (str(model_name), str(q)) - vec = next(embed_fn([q])).tolist() + if is_remote_stub and _REMOTE_EMBED_AVAILABLE and _embed_batch_remote is not None: + # Use remote embedding service via embed_batch() + try: + vecs = _embed_batch_remote(model, missing_queries) with _EMBED_LOCK: - if key not in _EMBED_QUERY_CACHE: - _EMBED_QUERY_CACHE[key] = vec - # Evict oldest entries if cache exceeds limit - while len(_EMBED_QUERY_CACHE) > MAX_EMBED_CACHE: - _EMBED_QUERY_CACHE.popitem(last=False) + for q, vec in zip(missing_queries, vecs): + key = (str(model_name), str(q)) + if key not in _EMBED_QUERY_CACHE: + _EMBED_QUERY_CACHE[key] = vec if isinstance(vec, list) else vec.tolist() + while len(_EMBED_QUERY_CACHE) > MAX_EMBED_CACHE: + _EMBED_QUERY_CACHE.popitem(last=False) + except Exception as e: + logger.warning(f"Remote embedding failed: {e}") + raise + else: + # Local embedding + if ASYMMETRIC_EMBEDDING and hasattr(model, "query_embed"): + embed_fn = model.query_embed + else: + embed_fn = model.embed + + try: + # Embed all missing queries at once + vecs = list(embed_fn(missing_queries)) + with _EMBED_LOCK: + # Cache all new embeddings + for q, vec in zip(missing_queries, vecs): + key = (str(model_name), str(q)) + if key not in _EMBED_QUERY_CACHE: + _EMBED_QUERY_CACHE[key] = vec.tolist() + # Evict oldest entries if cache exceeds limit + while len(_EMBED_QUERY_CACHE) > MAX_EMBED_CACHE: + 
_EMBED_QUERY_CACHE.popitem(last=False) + except Exception: + # Fallback to one-by-one if batch fails + for q in missing_queries: + key = (str(model_name), str(q)) + vec = next(embed_fn([q])).tolist() + with _EMBED_LOCK: + if key not in _EMBED_QUERY_CACHE: + _EMBED_QUERY_CACHE[key] = vec + # Evict oldest entries if cache exceeds limit + while len(_EMBED_QUERY_CACHE) > MAX_EMBED_CACHE: + _EMBED_QUERY_CACHE.popitem(last=False) # Return embeddings in original order from cache (thread-safe read) out: List[List[float]] = [] diff --git a/scripts/ingest/qdrant.py b/scripts/ingest/qdrant.py index 79fbac6d..4ad4515d 100644 --- a/scripts/ingest/qdrant.py +++ b/scripts/ingest/qdrant.py @@ -24,15 +24,20 @@ _EMBED_MAX_CONCURRENT = max(1, int(os.environ.get("EMBED_MAX_CONCURRENT", "2") or 2)) _EMBED_SEMAPHORE = threading.Semaphore(_EMBED_MAX_CONCURRENT) -# Remote embedding service configuration -_EMBEDDING_PROVIDER = os.environ.get("EMBEDDING_PROVIDER", "local").strip().lower() -_EMBEDDING_SERVICE_URL = os.environ.get("EMBEDDING_SERVICE_URL", "http://embedding:8100") +# Remote embedding service configuration - functions for runtime flexibility (tests can change env) +def _get_embedding_provider() -> str: + return os.environ.get("EMBEDDING_PROVIDER", "local").strip().lower() + +def _get_embedding_service_url() -> str: + return os.environ.get("EMBEDDING_SERVICE_URL", "http://embedding:8100") + +def _get_embedding_service_urls() -> list: + raw = os.environ.get("EMBEDDING_SERVICE_URLS", "").strip() + return [u.strip() for u in raw.split(",") if u.strip()] if raw else [] + _EMBEDDING_SERVICE_TIMEOUT = int(os.environ.get("EMBEDDING_SERVICE_TIMEOUT", "60") or 60) # Client-side load balancing for multiple replicas (Compose mode) -# Set EMBEDDING_SERVICE_URLS=http://host1:8100,http://host2:8100 for parallel processing -_EMBEDDING_SERVICE_URLS_RAW = os.environ.get("EMBEDDING_SERVICE_URLS", "").strip() -_EMBEDDING_SERVICE_URLS = [u.strip() for u in _EMBEDDING_SERVICE_URLS_RAW.split(",") if 
u.strip()] if _EMBEDDING_SERVICE_URLS_RAW else [] _EMBED_REPLICA_INDEX = 0 _EMBED_REPLICA_LOCK = threading.Lock() @@ -1022,13 +1027,17 @@ def _embed_remote(texts: List[str], model_name: str = "default") -> List[List[fl global _EMBED_REPLICA_INDEX import requests + # Get URLs at call time (not import time) for test flexibility + service_urls = _get_embedding_service_urls() + service_url = _get_embedding_service_url() + # Client-side load balancing across replicas - if _EMBEDDING_SERVICE_URLS: + if service_urls: with _EMBED_REPLICA_LOCK: - url = _EMBEDDING_SERVICE_URLS[_EMBED_REPLICA_INDEX % len(_EMBEDDING_SERVICE_URLS)] + url = service_urls[_EMBED_REPLICA_INDEX % len(service_urls)] _EMBED_REPLICA_INDEX += 1 else: - url = _EMBEDDING_SERVICE_URL + url = service_url try: resp = requests.post( @@ -1055,7 +1064,8 @@ def embed_batch(model, texts: List[str]) -> List[List[float]]: if not texts: return [] - if _EMBEDDING_PROVIDER == "remote": + # Check provider at call time (not import time) for test flexibility + if _get_embedding_provider() == "remote": # Get model name for remote service model_name = getattr(model, "model_name", None) or os.environ.get("EMBEDDING_MODEL", "default") return _embed_remote(texts, model_name) diff --git a/scripts/mcp_memory_server.py b/scripts/mcp_memory_server.py index 61880669..6b01d804 100644 --- a/scripts/mcp_memory_server.py +++ b/scripts/mcp_memory_server.py @@ -429,7 +429,10 @@ def _ensure_collection(name: str): _return_qdrant_client(client) # Choose dense dimension based on config: probe (default) vs env-configured - if MEMORY_PROBE_EMBED_DIM: + # When EMBEDDING_PROVIDER=remote, skip local model loading and use env dimension or probe via remote + _embedding_provider = os.environ.get("EMBEDDING_PROVIDER", "local").strip().lower() + + if MEMORY_PROBE_EMBED_DIM and _embedding_provider != "remote": try: # Probe dimension without populating the shared model cache. 
# This preserves the "cache loads on first tool call" behavior and @@ -450,6 +453,22 @@ def _ensure_collection(name: str): dense_dim = int(os.environ.get("MEMORY_VECTOR_DIM") or os.environ.get("EMBED_DIM") or "768") except Exception: dense_dim = 768 + elif MEMORY_PROBE_EMBED_DIM and _embedding_provider == "remote": + # Remote mode: probe via remote embedding service instead of loading model locally + try: + import requests + _embed_url = os.environ.get("EMBEDDING_SERVICE_URL", "http://embedding:8100") + resp = requests.post(f"{_embed_url}/embed", json={"texts": ["probe"]}, timeout=30) + resp.raise_for_status() + vectors = resp.json().get("vectors", []) + dense_dim = len(vectors[0]) if vectors else int(os.environ.get("MEMORY_VECTOR_DIM") or os.environ.get("EMBED_DIM") or "768") + logger.info(f"Probed embedding dimension via remote service: {dense_dim}") + except Exception as e: + logger.warning(f"Remote embedding probe failed, using env dimension: {e}") + try: + dense_dim = int(os.environ.get("MEMORY_VECTOR_DIM") or os.environ.get("EMBED_DIM") or "768") + except Exception: + dense_dim = 768 else: dense_dim = int(MEMORY_VECTOR_DIM or 768)