diff --git a/.env.example b/.env.example index 30ebc3eb..dbe27f3c 100644 --- a/.env.example +++ b/.env.example @@ -20,10 +20,51 @@ COLLECTION_NAME=codebase # Embeddings EMBEDDING_MODEL=BAAI/bge-base-en-v1.5 -EMBEDDING_PROVIDER=fastembed # Optional repo tag attached to each payload REPO_NAME=workspace +# --------------------------------------------------------------------------- +# Embedding Service Configuration (Shared ONNX for scale) +# --------------------------------------------------------------------------- +# EMBEDDING_PROVIDER: local | remote +# local = Use in-process ONNX (default, high memory per worker) +# remote = Use shared embedding service (recommended for scale) +EMBEDDING_PROVIDER=local + +# When EMBEDDING_PROVIDER=remote, calls this service +EMBEDDING_SERVICE_URL=http://embedding:8100 +EMBEDDING_SERVICE_TIMEOUT=60 + +# Max concurrent ONNX inferences (local mode or in embedding service) +# Prevents memory explosion with parallel workers +EMBED_MAX_CONCURRENT=2 + +# Max batch size per embed request +EMBED_MAX_BATCH=256 + +# --------------------------------------------------------------------------- +# ONNX CPU Optimizations (for embedding service) +# --------------------------------------------------------------------------- +# ONNX_THREADS: Number of threads for intra-op parallelism +# 0 = auto (1 per physical core), or set explicit count (e.g., 4-6) +ONNX_THREADS=0 + +# ONNX_DISABLE_SPINNING: Disable thread spin-wait (saves CPU cycles) +# 0 = spinning enabled (faster, burns CPU), 1 = disabled (power efficient) +ONNX_DISABLE_SPINNING=0 + +# EMBED_OPTIMAL_BATCH: Internal batch size for chunking large requests +# Sweet spot for CPU is 32-64. 
Too small = overhead, too large = memory pressure +EMBED_OPTIMAL_BATCH=32 + +# --------------------------------------------------------------------------- +# Embedding Model Options +# --------------------------------------------------------------------------- +# Model options (changing model requires re-indexing!): +# BAAI/bge-base-en-v1.5 - Default, solid quality (768 dim, 0.21 GB) +# nomic-ai/nomic-embed-text-v1.5 - Faster, outperforms BGE on MTEB (768 dim, ~0.5 GB; 0.13 GB for the quantized -Q variant) +# BAAI/bge-large-en-v1.5 - Higher quality, slower (1024 dim, 0.67 GB) +# # Qwen3-Embedding Feature Flag (optional, experimental) # Enable to use Qwen3-Embedding-0.6B instead of BGE-base (requires reindex) # QWEN3_EMBEDDING_ENABLED=0 diff --git a/deploy/helm/context-engine/templates/configmap.yaml b/deploy/helm/context-engine/templates/configmap.yaml index 2a0f1f85..3e472967 100644 --- a/deploy/helm/context-engine/templates/configmap.yaml +++ b/deploy/helm/context-engine/templates/configmap.yaml @@ -11,7 +11,9 @@ data: QDRANT_URL: {{ include "context-engine.qdrantUrl" . | quote }} EMBEDDING_MODEL: {{ .Values.config.embeddingModel | quote }} EMBEDDING_PROVIDER: {{ .Values.config.embeddingProvider | quote }} + EMBEDDING_SERVICE_URL: {{ .Values.config.embeddingServiceUrl | quote }} EMBEDDING_WARMUP: "0" + INDEX_WORKERS: {{ .Values.config.indexWorkers | default "4" | quote }} FASTMCP_HOST: {{ .Values.config.fastmcp.host | quote }} FASTMCP_PORT: {{ .Values.config.fastmcp.port | quote }} diff --git a/deploy/helm/context-engine/templates/embedding-service.yaml b/deploy/helm/context-engine/templates/embedding-service.yaml new file mode 100644 index 00000000..7a7dee39 --- /dev/null +++ b/deploy/helm/context-engine/templates/embedding-service.yaml @@ -0,0 +1,91 @@ +{{- if eq .Values.config.embeddingProvider "remote" }} +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ include "context-engine.fullname" . }}-embedding + labels: + {{- include "context-engine.labels" . 
| nindent 4 }} + app.kubernetes.io/component: embedding +spec: + replicas: {{ .Values.embedding.replicas | default 2 }} + selector: + matchLabels: + {{- include "context-engine.selectorLabels" . | nindent 6 }} + app.kubernetes.io/component: embedding + template: + metadata: + labels: + {{- include "context-engine.selectorLabels" . | nindent 8 }} + app.kubernetes.io/component: embedding + spec: + containers: + - name: embedding + image: "{{ .Values.embedding.image.repository | default "context-engine-embedding" }}:{{ .Values.embedding.image.tag | default "latest" }}" + ports: + - containerPort: 8100 + env: + - name: EMBEDDING_MODEL + value: {{ .Values.config.embeddingModel | quote }} + - name: EMBED_MAX_CONCURRENT + value: {{ .Values.embedding.maxConcurrent | default "2" | quote }} + - name: EMBED_OPTIMAL_BATCH + value: {{ .Values.embedding.optimalBatch | default "32" | quote }} + - name: ONNX_THREADS + value: {{ .Values.embedding.onnxThreads | default "4" | quote }} + - name: ONNX_DISABLE_SPINNING + value: "1" + - name: OMP_NUM_THREADS + value: {{ .Values.embedding.onnxThreads | default "4" | quote }} + - name: MKL_NUM_THREADS + value: {{ .Values.embedding.onnxThreads | default "4" | quote }} + resources: + {{- toYaml .Values.embedding.resources | nindent 12 }} + readinessProbe: + httpGet: + path: /health + port: 8100 + initialDelaySeconds: 30 + periodSeconds: 10 + livenessProbe: + httpGet: + path: /health + port: 8100 + initialDelaySeconds: 60 + periodSeconds: 30 +--- +apiVersion: v1 +kind: Service +metadata: + name: embedding + labels: + {{- include "context-engine.labels" . | nindent 4 }} +spec: + selector: + {{- include "context-engine.selectorLabels" . | nindent 4 }} + app.kubernetes.io/component: embedding + ports: + - port: 8100 + targetPort: 8100 +--- +{{- if .Values.embedding.autoscaling.enabled }} +apiVersion: autoscaling/v2 +kind: HorizontalPodAutoscaler +metadata: + name: {{ include "context-engine.fullname" . 
}}-embedding +spec: + scaleTargetRef: + apiVersion: apps/v1 + kind: Deployment + name: {{ include "context-engine.fullname" . }}-embedding + minReplicas: {{ .Values.embedding.autoscaling.minReplicas | default 2 }} + maxReplicas: {{ .Values.embedding.autoscaling.maxReplicas | default 10 }} + metrics: + - type: Resource + resource: + name: cpu + target: + type: Utilization + averageUtilization: {{ .Values.embedding.autoscaling.targetCPU | default 70 }} +{{- end }} +{{- end }} + diff --git a/deploy/helm/context-engine/values.yaml b/deploy/helm/context-engine/values.yaml index bdfc11be..7010c812 100644 --- a/deploy/helm/context-engine/values.yaml +++ b/deploy/helm/context-engine/values.yaml @@ -110,6 +110,37 @@ qdrant: initialDelaySeconds: 5 periodSeconds: 5 +# ----------------------------------------------------------------------------- +# Embedding Service Configuration (shared ONNX model) +# ----------------------------------------------------------------------------- +embedding: + # -- Number of replicas + replicas: 2 + # -- Image configuration + image: + repository: context-engine-embedding + tag: latest + # -- Max concurrent embeddings per replica + maxConcurrent: 2 + # -- Optimal batch size for CPU cache + optimalBatch: 32 + # -- ONNX threads per replica + onnxThreads: 4 + # -- Resource requests and limits + resources: + requests: + cpu: "2" + memory: 4Gi + limits: + cpu: "4" + memory: 6Gi + # -- Autoscaling configuration + autoscaling: + enabled: true + minReplicas: 2 + maxReplicas: 10 + targetCPU: 70 + # ----------------------------------------------------------------------------- # MCP Indexer HTTP Configuration # ----------------------------------------------------------------------------- @@ -463,10 +494,14 @@ config: # -- Qdrant URL (auto-generated if not set) qdrantUrl: "" # -- Embedding model - embeddingModel: BAAI/bge-base-en-v1.5 - # -- Embedding provider - embeddingProvider: fastembed - + embeddingModel: nomic-ai/nomic-embed-text-v1.5 + # -- Embedding 
provider (remote = shared service, fastembed = local) + embeddingProvider: remote + # -- Embedding service URL (when provider=remote) + embeddingServiceUrl: http://embedding:8100 + # -- Index workers (parallel file processing) + indexWorkers: 4 + # -- FastMCP settings fastmcp: host: "0.0.0.0" diff --git a/deploy/kubernetes/configmap.yaml b/deploy/kubernetes/configmap.yaml index caf3e4c3..9d55fc2c 100644 --- a/deploy/kubernetes/configmap.yaml +++ b/deploy/kubernetes/configmap.yaml @@ -15,8 +15,10 @@ data: CTX_SUMMARY_CHARS: '0' CURRENT_REPO: '' DECODER_MAX_TOKENS: '4000' - EMBEDDING_MODEL: BAAI/bge-base-en-v1.5 - EMBEDDING_PROVIDER: fastembed + EMBEDDING_MODEL: nomic-ai/nomic-embed-text-v1.5 + EMBEDDING_PROVIDER: remote + EMBEDDING_SERVICE_URL: http://embedding:8100 + INDEX_WORKERS: "4" EMBEDDING_WARMUP: '0' FASTMCP_HOST: 0.0.0.0 FASTMCP_HTTP_HEALTH_PORT: '18002' diff --git a/deploy/kubernetes/embedding-service.yaml b/deploy/kubernetes/embedding-service.yaml new file mode 100644 index 00000000..e82439e3 --- /dev/null +++ b/deploy/kubernetes/embedding-service.yaml @@ -0,0 +1,106 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: embedding-service + namespace: context-engine + labels: + app: embedding-service +spec: + replicas: 2 + selector: + matchLabels: + app: embedding-service + template: + metadata: + labels: + app: embedding-service + spec: + containers: + - name: embedding + image: context-engine-embedding:latest + ports: + - containerPort: 8100 + env: + - name: EMBEDDING_MODEL + valueFrom: + configMapKeyRef: + name: context-engine-config + key: EMBEDDING_MODEL + - name: EMBED_MAX_CONCURRENT + value: "2" + - name: EMBED_MAX_BATCH + value: "256" + - name: EMBED_OPTIMAL_BATCH + value: "32" + - name: ONNX_THREADS + value: "4" + - name: ONNX_DISABLE_SPINNING + value: "1" + - name: OMP_NUM_THREADS + value: "4" + - name: MKL_NUM_THREADS + value: "4" + - name: HF_HOME + value: /tmp/huggingface + - name: FASTEMBED_CACHE_PATH + value: /tmp/fastembed + 
resources: + requests: + memory: "4Gi" + cpu: "2" + limits: + memory: "6Gi" + cpu: "4" + readinessProbe: + httpGet: + path: /health + port: 8100 + initialDelaySeconds: 30 + periodSeconds: 10 + livenessProbe: + httpGet: + path: /health + port: 8100 + initialDelaySeconds: 60 + periodSeconds: 30 + volumeMounts: + - name: embedding-cache + mountPath: /tmp/huggingface + volumes: + - name: embedding-cache + emptyDir: + sizeLimit: 2Gi +--- +apiVersion: v1 +kind: Service +metadata: + name: embedding + namespace: context-engine +spec: + selector: + app: embedding-service + ports: + - port: 8100 + targetPort: 8100 + type: ClusterIP +--- +apiVersion: autoscaling/v2 +kind: HorizontalPodAutoscaler +metadata: + name: embedding-service-hpa + namespace: context-engine +spec: + scaleTargetRef: + apiVersion: apps/v1 + kind: Deployment + name: embedding-service + minReplicas: 2 + maxReplicas: 10 + metrics: + - type: Resource + resource: + name: cpu + target: + type: Utilization + averageUtilization: 70 + diff --git a/deploy/kubernetes/indexer-services.yaml b/deploy/kubernetes/indexer-services.yaml index 123582c6..26eeed17 100644 --- a/deploy/kubernetes/indexer-services.yaml +++ b/deploy/kubernetes/indexer-services.yaml @@ -71,6 +71,21 @@ spec: configMapKeyRef: name: context-engine-config key: EMBEDDING_MODEL + - name: EMBEDDING_PROVIDER + valueFrom: + configMapKeyRef: + name: context-engine-config + key: EMBEDDING_PROVIDER + - name: EMBEDDING_SERVICE_URL + valueFrom: + configMapKeyRef: + name: context-engine-config + key: EMBEDDING_SERVICE_URL + - name: INDEX_WORKERS + valueFrom: + configMapKeyRef: + name: context-engine-config + key: INDEX_WORKERS - name: HF_HOME value: /work/models/hf-cache - name: XDG_CACHE_HOME @@ -209,6 +224,21 @@ spec: configMapKeyRef: name: context-engine-config key: EMBEDDING_MODEL + - name: EMBEDDING_PROVIDER + valueFrom: + configMapKeyRef: + name: context-engine-config + key: EMBEDDING_PROVIDER + - name: EMBEDDING_SERVICE_URL + valueFrom: + configMapKeyRef: 
+ name: context-engine-config + key: EMBEDDING_SERVICE_URL + - name: INDEX_WORKERS + valueFrom: + configMapKeyRef: + name: context-engine-config + key: INDEX_WORKERS - name: HF_HOME value: /work/models/hf-cache - name: XDG_CACHE_HOME diff --git a/deploy/kubernetes/kustomization.yaml b/deploy/kubernetes/kustomization.yaml index 233ff860..b10fd98b 100644 --- a/deploy/kubernetes/kustomization.yaml +++ b/deploy/kubernetes/kustomization.yaml @@ -13,6 +13,7 @@ resources: # Core services - qdrant.yaml - redis.yaml + - embedding-service.yaml - code-models-pvc.yaml - upload-pvc.yaml @@ -57,6 +58,9 @@ images: newTag: latest # newTag: v1.0.0 # newName: your-registry/context-engine + - name: context-engine-embedding + newTag: latest + # newName: your-registry/context-engine-embedding # Namespace override namespace: context-engine @@ -68,6 +72,8 @@ replicas: count: 1 # Set to 2+ for production - name: mcp-indexer count: 1 # Set to 2+ for production + - name: embedding-service + count: 2 # Shared embedding service replicas # Resource patches patches: diff --git a/deploy/kubernetes/learning-reranker-worker.yaml b/deploy/kubernetes/learning-reranker-worker.yaml index c1431c22..2148a783 100644 --- a/deploy/kubernetes/learning-reranker-worker.yaml +++ b/deploy/kubernetes/learning-reranker-worker.yaml @@ -56,6 +56,10 @@ spec: - name: metadata-volume mountPath: /tmp/rerank_events subPath: rerank_events + env: + # Learning worker needs local embeddings for reranker training + - name: EMBEDDING_PROVIDER + value: "local" envFrom: - configMapRef: name: context-engine-config diff --git a/deploy/kubernetes/mcp-http.yaml b/deploy/kubernetes/mcp-http.yaml index c8ace6ed..b6c2f5fb 100644 --- a/deploy/kubernetes/mcp-http.yaml +++ b/deploy/kubernetes/mcp-http.yaml @@ -73,6 +73,16 @@ spec: configMapKeyRef: name: context-engine-config key: EMBEDDING_PROVIDER + - name: EMBEDDING_SERVICE_URL + valueFrom: + configMapKeyRef: + name: context-engine-config + key: EMBEDDING_SERVICE_URL + - name: 
INDEX_WORKERS + valueFrom: + configMapKeyRef: + name: context-engine-config + key: INDEX_WORKERS - name: TOOL_STORE_DESCRIPTION valueFrom: configMapKeyRef: @@ -253,6 +263,21 @@ spec: configMapKeyRef: name: context-engine-config key: EMBEDDING_MODEL + - name: EMBEDDING_PROVIDER + valueFrom: + configMapKeyRef: + name: context-engine-config + key: EMBEDDING_PROVIDER + - name: EMBEDDING_SERVICE_URL + valueFrom: + configMapKeyRef: + name: context-engine-config + key: EMBEDDING_SERVICE_URL + - name: INDEX_WORKERS + valueFrom: + configMapKeyRef: + name: context-engine-config + key: INDEX_WORKERS - name: HF_HOME value: /work/models/hf-cache - name: XDG_CACHE_HOME diff --git a/deploy/kubernetes/mcp-indexer.yaml b/deploy/kubernetes/mcp-indexer.yaml index 2fbec1a1..26b51a2d 100644 --- a/deploy/kubernetes/mcp-indexer.yaml +++ b/deploy/kubernetes/mcp-indexer.yaml @@ -78,6 +78,21 @@ spec: configMapKeyRef: name: context-engine-config key: EMBEDDING_MODEL + - name: EMBEDDING_PROVIDER + valueFrom: + configMapKeyRef: + name: context-engine-config + key: EMBEDDING_PROVIDER + - name: EMBEDDING_SERVICE_URL + valueFrom: + configMapKeyRef: + name: context-engine-config + key: EMBEDDING_SERVICE_URL + - name: INDEX_WORKERS + valueFrom: + configMapKeyRef: + name: context-engine-config + key: INDEX_WORKERS - name: HF_HOME value: /work/models/hf-cache - name: XDG_CACHE_HOME diff --git a/docker-compose.yml b/docker-compose.yml index e3513da5..e20e6521 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -24,6 +24,51 @@ services: networks: - dev-remote-network + # Shared Embedding Service - ONNX models serve all indexers + # Memory: 6GB × 2 replicas = 12GB total (load balanced) + # Enable with EMBEDDING_PROVIDER=remote in .env + embedding: + build: + context: ./embedding_service + dockerfile: Dockerfile + # No container_name - allows replicas + ports: + - "8100-8101:8100" + environment: + # Model: use nomic for faster + better quality, or override via EMBEDDING_MODEL + - 
EMBEDDING_MODEL=${EMBEDDING_MODEL:-nomic-ai/nomic-embed-text-v1.5} + - EMBED_MAX_CONCURRENT=${EMBED_MAX_CONCURRENT:-2} + - EMBED_MAX_BATCH=${EMBED_MAX_BATCH:-256} + # ONNX CPU optimizations + # ONNX_THREADS: limit threads per replica (0=all cores, 4-6 recommended for multi-replica) + - ONNX_THREADS=${ONNX_THREADS:-4} + # Disable thread spinning to reduce CPU burn when idle + - ONNX_DISABLE_SPINNING=${ONNX_DISABLE_SPINNING:-1} + - EMBED_OPTIMAL_BATCH=${EMBED_OPTIMAL_BATCH:-32} + # Limit OMP/MKL threads to match ONNX_THREADS + - OMP_NUM_THREADS=${ONNX_THREADS:-4} + - MKL_NUM_THREADS=${ONNX_THREADS:-4} + # Cache paths - aligned with Dockerfile and volume mount + - HF_HOME=/app/models + - FASTEMBED_CACHE_PATH=/app/models + volumes: + - embedding_cache:/app/models + healthcheck: + test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8100/health')"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 60s + deploy: + replicas: 2 + resources: + limits: + memory: 6G + cpus: "4" + restart: unless-stopped + networks: + - dev-remote-network + # Qdrant vector database - same as base compose qdrant: image: qdrant/qdrant:latest @@ -227,7 +272,9 @@ services: - TRANSFORMERS_CACHE=/tmp/huggingface/transformers - FASTEMBED_CACHE_PATH=/tmp/huggingface/fastembed - EMBEDDING_MODEL=${EMBEDDING_MODEL} - - EMBEDDING_PROVIDER=${EMBEDDING_PROVIDER} + # Override: learning worker calls embedder.embed() directly via RecursiveReranker._encode_raw() + # which doesn't route through embed_batch(). Needs local ONNX, not RemoteEmbeddingStub. 
+ - EMBEDDING_PROVIDER=local # Cross-encoder reranker (used as teacher for learning) - RERANKER_MODEL=${RERANKER_MODEL:-} - RERANKER_ONNX_PATH=${RERANKER_ONNX_PATH:-} @@ -744,6 +791,10 @@ volumes: redis_data: driver: local + # Embedding service model cache + embedding_cache: + driver: local + # Custom network for service discovery networks: dev-remote-network: diff --git a/embedding_service/Dockerfile b/embedding_service/Dockerfile new file mode 100644 index 00000000..5801ed23 --- /dev/null +++ b/embedding_service/Dockerfile @@ -0,0 +1,28 @@ +FROM python:3.11-slim + +WORKDIR /app + +# Install dependencies +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy service +COPY main.py . + +# Pre-download embedding model during build for faster startup +# Default: nomic-Q (quantized, 4x smaller, same quality) +# Override at build time: --build-arg EMBEDDING_MODEL=nomic-ai/nomic-embed-text-v1.5 +ARG EMBEDDING_MODEL=nomic-ai/nomic-embed-text-v1.5-Q + +# Set cache path BEFORE download so runtime uses same path +ENV FASTEMBED_CACHE_PATH=/app/models +ENV HF_HOME=/app/models + +# Download model to the same path used at runtime +RUN python -c "from fastembed import TextEmbedding; TextEmbedding('${EMBEDDING_MODEL}')" + +ENV EMBED_SERVICE_PORT=8100 +EXPOSE 8100 + +CMD ["python", "main.py"] + diff --git a/embedding_service/main.py b/embedding_service/main.py new file mode 100644 index 00000000..a59021a4 --- /dev/null +++ b/embedding_service/main.py @@ -0,0 +1,218 @@ +#!/usr/bin/env python3 +""" +Shared Embedding Service - One model instance serves all indexers. 
+ +Endpoints: + POST /embed - Embed texts, returns vectors + GET /health - Health check for k8s probes + +Memory: ~3-4 GB (single ONNX model + inference overhead) +Concurrency: Controlled via EMBED_MAX_CONCURRENT semaphore + +ONNX CPU Optimizations: + ONNX_THREADS: Number of threads for intra-op parallelism (default: 4) + ONNX_DISABLE_SPINNING: Set to 1 to disable thread spinning (saves CPU cycles; default: 1) + EMBED_OPTIMAL_BATCH: Internal batch size for chunking large requests (default: 32) + +Memory Optimizations: + EMBED_MAX_CONCURRENT: Max parallel inferences (default: 2) + EMBED_GC_INTERVAL: Force GC every N batches (default: 1 = every batch) + ORT_ARENA_EXTEND_STRATEGY: Memory arena growth strategy (default: kSameAsRequested) + +Model Options (via EMBEDDING_MODEL env var): + BAAI/bge-base-en-v1.5 - Solid quality (768 dim, 0.21 GB) + nomic-ai/nomic-embed-text-v1.5 - Faster, outperforms BGE on MTEB (768 dim, ~0.5 GB; the quantized -Q variant is the service default) + BAAI/bge-large-en-v1.5 - Higher quality, slower (1024 dim, 0.67 GB) +""" +import asyncio +import gc +import logging +import os +import threading +from contextlib import asynccontextmanager +from typing import List, Optional + +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel + +# Memory optimization: limit numpy/MKL thread spawning +os.environ.setdefault("OMP_NUM_THREADS", "4") +os.environ.setdefault("MKL_NUM_THREADS", "4") +os.environ.setdefault("OPENBLAS_NUM_THREADS", "4") + +# ONNX Runtime memory optimization - prevent arena from growing unbounded +os.environ.setdefault("ORT_ARENA_EXTEND_STRATEGY", "kSameAsRequested") + +# Apply ONNX_DISABLE_SPINNING via OMP_WAIT_POLICY +# PASSIVE = threads sleep when idle (saves CPU), ACTIVE = threads spin (faster but burns CPU) +if os.environ.get("ONNX_DISABLE_SPINNING", "1").strip().lower() in {"1", "true", "yes"}: + os.environ.setdefault("OMP_WAIT_POLICY", "PASSIVE") +else: + os.environ.setdefault("OMP_WAIT_POLICY", "ACTIVE") + 
+logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Memory-Optimized Configuration (target: <6GB per replica, FAST) +# --------------------------------------------------------------------------- +# Use quantized model by default - 4x smaller (0.13GB vs 0.52GB), same quality +MODEL_NAME = os.environ.get("EMBEDDING_MODEL", "nomic-ai/nomic-embed-text-v1.5-Q") +# Allow 2 concurrent for speed - quantized model uses less memory per inference +MAX_CONCURRENT = max(1, int(os.environ.get("EMBED_MAX_CONCURRENT", "2") or 2)) +MAX_BATCH_SIZE = int(os.environ.get("EMBED_MAX_BATCH", "128") or 128) + +# ONNX runtime optimizations +ONNX_THREADS = int(os.environ.get("ONNX_THREADS", "4") or 4) +ONNX_DISABLE_SPINNING = os.environ.get("ONNX_DISABLE_SPINNING", "1").strip().lower() in {"1", "true", "yes"} +# Batch size 32 is fine with quantized model +EMBED_OPTIMAL_BATCH = int(os.environ.get("EMBED_OPTIMAL_BATCH", "32") or 32) +# GC after every batch to prevent memory creep +GC_INTERVAL = int(os.environ.get("EMBED_GC_INTERVAL", "1") or 1) +_gc_counter = 0 + +# Global model and semaphore +_model = None +_semaphore = threading.Semaphore(MAX_CONCURRENT) +_model_dim = None +_model_info = {} + + +def _load_model(): + """Load embedding model with ONNX optimizations.""" + global _model, _model_dim, _model_info + from fastembed import TextEmbedding + + logger.info(f"Loading model: {MODEL_NAME}") + logger.info(f"ONNX config: threads={ONNX_THREADS or 'auto'}, disable_spinning={ONNX_DISABLE_SPINNING}, optimal_batch={EMBED_OPTIMAL_BATCH}") + + # Build kwargs for TextEmbedding + # FastEmbed accepts 'threads' parameter for ONNX session + model_kwargs = {} + if ONNX_THREADS > 0: + model_kwargs["threads"] = ONNX_THREADS + + _model = TextEmbedding(model_name=MODEL_NAME, **model_kwargs) + + # Apply additional ONNX session options if we can access the session + # Note: FastEmbed may not expose 
all session options, but threads is the main one + + # Warmup and get dimension (also warms up ONNX runtime) + logger.info("Warming up model...") + warmup = list(_model.embed(["warmup query for initialization"])) + _model_dim = len(warmup[0]) + + # Store model info for health endpoint + _model_info = { + "model": MODEL_NAME, + "dim": _model_dim, + "threads": ONNX_THREADS or "auto", + "disable_spinning": ONNX_DISABLE_SPINNING, + "optimal_batch": EMBED_OPTIMAL_BATCH, + "max_concurrent": MAX_CONCURRENT, + } + + logger.info(f"Model loaded: dim={_model_dim}, max_concurrent={MAX_CONCURRENT}") + + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Load model on startup.""" + _load_model() + yield + + +app = FastAPI(title="Embedding Service", lifespan=lifespan) + + +class EmbedRequest(BaseModel): + texts: List[str] + model: Optional[str] = None # Ignored for now, single model + + +class EmbedResponse(BaseModel): + vectors: List[List[float]] + dim: int + count: int + + +@app.post("/embed", response_model=EmbedResponse) +async def embed(request: EmbedRequest): + """Embed texts with concurrency control.""" + if not request.texts: + return EmbedResponse(vectors=[], dim=_model_dim or 768, count=0) + + if len(request.texts) > MAX_BATCH_SIZE: + raise HTTPException(400, f"Batch too large: {len(request.texts)} > {MAX_BATCH_SIZE}") + + # Run embedding in thread pool with semaphore + loop = asyncio.get_event_loop() + vectors = await loop.run_in_executor(None, _embed_sync, request.texts) + + return EmbedResponse(vectors=vectors, dim=_model_dim, count=len(vectors)) + + +def _embed_sync(texts: List[str]) -> List[List[float]]: + """Synchronous embedding with semaphore, batching, and aggressive GC. 
+ + Memory optimizations: + - Chunks into batches of EMBED_OPTIMAL_BATCH (default 32) to reduce peak memory + - Forces GC every EMBED_GC_INTERVAL batches (default: every batch) to free tokenizer buffers promptly + - Semaphore caps concurrent inferences at EMBED_MAX_CONCURRENT to limit memory spikes + """ + global _gc_counter + with _semaphore: + # Small batch for memory efficiency + if len(texts) <= EMBED_OPTIMAL_BATCH: + result = [vec.tolist() for vec in _model.embed(texts)] + # Aggressive GC to free tokenizer/intermediate buffers + _gc_counter += 1 + if GC_INTERVAL > 0 and _gc_counter >= GC_INTERVAL: + gc.collect() + _gc_counter = 0 + return result + + # Process in small chunks with GC between + all_vectors = [] + for i in range(0, len(texts), EMBED_OPTIMAL_BATCH): + chunk = texts[i:i + EMBED_OPTIMAL_BATCH] + chunk_vectors = [vec.tolist() for vec in _model.embed(chunk)] + all_vectors.extend(chunk_vectors) + # GC after each chunk to prevent memory buildup + _gc_counter += 1 + if GC_INTERVAL > 0 and _gc_counter >= GC_INTERVAL: + gc.collect() + _gc_counter = 0 + return all_vectors + + +def _get_memory_mb() -> float: + """Get peak process memory usage (max RSS) in MB, or -1 if unavailable.""" + try: + import resource + # ru_maxrss is the peak resident set size; its unit is platform-dependent + rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss + # macOS returns bytes, Linux returns KB + import sys + if sys.platform == "darwin": + return rss / (1024 * 1024) + return rss / 1024 + except Exception: + return -1 + + +@app.get("/health") +async def health(): + """Health check for k8s probes. 
Returns full model config + memory.""" + return { + "status": "ok", + "memory_mb": round(_get_memory_mb(), 1), + **_model_info, + } + + +if __name__ == "__main__": + import uvicorn + port = int(os.environ.get("EMBED_SERVICE_PORT", "8100")) + uvicorn.run(app, host="0.0.0.0", port=port) + diff --git a/embedding_service/requirements.txt b/embedding_service/requirements.txt new file mode 100644 index 00000000..efd79c41 --- /dev/null +++ b/embedding_service/requirements.txt @@ -0,0 +1,5 @@ +fastapi>=0.100.0 +uvicorn>=0.22.0 +fastembed>=0.2.0 +pydantic>=2.0.0 + diff --git a/requirements.txt b/requirements.txt index fd28cbab..d603d3c6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,6 +10,7 @@ orjson redis>=5.0.0 xxhash>=3.0.0 lz4>=4.0.0 +requests>=2.28.0 # Tree-sitter 0.25+ with individual language packages tree_sitter>=0.25.0 tree_sitter_python>=0.23.0 diff --git a/scripts/ctx_cli/commands/reset.py b/scripts/ctx_cli/commands/reset.py index c09e3483..f6f5b95f 100644 --- a/scripts/ctx_cli/commands/reset.py +++ b/scripts/ctx_cli/commands/reset.py @@ -53,7 +53,12 @@ def _load_env_to_environ(): # Default model/tokenizer URLs (same as Makefile) DEFAULT_MODEL_URL = "https://huggingface.co/ibm-granite/granite-4.0-micro-GGUF/resolve/main/granite-4.0-micro-Q4_K_M.gguf" DEFAULT_MODEL_PATH = "models/model.gguf" -DEFAULT_TOKENIZER_URL = "https://huggingface.co/BAAI/bge-base-en-v1.5/resolve/main/tokenizer.json" +# Tokenizer for micro-chunking (token counting). BGE tokenizer works for any model. +# Override via TOKENIZER_URL env var if needed. 
+DEFAULT_TOKENIZER_URL = os.environ.get( + "TOKENIZER_URL", + "https://huggingface.co/BAAI/bge-base-en-v1.5/resolve/main/tokenizer.json" +) DEFAULT_TOKENIZER_PATH = "models/tokenizer.json" @@ -116,6 +121,26 @@ def _wait_for_qdrant(url: str = "http://localhost:6333", timeout: int = 60) -> b return False +def _wait_for_embedding(url: str = "http://localhost:8100", timeout: int = 90) -> bool: + """Wait for embedding service to be ready.""" + _print(f"[dim]Waiting for embedding service at {url}...[/dim]") + start = time.time() + + while time.time() - start < timeout: + try: + health_url = f"{url.rstrip('/')}/health" + with urllib.request.urlopen(health_url, timeout=5) as r: + if getattr(r, "status", 200) < 500: + _print("[green]✓[/green] Embedding service is ready") + return True + except Exception as e: + logger.debug(f"Suppressed exception: {e}") + time.sleep(2) + + _print(f"[red]Error:[/red] Embedding service not ready after {timeout}s", error=True) + return False + + def _download_file(url: str, dest: Path, description: str) -> bool: """Download a file with progress display.""" _print(f"[dim]Downloading {description}...[/dim]") @@ -202,20 +227,21 @@ def reset( # Determine which containers to build/start based on mode # Default is HTTP-only; SSE only starts when explicitly requested with --sse + # Embedding service is always included (shared ONNX model for all indexers) if mode == "sse": # SSE MCPs only (legacy, must be explicitly requested) - build_containers = ["indexer", "mcp", "mcp_indexer", "watcher"] - start_containers = ["mcp", "mcp_indexer", "watcher"] + build_containers = ["embedding", "indexer", "mcp", "mcp_indexer", "watcher"] + start_containers = ["embedding", "mcp", "mcp_indexer", "watcher"] mode_desc = "SSE MCPs only (legacy)" elif mode == "dual": # Dual mode (both SSE and HTTP) - build_containers = ["indexer", "mcp", "mcp_indexer", "mcp_http", "mcp_indexer_http", "watcher", "upload_service"] - start_containers = ["mcp", "mcp_indexer", "mcp_http", 
"mcp_indexer_http", "watcher", "upload_service"] + build_containers = ["embedding", "indexer", "mcp", "mcp_indexer", "mcp_http", "mcp_indexer_http", "watcher", "upload_service"] + start_containers = ["embedding", "mcp", "mcp_indexer", "mcp_http", "mcp_indexer_http", "watcher", "upload_service"] mode_desc = "Dual mode (SSE + HTTP)" else: # HTTP MCPs only (default, Codex compatible) + upload_service for remote sync - build_containers = ["indexer", "mcp_http", "mcp_indexer_http", "watcher", "upload_service"] - start_containers = ["mcp_http", "mcp_indexer_http", "watcher", "upload_service"] + build_containers = ["embedding", "indexer", "mcp_http", "mcp_indexer_http", "watcher", "upload_service"] + start_containers = ["embedding", "mcp_http", "mcp_indexer_http", "watcher", "upload_service"] mode_desc = "HTTP MCPs (streamable)" # Add learning_worker if rerank learning is enabled @@ -259,7 +285,7 @@ def reset( step += 1 _print(f"\n[bold][{step}/{steps_total}] Stopping services...[/bold]") if db_reset: - # Full reset including database volumes (qdrant, redis, neo4j) + # Full reset including database volumes (qdrant, redis, neo4j, embedding cache) _run_cmd(compose_cmd + ["down", "-v", "--remove-orphans"], "Stopping all containers and removing volumes", check=False) _print("[green]✓[/green] Services stopped and database volumes removed") else: @@ -277,21 +303,28 @@ def reset( else: _print(f"\n[bold][{step}/{steps_total}] Skipping container build[/bold]") - # Step 3: Start Qdrant, Redis (if enabled), and Neo4j (if enabled) and wait + # Step 3: Start Qdrant, Redis (if enabled), Neo4j (if enabled), and Embedding service step += 1 db_services = ["qdrant"] if redis_enabled: db_services.append("redis") if neo4j_enabled: db_services.append("neo4j") + # Start embedding service early (indexer needs it) + db_services.append("embedding") _print(f"\n[bold][{step}/{steps_total}] Starting {', '.join(db_services)}...[/bold]") - _run_cmd(compose_cmd + ["up", "-d"] + db_services, f"Starting 
{', '.join(db_services)}") + # Use --scale for embedding to get 2 replicas (deploy.replicas is Swarm-only) + _run_cmd(compose_cmd + ["up", "-d", "--scale", "embedding=2"] + db_services, f"Starting {', '.join(db_services)} (embedding×2)") # Use helper that normalizes Docker hostname to localhost for host CLI qdrant_url = get_qdrant_url_for_host() if not _wait_for_qdrant(qdrant_url): return 1 + # Wait for embedding service to be ready (indexer needs it) + if not _wait_for_embedding("http://localhost:8100"): + _print("[yellow]Warning:[/yellow] Embedding service not ready, indexer may have errors") + # Step 4: Initialize payload indexes step += 1 _print(f"\n[bold][{step}/{steps_total}] Initializing payload indexes...[/bold]") @@ -406,8 +439,9 @@ def reset( _print(f"\n[bold][{step}/{steps_total}] Starting services...[/bold]") # Start services - cmd = compose_cmd + ["up", "-d"] + start_containers - _run_cmd(cmd, f"Starting: {', '.join(start_containers)}") + # Use --scale for embedding service to get multiple replicas (deploy.replicas is Swarm-only) + cmd = compose_cmd + ["up", "-d", "--scale", "embedding=2"] + start_containers + _run_cmd(cmd, f"Starting: {', '.join(start_containers)} (embedding×2)") _print("[green]✓[/green] Services started") _print_panel( @@ -448,7 +482,7 @@ def register_command(subparsers): ctx reset # Full reset with HTTP MCPs (default) ctx reset --dual # Both SSE and HTTP MCPs ctx reset --sse # SSE MCPs only (legacy) - ctx reset --db-reset # Reset database volumes (Qdrant, Redis, Neo4j) + ctx reset --db-reset # Reset database volumes (Qdrant, Redis, Neo4j, Embedding cache) ctx reset --skip-model # Skip llama model download ctx reset --skip-build # Skip container rebuild (faster) """ @@ -476,7 +510,7 @@ def register_command(subparsers): parser.add_argument( "--db-reset", action="store_true", - help="Reset database volumes (Qdrant, Redis, Neo4j)" + help="Reset database volumes (Qdrant, Redis, Neo4j, Embedding cache)" ) # Skip options diff --git 
a/scripts/ctx_cli/commands/status.py b/scripts/ctx_cli/commands/status.py index dd22dc59..1585d525 100644 --- a/scripts/ctx_cli/commands/status.py +++ b/scripts/ctx_cli/commands/status.py @@ -385,42 +385,67 @@ def get_graph_status(collection_name: str) -> Tuple[bool, Optional[Dict[str, Any logger.debug(f"Suppressed exception, continuing: {e}") continue # Try next URL - # Check Neo4j - always try to connect for status - # Use localhost (Docker exposes port) with common default password + # Check Neo4j via HTTP API (works without neo4j Python package) + # Neo4j exposes HTTP API on port 7474 try: - from neo4j import GraphDatabase - - uri = "bolt://localhost:7687" + import base64 user = os.environ.get("NEO4J_USER", "neo4j") password = os.environ.get("NEO4J_PASSWORD", "contextengine") + auth_str = base64.b64encode(f"{user}:{password}".encode()).decode() - driver = GraphDatabase.driver(uri, auth=(user, password)) - driver.verify_connectivity() - - with driver.session() as session: - node_result = session.run("MATCH (n:Symbol) RETURN count(n) as count") - node_count = node_result.single()["count"] - - edge_result = session.run("MATCH ()-[r]->() RETURN count(r) as count") - edge_count = edge_result.single()["count"] - - graph_info["neo4j"] = { - "connected": True, - "node_count": node_count, - "edge_count": edge_count, + # Query node count + node_query = {"statements": [{"statement": "MATCH (n:Symbol) RETURN count(n) as count"}]} + req = Request( + "http://localhost:7474/db/neo4j/tx/commit", + data=json.dumps(node_query).encode('utf-8'), + headers={ + "Content-Type": "application/json", + "Accept": "application/json", + "Authorization": f"Basic {auth_str}" } + ) + with urlopen(req, timeout=3) as response: + if response.status == 200: + data = json.loads(response.read().decode('utf-8')) + results = data.get("results", []) + if results and results[0].get("data"): + node_count = results[0]["data"][0]["row"][0] + else: + node_count = 0 + + # Query edge count + edge_query = 
{"statements": [{"statement": "MATCH ()-[r]->() RETURN count(r) as count"}]} + req = Request( + "http://localhost:7474/db/neo4j/tx/commit", + data=json.dumps(edge_query).encode('utf-8'), + headers={ + "Content-Type": "application/json", + "Accept": "application/json", + "Authorization": f"Basic {auth_str}" + } + ) + with urlopen(req, timeout=3) as response2: + if response2.status == 200: + data2 = json.loads(response2.read().decode('utf-8')) + results2 = data2.get("results", []) + if results2 and results2[0].get("data"): + edge_count = results2[0]["data"][0]["row"][0] + else: + edge_count = 0 + + graph_info["neo4j"] = { + "connected": True, + "node_count": node_count, + "edge_count": edge_count, + } - if graph_info["backend"] == "qdrant": - graph_info["backend"] = "both" - else: - graph_info["backend"] = "neo4j" - success = True - - driver.close() - except ImportError: - pass # neo4j package not installed + if graph_info["backend"] == "qdrant": + graph_info["backend"] = "both" + else: + graph_info["backend"] = "neo4j" + success = True except Exception as e: - logger.debug(f"Suppressed exception: {e}") # Neo4j not available + logger.debug(f"Neo4j HTTP check failed: {e}") # Neo4j not available return success, graph_info if success else None diff --git a/scripts/embedder.py b/scripts/embedder.py index 3ee631e8..04ea6830 100644 --- a/scripts/embedder.py +++ b/scripts/embedder.py @@ -76,6 +76,29 @@ _ARCTIC_V2_REGISTERED = False _ARCTIC_V2_REGISTER_LOCK = threading.Lock() +def _get_embedding_provider() -> str: + """Get embedding provider at call time (not import time) for test flexibility.""" + return os.environ.get("EMBEDDING_PROVIDER", "local").strip().lower() + + +class RemoteEmbeddingStub: + """Lightweight stub for remote embedding mode - no ONNX loaded. + + When EMBEDDING_PROVIDER=remote, we don't need to load the actual ONNX model + since all embedding calls go to the remote service. This stub provides the + model_name attribute needed by embed_batch() for routing. 
+ """ + + def __init__(self, model_name: str): + self.model_name = model_name + + def embed(self, texts: List[str]): + """Stub - should not be called in remote mode.""" + raise RuntimeError( + "RemoteEmbeddingStub.embed() called but EMBEDDING_PROVIDER=remote. " + "Use embed_batch() from scripts.ingest.qdrant which routes to remote service." + ) + def _register_qwen3_model() -> None: """Register Qwen3 ONNX model with FastEmbed (one-time, thread-safe).""" @@ -149,13 +172,25 @@ def get_embedding_model(model_name: Optional[str] = None) -> Any: model_name: Model name override. If None, uses EMBEDDING_MODEL env var. Returns: - TextEmbedding instance (cached per model name). + TextEmbedding instance (cached per model name), or RemoteEmbeddingStub + when EMBEDDING_PROVIDER=remote (no ONNX loaded, saves ~3-4 GB RAM). """ - from fastembed import TextEmbedding - if model_name is None: model_name = os.environ.get("EMBEDDING_MODEL", DEFAULT_MODEL) + # Remote mode: return lightweight stub (no ONNX loaded) + # This saves ~3-4 GB RAM per indexer since embed_batch() routes to remote service + if _get_embedding_provider() == "remote": + cached = _EMBED_MODEL_CACHE.get(f"remote:{model_name}") + if cached is not None: + return cached + stub = RemoteEmbeddingStub(model_name) + _EMBED_MODEL_CACHE[f"remote:{model_name}"] = stub + logger.info(f"[embedder] Using remote embedding service for {model_name} (no local ONNX)") + return stub + + from fastembed import TextEmbedding + # Register Qwen3 if enabled and requested if QWEN3_ENABLED and "qwen3" in model_name.lower(): _register_qwen3_model() diff --git a/scripts/hybrid/embed.py b/scripts/hybrid/embed.py index aa60d42c..3e1c8ae4 100644 --- a/scripts/hybrid/embed.py +++ b/scripts/hybrid/embed.py @@ -32,11 +32,12 @@ # Embedder factory setup # --------------------------------------------------------------------------- try: - from scripts.embedder import get_embedding_model as _get_embedding_model + from scripts.embedder import 
get_embedding_model as _get_embedding_model, RemoteEmbeddingStub _EMBEDDER_FACTORY = True except ImportError: _EMBEDDER_FACTORY = False _get_embedding_model = None # type: ignore + RemoteEmbeddingStub = None # type: ignore # Always try to import TextEmbedding for backward compatibility with tests try: @@ -44,6 +45,14 @@ except ImportError: TextEmbedding = None # type: ignore +# Remote embedding support: use embed_batch() when EMBEDDING_PROVIDER=remote +try: + from scripts.ingest.qdrant import embed_batch as _embed_batch_remote + _REMOTE_EMBED_AVAILABLE = True +except ImportError: + _embed_batch_remote = None # type: ignore + _REMOTE_EMBED_AVAILABLE = False + # Type alias for embedding model (TextEmbedding or compatible) EmbeddingModel = Any if TextEmbedding is None else TextEmbedding @@ -231,24 +240,38 @@ def _embed_with_unified_cache( # Batch-embed all missing queries in one call # Use query_embed if available AND ASYMMETRIC_EMBEDDING=1 (e.g., Jina v3) if missing_queries: - # Select embedding function: query_embed for asymmetric models, else regular embed - if ASYMMETRIC_EMBEDDING and hasattr(model, "query_embed"): - embed_fn = model.query_embed - else: - embed_fn = model.embed + # Check if model is RemoteEmbeddingStub - route to remote service + is_remote_stub = RemoteEmbeddingStub is not None and isinstance(model, RemoteEmbeddingStub) - try: - vecs = list(embed_fn(missing_queries)) - # Cache all new embeddings - for q, vec in zip(missing_queries, vecs): - key = (str(model_name), str(q)) - cache.set(key, vec.tolist()) - except Exception: - # Fallback to one-by-one if batch fails - for q in missing_queries: - key = (str(model_name), str(q)) - vec = next(embed_fn([q])).tolist() - cache.set(key, vec) + if is_remote_stub and _REMOTE_EMBED_AVAILABLE and _embed_batch_remote is not None: + # Use remote embedding service via embed_batch() + try: + vecs = _embed_batch_remote(model, missing_queries) + for q, vec in zip(missing_queries, vecs): + key = (str(model_name), 
str(q)) + cache.set(key, vec if isinstance(vec, list) else vec.tolist()) + except Exception as e: + logger.warning(f"Remote embedding failed: {e}") + raise + else: + # Local embedding: select function based on asymmetric mode + if ASYMMETRIC_EMBEDDING and hasattr(model, "query_embed"): + embed_fn = model.query_embed + else: + embed_fn = model.embed + + try: + vecs = list(embed_fn(missing_queries)) + # Cache all new embeddings + for q, vec in zip(missing_queries, vecs): + key = (str(model_name), str(q)) + cache.set(key, vec.tolist()) + except Exception: + # Fallback to one-by-one if batch fails + for q in missing_queries: + key = (str(model_name), str(q)) + vec = next(embed_fn([q])).tolist() + cache.set(key, vec) # Return embeddings in original order from cache out: List[List[float]] = [] @@ -280,34 +303,53 @@ def _embed_with_legacy_cache( # Batch-embed all missing queries in one call # Use query_embed if available AND ASYMMETRIC_EMBEDDING=1 (e.g., Jina v3) if missing_queries: - if ASYMMETRIC_EMBEDDING and hasattr(model, "query_embed"): - embed_fn = model.query_embed - else: - embed_fn = model.embed + # Check if model is RemoteEmbeddingStub - route to remote service + is_remote_stub = RemoteEmbeddingStub is not None and isinstance(model, RemoteEmbeddingStub) - try: - # Embed all missing queries at once - vecs = list(embed_fn(missing_queries)) - with _EMBED_LOCK: - # Cache all new embeddings - for q, vec in zip(missing_queries, vecs): - key = (str(model_name), str(q)) - if key not in _EMBED_QUERY_CACHE: - _EMBED_QUERY_CACHE[key] = vec.tolist() - # Evict oldest entries if cache exceeds limit - while len(_EMBED_QUERY_CACHE) > MAX_EMBED_CACHE: - _EMBED_QUERY_CACHE.popitem(last=False) - except Exception: - # Fallback to one-by-one if batch fails - for q in missing_queries: - key = (str(model_name), str(q)) - vec = next(embed_fn([q])).tolist() + if is_remote_stub and _REMOTE_EMBED_AVAILABLE and _embed_batch_remote is not None: + # Use remote embedding service via 
embed_batch() + try: + vecs = _embed_batch_remote(model, missing_queries) with _EMBED_LOCK: - if key not in _EMBED_QUERY_CACHE: - _EMBED_QUERY_CACHE[key] = vec - # Evict oldest entries if cache exceeds limit - while len(_EMBED_QUERY_CACHE) > MAX_EMBED_CACHE: - _EMBED_QUERY_CACHE.popitem(last=False) + for q, vec in zip(missing_queries, vecs): + key = (str(model_name), str(q)) + if key not in _EMBED_QUERY_CACHE: + _EMBED_QUERY_CACHE[key] = vec if isinstance(vec, list) else vec.tolist() + while len(_EMBED_QUERY_CACHE) > MAX_EMBED_CACHE: + _EMBED_QUERY_CACHE.popitem(last=False) + except Exception as e: + logger.warning(f"Remote embedding failed: {e}") + raise + else: + # Local embedding + if ASYMMETRIC_EMBEDDING and hasattr(model, "query_embed"): + embed_fn = model.query_embed + else: + embed_fn = model.embed + + try: + # Embed all missing queries at once + vecs = list(embed_fn(missing_queries)) + with _EMBED_LOCK: + # Cache all new embeddings + for q, vec in zip(missing_queries, vecs): + key = (str(model_name), str(q)) + if key not in _EMBED_QUERY_CACHE: + _EMBED_QUERY_CACHE[key] = vec.tolist() + # Evict oldest entries if cache exceeds limit + while len(_EMBED_QUERY_CACHE) > MAX_EMBED_CACHE: + _EMBED_QUERY_CACHE.popitem(last=False) + except Exception: + # Fallback to one-by-one if batch fails + for q in missing_queries: + key = (str(model_name), str(q)) + vec = next(embed_fn([q])).tolist() + with _EMBED_LOCK: + if key not in _EMBED_QUERY_CACHE: + _EMBED_QUERY_CACHE[key] = vec + # Evict oldest entries if cache exceeds limit + while len(_EMBED_QUERY_CACHE) > MAX_EMBED_CACHE: + _EMBED_QUERY_CACHE.popitem(last=False) # Return embeddings in original order from cache (thread-safe read) out: List[List[float]] = [] diff --git a/scripts/ingest/pipeline.py b/scripts/ingest/pipeline.py index 0fe9e893..a99675e8 100644 --- a/scripts/ingest/pipeline.py +++ b/scripts/ingest/pipeline.py @@ -1508,6 +1508,7 @@ def _index_file_task(file_path: Path) -> tuple[Path, Optional[Exception], 
bool]: Returns: (path, error_or_none, was_skipped_due_to_lock) """ + import gc per_file_repo = ( root_repo_for_cache if root_repo_for_cache is not None @@ -1532,6 +1533,9 @@ def _index_file_task(file_path: Path) -> tuple[Path, Optional[Exception], bool]: return (file_path, None, True) except Exception as e: return (file_path, e, False) + finally: + # Free memory after each file to prevent accumulation + gc.collect() files_processed = 0 errors = [] diff --git a/scripts/ingest/qdrant.py b/scripts/ingest/qdrant.py index 73daf2f2..4ad4515d 100644 --- a/scripts/ingest/qdrant.py +++ b/scripts/ingest/qdrant.py @@ -9,6 +9,7 @@ import logging import os +import threading import time import xxhash @@ -17,6 +18,29 @@ logger = logging.getLogger(__name__) +# --------------------------------------------------------------------------- +# ONNX concurrency control - prevents memory explosion with parallel workers +# --------------------------------------------------------------------------- +_EMBED_MAX_CONCURRENT = max(1, int(os.environ.get("EMBED_MAX_CONCURRENT", "2") or 2)) +_EMBED_SEMAPHORE = threading.Semaphore(_EMBED_MAX_CONCURRENT) + +# Remote embedding service configuration - functions for runtime flexibility (tests can change env) +def _get_embedding_provider() -> str: + return os.environ.get("EMBEDDING_PROVIDER", "local").strip().lower() + +def _get_embedding_service_url() -> str: + return os.environ.get("EMBEDDING_SERVICE_URL", "http://embedding:8100") + +def _get_embedding_service_urls() -> list: + raw = os.environ.get("EMBEDDING_SERVICE_URLS", "").strip() + return [u.strip() for u in raw.split(",") if u.strip()] if raw else [] + +_EMBEDDING_SERVICE_TIMEOUT = int(os.environ.get("EMBEDDING_SERVICE_TIMEOUT", "60") or 60) + +# Client-side load balancing for multiple replicas (Compose mode) +_EMBED_REPLICA_INDEX = 0 +_EMBED_REPLICA_LOCK = threading.Lock() + from qdrant_client import QdrantClient, models from scripts.ingest.config import ( @@ -979,22 +1003,72 @@ def 
hash_id(text: str, path: str, start: int, end: int) -> int: return int(h[:16], 16) -def embed_batch(model, texts: List[str]) -> List[List[float]]: - """Embed a batch of texts using the embedding model. - - When ASYMMETRIC_EMBEDDING=1, uses passage_embed for documents (if available). - This enables asymmetric retrieval with models like Jina v3 that have - separate query/passage adapters. - """ - # Check for asymmetric embedding mode +def _embed_local(model, texts: List[str]) -> List[List[float]]: + """Local ONNX embedding with concurrency control.""" asymmetric = ( str(os.environ.get("ASYMMETRIC_EMBEDDING", "0")).strip().lower() in {"1", "true", "yes", "on"} ) - if asymmetric and hasattr(model, "passage_embed"): - # Use passage_embed for documents (asymmetric models like Jina v3) - return [vec.tolist() for vec in model.passage_embed(texts)] + # Semaphore prevents ONNX memory explosion with parallel workers + with _EMBED_SEMAPHORE: + if asymmetric and hasattr(model, "passage_embed"): + return [vec.tolist() for vec in model.passage_embed(texts)] + else: + return [vec.tolist() for vec in model.embed(texts)] + + +def _embed_remote(texts: List[str], model_name: str = "default") -> List[List[float]]: + """Remote embedding via HTTP service with client-side load balancing. + + If EMBEDDING_SERVICE_URLS is set (comma-separated), round-robins across replicas + for true parallel processing. Otherwise uses single EMBEDDING_SERVICE_URL. 
+ """ + global _EMBED_REPLICA_INDEX + import requests + + # Get URLs at call time (not import time) for test flexibility + service_urls = _get_embedding_service_urls() + service_url = _get_embedding_service_url() + + # Client-side load balancing across replicas + if service_urls: + with _EMBED_REPLICA_LOCK: + url = service_urls[_EMBED_REPLICA_INDEX % len(service_urls)] + _EMBED_REPLICA_INDEX += 1 + else: + url = service_url + + try: + resp = requests.post( + f"{url}/embed", + json={"texts": texts, "model": model_name}, + timeout=_EMBEDDING_SERVICE_TIMEOUT, + ) + resp.raise_for_status() + return resp.json()["vectors"] + except Exception as e: + logger.error(f"Remote embedding failed ({url}): {e}") + raise + + +def embed_batch(model, texts: List[str]) -> List[List[float]]: + """Embed a batch of texts using configured provider. + + Providers (set via EMBEDDING_PROVIDER env var): + - local: Use local ONNX model with concurrency control (default) + - remote: Use remote embedding service at EMBEDDING_SERVICE_URL + + When ASYMMETRIC_EMBEDDING=1, uses passage_embed for documents (if available). 
+ """ + if not texts: + return [] + + # Check provider at call time (not import time) for test flexibility + if _get_embedding_provider() == "remote": + # Get model name for remote service + model_name = getattr(model, "model_name", None) or os.environ.get("EMBEDDING_MODEL", "default") + return _embed_remote(texts, model_name) else: - # Standard symmetric embedding - return [vec.tolist() for vec in model.embed(texts)] + # Local with semaphore-controlled concurrency + return _embed_local(model, texts) diff --git a/scripts/mcp_memory_server.py b/scripts/mcp_memory_server.py index 61880669..6b01d804 100644 --- a/scripts/mcp_memory_server.py +++ b/scripts/mcp_memory_server.py @@ -429,7 +429,10 @@ def _ensure_collection(name: str): _return_qdrant_client(client) # Choose dense dimension based on config: probe (default) vs env-configured - if MEMORY_PROBE_EMBED_DIM: + # When EMBEDDING_PROVIDER=remote, skip local model loading and use env dimension or probe via remote + _embedding_provider = os.environ.get("EMBEDDING_PROVIDER", "local").strip().lower() + + if MEMORY_PROBE_EMBED_DIM and _embedding_provider != "remote": try: # Probe dimension without populating the shared model cache. 
# This preserves the "cache loads on first tool call" behavior and @@ -450,6 +453,22 @@ def _ensure_collection(name: str): dense_dim = int(os.environ.get("MEMORY_VECTOR_DIM") or os.environ.get("EMBED_DIM") or "768") except Exception: dense_dim = 768 + elif MEMORY_PROBE_EMBED_DIM and _embedding_provider == "remote": + # Remote mode: probe via remote embedding service instead of loading model locally + try: + import requests + _embed_url = os.environ.get("EMBEDDING_SERVICE_URL", "http://embedding:8100") + resp = requests.post(f"{_embed_url}/embed", json={"texts": ["probe"]}, timeout=30) + resp.raise_for_status() + vectors = resp.json().get("vectors", []) + dense_dim = len(vectors[0]) if vectors else int(os.environ.get("MEMORY_VECTOR_DIM") or os.environ.get("EMBED_DIM") or "768") + logger.info(f"Probed embedding dimension via remote service: {dense_dim}") + except Exception as e: + logger.warning(f"Remote embedding probe failed, using env dimension: {e}") + try: + dense_dim = int(os.environ.get("MEMORY_VECTOR_DIM") or os.environ.get("EMBED_DIM") or "768") + except Exception: + dense_dim = 768 else: dense_dim = int(MEMORY_VECTOR_DIM or 768) diff --git a/scripts/warm_start.py b/scripts/warm_start.py index c766ff46..9027d36b 100644 --- a/scripts/warm_start.py +++ b/scripts/warm_start.py @@ -101,9 +101,19 @@ async def warmup_reranker() -> float: async def warmup_embedding_model(model_name: str) -> float: """Warm up embedding model by loading and running dummy inference. + When EMBEDDING_PROVIDER=remote, checks the embedding service health instead + of loading a local ONNX model. 
+ Returns: Latency in milliseconds """ + embedding_provider = os.environ.get("EMBEDDING_PROVIDER", "local").strip().lower() + + # Remote mode: check embedding service health instead of local model + if embedding_provider == "remote": + return await _warmup_remote_embedding() + + # Local mode: load and warm up local ONNX model try: start = time.perf_counter() model = await asyncio.to_thread(get_embedding_model, model_name) @@ -121,6 +131,51 @@ async def warmup_embedding_model(model_name: str) -> float: raise +async def _warmup_remote_embedding() -> float: + """Check remote embedding service health and warm it up with a test request. + + Returns: + Latency in milliseconds + """ + import urllib.request + import json + + service_url = os.environ.get("EMBEDDING_SERVICE_URL", "http://embedding:8100") + timeout = int(os.environ.get("EMBEDDING_SERVICE_TIMEOUT", "30")) + + try: + start = time.perf_counter() + + # First check health + health_url = f"{service_url.rstrip('/')}/health" + with urllib.request.urlopen(health_url, timeout=timeout) as resp: + if resp.status != 200: + raise RuntimeError(f"Embedding service unhealthy: {resp.status}") + health_data = json.loads(resp.read().decode()) + logger.info(f"Remote embedding service healthy: {health_data.get('model', 'unknown')}") + + # Warm up with a test embedding request + embed_url = f"{service_url.rstrip('/')}/embed" + test_payload = json.dumps({"texts": ["warmup probe text"]}).encode() + req = urllib.request.Request( + embed_url, + data=test_payload, + headers={"Content-Type": "application/json"}, + method="POST" + ) + with urllib.request.urlopen(req, timeout=timeout) as resp: + if resp.status != 200: + raise RuntimeError(f"Embedding service embed failed: {resp.status}") + + elapsed = (time.perf_counter() - start) * 1000 + logger.info(f"Remote embedding warmup: {elapsed:.1f}ms") + return elapsed + + except Exception as e: + logger.warning(f"Remote embedding warmup failed: {e}") + raise + + async def warmup_all_models() -> 
dict: """Orchestrate parallel warmup of embedding + reranker models. diff --git a/services/embedding_service/__init__.py b/services/embedding_service/__init__.py new file mode 100644 index 00000000..d6a3e920 --- /dev/null +++ b/services/embedding_service/__init__.py @@ -0,0 +1,2 @@ +# Embedding service package + diff --git a/tests/test_ingest_schema_mode.py b/tests/test_ingest_schema_mode.py index c766089b..593dce40 100644 --- a/tests/test_ingest_schema_mode.py +++ b/tests/test_ingest_schema_mode.py @@ -30,7 +30,7 @@ def get_collection(self, name): payload_schema=self.payload_schema, ) - def create_collection(self, collection_name, vectors_config, sparse_vectors_config=None, hnsw_config=None, quantization_config=None): + def create_collection(self, collection_name, vectors_config, sparse_vectors_config=None, hnsw_config=None, quantization_config=None, on_disk_payload=None): self.create_calls.append( { "collection_name": collection_name, @@ -38,6 +38,7 @@ def create_collection(self, collection_name, vectors_config, sparse_vectors_conf "sparse_vectors_config": sparse_vectors_config, "hnsw_config": hnsw_config, "quantization_config": quantization_config, + "on_disk_payload": on_disk_payload, } ) self.collection_exists = True