diff --git a/.env.example b/.env.example index 562d84b6..30ebc3eb 100644 --- a/.env.example +++ b/.env.example @@ -161,6 +161,7 @@ SEMANTIC_EXPANSION_CACHE_TTL=3600 # Codebase state backend (for .codebase/state.json, .codebase/cache.json, symbols) # Set CODEBASE_STATE_BACKEND=redis to use Redis (recommended for K8s) +# When switching backends, existing state is automatically migrated on startup. # CODEBASE_STATE_BACKEND=file # CODEBASE_STATE_REDIS_ENABLED=0 # CODEBASE_STATE_REDIS_URL=redis://redis:6379/0 @@ -169,6 +170,7 @@ SEMANTIC_EXPANSION_CACHE_TTL=3600 # CODEBASE_STATE_REDIS_LOCK_WAIT_MS=2000 # CODEBASE_STATE_REDIS_SOCKET_TIMEOUT=2 # CODEBASE_STATE_REDIS_CONNECT_TIMEOUT=2 +# CODEBASE_STATE_SKIP_MIGRATION=0 # Set to 1 to disable auto-migration on backend switch # Query Optimization (adaptive HNSW_EF tuning for 2x faster simple queries) QUERY_OPTIMIZER_ADAPTIVE=1 diff --git a/.gitignore b/.gitignore index 9cc39e63..ffcd5f4e 100644 --- a/.gitignore +++ b/.gitignore @@ -65,3 +65,7 @@ ctx_config.json /deploy/eks-cdk /deploy/eks-cdk-PATHFUL .env +.contextstream/config.json +.contextstream/ignore +.cursorrules +GEMINI.md diff --git a/.qdrantignore b/.qdrantignore index dde3a243..e0b4b3c2 100644 --- a/.qdrantignore +++ b/.qdrantignore @@ -4,3 +4,34 @@ # Dev workspace contains uploaded client workspaces - indexed separately /dev-workspace + +# CDK build artifacts (large, auto-generated) +/deploy/eks-cdk*/cdk.out/** +/deploy/eks-cdk*/**/*.js +/deploy/eks-cdk*/**/*.d.ts +/deploy/eks-cdk*/node_modules/** + +# Node modules +/node_modules +**/node_modules + +# Python virtual environments +/.venv* +/venv + +# Model binaries +/models +*.onnx +*.bin +*.safetensors + +# Test fixtures +/test-repos +/.selftest_repo + +# IDE/Editor +/.idea +/.vscode + +# ContextStream local cache +/.contextstream diff --git a/docker-compose.yml b/docker-compose.yml index 0f14cda6..e3513da5 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -6,6 +6,24 @@ version: '3.8' services: + # Redis 
cache/state backend + redis: + image: redis:7-alpine + container_name: redis-cache + ports: + - "6379:6379" + command: ["redis-server", "--appendonly", "yes", "--maxmemory", "256mb", "--maxmemory-policy", "allkeys-lru"] + volumes: + - redis_data:/data + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 5s + retries: 5 + restart: unless-stopped + networks: + - dev-remote-network + # Qdrant vector database - same as base compose qdrant: image: qdrant/qdrant:latest @@ -102,6 +120,7 @@ services: command: ["sh", "-c", "mkdir -p /tmp/huggingface/hub /tmp/huggingface/transformers /tmp/huggingface/fastembed && exec python /app/scripts/mcp_indexer_server.py"] depends_on: - qdrant + - redis env_file: - .env environment: @@ -172,6 +191,10 @@ services: - RERANKER_WEIGHTS_DIR=/tmp/rerank_weights - RERANK_EVENTS_DIR=/tmp/rerank_events - RERANK_EVENTS_ENABLED=${RERANK_EVENTS_ENABLED:-1} + # Redis state backend + - CODEBASE_STATE_BACKEND=${CODEBASE_STATE_BACKEND:-file} + - CODEBASE_STATE_REDIS_URL=${CODEBASE_STATE_REDIS_URL:-redis://redis:6379/0} + - CODEBASE_STATE_REDIS_PREFIX=${CODEBASE_STATE_REDIS_PREFIX:-context-engine:codebase} ports: - "${FASTMCP_INDEXER_PORT:-8001}:8001" - "18001:18001" @@ -193,7 +216,7 @@ services: command: ["sh", "-c", "mkdir -p /tmp/huggingface/hub /tmp/huggingface/transformers /tmp/huggingface/fastembed && exec python /app/scripts/learning_reranker_worker.py --daemon"] depends_on: - qdrant - - mcp_indexer + - redis env_file: - .env environment: @@ -316,6 +339,7 @@ services: command: ["sh", "-c", "mkdir -p /tmp/huggingface/hub /tmp/huggingface/transformers /tmp/huggingface/fastembed && exec python /app/scripts/mcp_indexer_server.py"] depends_on: - qdrant + - redis env_file: - .env environment: @@ -378,6 +402,10 @@ services: - RERANKER_WEIGHTS_DIR=/tmp/rerank_weights - RERANK_EVENTS_DIR=/tmp/rerank_events - RERANK_EVENTS_ENABLED=${RERANK_EVENTS_ENABLED:-1} + # Redis state backend + - 
CODEBASE_STATE_BACKEND=${CODEBASE_STATE_BACKEND:-file} + - CODEBASE_STATE_REDIS_URL=${CODEBASE_STATE_REDIS_URL:-redis://redis:6379/0} + - CODEBASE_STATE_REDIS_PREFIX=${CODEBASE_STATE_REDIS_PREFIX:-context-engine:codebase} ports: - "${FASTMCP_INDEXER_HTTP_PORT:-8003}:8001" - "${FASTMCP_INDEXER_HTTP_HEALTH_PORT:-18003}:18001" @@ -415,6 +443,7 @@ services: user: "1000:1000" depends_on: - qdrant + - redis env_file: - .env environment: @@ -459,6 +488,10 @@ services: - PSEUDO_DEFER_TO_WORKER=${PSEUDO_DEFER_TO_WORKER:-1} # Parallel indexing - number of worker threads (default: 4, use -1 for CPU count) - INDEX_WORKERS=${INDEX_WORKERS:-4} + # Redis state backend + - CODEBASE_STATE_BACKEND=${CODEBASE_STATE_BACKEND:-file} + - CODEBASE_STATE_REDIS_URL=${CODEBASE_STATE_REDIS_URL:-redis://redis:6379/0} + - CODEBASE_STATE_REDIS_PREFIX=${CODEBASE_STATE_REDIS_PREFIX:-context-engine:codebase} volumes: - workspace_pvc:/work:rw - codebase_pvc:/work/.codebase:rw @@ -477,6 +510,7 @@ services: user: "1000:1000" depends_on: - qdrant + - redis env_file: - .env environment: @@ -524,6 +558,10 @@ services: - NEO4J_GRAPH=${NEO4J_GRAPH:-} # Defer pseudo-tag generation - watcher runs backfill worker thread - PSEUDO_DEFER_TO_WORKER=${PSEUDO_DEFER_TO_WORKER:-1} + # Redis state backend + - CODEBASE_STATE_BACKEND=${CODEBASE_STATE_BACKEND:-file} + - CODEBASE_STATE_REDIS_URL=${CODEBASE_STATE_REDIS_URL:-redis://redis:6379/0} + - CODEBASE_STATE_REDIS_PREFIX=${CODEBASE_STATE_REDIS_PREFIX:-context-engine:codebase} volumes: - workspace_pvc:/work:rw - codebase_pvc:/work/.codebase:rw @@ -702,6 +740,10 @@ volumes: rerank_events: driver: local + # Redis data persistence + redis_data: + driver: local + # Custom network for service discovery networks: dev-remote-network: diff --git a/scripts/admin_ui.py b/scripts/admin_ui.py index 22253198..f6ecefa4 100644 --- a/scripts/admin_ui.py +++ b/scripts/admin_ui.py @@ -48,6 +48,8 @@ def render_admin_acl( users: Any, collections: Any, grants: Any, + api_keys: 
Optional[Any] = None, + neo4j_status: Optional[Dict[str, Any]] = None, deletion_enabled: bool = False, work_dir: str = "/work", refresh_ms: int = 5000, @@ -62,6 +64,8 @@ def render_admin_acl( "users": users, "collections": collections, "grants": grants, + "api_keys": api_keys or [], + "neo4j_status": neo4j_status or {}, "deletion_enabled": bool(deletion_enabled), "work_dir": work_dir, "staging_enabled": bool(is_staging_enabled() if callable(is_staging_enabled) else False), diff --git a/scripts/auth_backend.py b/scripts/auth_backend.py index 41dec18c..ab071cb2 100644 --- a/scripts/auth_backend.py +++ b/scripts/auth_backend.py @@ -120,6 +120,9 @@ def _ensure_db() -> None: conn.execute( "CREATE TABLE IF NOT EXISTS collection_acl (collection_id TEXT NOT NULL, user_id TEXT NOT NULL, permission TEXT NOT NULL, created_at INTEGER NOT NULL, PRIMARY KEY (collection_id, user_id))" ) + conn.execute( + "CREATE TABLE IF NOT EXISTS api_keys (id TEXT PRIMARY KEY, name TEXT NOT NULL, key_hash TEXT NOT NULL, key_prefix TEXT NOT NULL, key_suffix TEXT NOT NULL, scope TEXT NOT NULL DEFAULT 'read', created_at INTEGER NOT NULL, expires_at INTEGER, last_used_at INTEGER, is_revoked INTEGER NOT NULL DEFAULT 0)" + ) try: cur = conn.cursor() cur.execute("PRAGMA table_info(users)") @@ -295,22 +298,47 @@ def create_session_for_token( ) -> Dict[str, Any]: if not AUTH_ENABLED: raise AuthDisabledError("Auth not enabled") - if AUTH_SHARED_TOKEN: - # When a shared token is configured, require it for all token-based sessions. 
- if not token or token != AUTH_SHARED_TOKEN: - raise AuthInvalidToken("Invalid auth token") + + token = (token or "").strip() + api_key_info: Optional[Dict[str, Any]] = None + + # First, check if token matches the shared token + if AUTH_SHARED_TOKEN and token == AUTH_SHARED_TOKEN: + pass # Valid shared token + elif token: + # Try validating as an API key + try: + api_key_info = validate_api_key(token) + except Exception: + api_key_info = None + + if not api_key_info: + # Not a valid API key, check shared token requirement + if AUTH_SHARED_TOKEN: + raise AuthInvalidToken("Invalid auth token") + elif not ALLOW_OPEN_TOKEN_LOGIN: + raise AuthInvalidToken( + "Token-based login disabled (no shared token configured; set CTXCE_AUTH_SHARED_TOKEN " + "or CTXCE_AUTH_ALLOW_OPEN_TOKEN_LOGIN=1 to enable)" + ) else: - # Harden default behavior: when auth is enabled but no shared token is configured, - # disable token-based login unless explicitly allowed via env. - if not ALLOW_OPEN_TOKEN_LOGIN: + # No token provided + if AUTH_SHARED_TOKEN: + raise AuthInvalidToken("Invalid auth token") + elif not ALLOW_OPEN_TOKEN_LOGIN: raise AuthInvalidToken( "Token-based login disabled (no shared token configured; set CTXCE_AUTH_SHARED_TOKEN " "or CTXCE_AUTH_ALLOW_OPEN_TOKEN_LOGIN=1 to enable)" ) + user_id = client or "ctxce" meta: Dict[str, Any] = {} if workspace: meta["workspace"] = workspace + if api_key_info: + meta["api_key_id"] = api_key_info["id"] + meta["api_key_name"] = api_key_info["name"] + meta["api_key_scope"] = api_key_info["scope"] return create_session(user_id=user_id, metadata=meta) @@ -668,3 +696,173 @@ def list_collection_acl() -> List[Dict[str, Any]]: } ) return out + + +# ============================================================================= +# API Keys Management +# ============================================================================= + +def _hash_api_key(key: str) -> str: + """Hash an API key for storage using SHA256.""" + return 
hashlib.sha256(key.encode("utf-8")).hexdigest() + + +def create_api_key( + name: str, + scope: str = "read", + expires_in: Optional[str] = None, +) -> Dict[str, Any]: + """Create a new API key. + + Args: + name: Human-readable name for the key + scope: Permission scope - 'read', 'write', or 'admin' + expires_in: Expiration period - 'never', '7d', '30d', '90d', '1y' + + Returns: + Dict with key details including the raw key (only shown once) + """ + if not AUTH_ENABLED: + raise AuthDisabledError("Auth not enabled") + _ensure_db() + + name = (name or "").strip() + if not name: + raise ValueError("Key name is required") + + scope = (scope or "read").strip().lower() + if scope not in {"read", "write", "admin"}: + scope = "read" + + # Generate a secure random key + key_id = uuid.uuid4().hex + raw_key = f"ctxce_{uuid.uuid4().hex}{uuid.uuid4().hex[:16]}" + key_hash = _hash_api_key(raw_key) + key_prefix = raw_key[:10] + key_suffix = raw_key[-4:] + + now_ts = int(datetime.now().timestamp()) + + # Calculate expiration + expires_ts: Optional[int] = None + if expires_in and expires_in != "never": + days_map = {"7d": 7, "30d": 30, "90d": 90, "1y": 365} + days = days_map.get(expires_in, 0) + if days > 0: + expires_ts = now_ts + (days * 24 * 60 * 60) + + with _db_connection() as conn: + with conn: + conn.execute( + "INSERT INTO api_keys (id, name, key_hash, key_prefix, key_suffix, scope, created_at, expires_at, is_revoked) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, 0)", + (key_id, name, key_hash, key_prefix, key_suffix, scope, now_ts, expires_ts), + ) + + return { + "id": key_id, + "name": name, + "key": raw_key, # Only returned on creation + "key_prefix": key_prefix, + "key_suffix": key_suffix, + "scope": scope, + "created_at": now_ts, + "expires_at": expires_ts, + } + + +def list_api_keys() -> List[Dict[str, Any]]: + """List all non-revoked API keys (without the actual key values).""" + if not AUTH_ENABLED: + raise AuthDisabledError("Auth not enabled") + _ensure_db() + + with 
_db_connection() as conn: + cur = conn.cursor() + cur.execute( + "SELECT id, name, key_prefix, key_suffix, scope, created_at, expires_at, last_used_at " + "FROM api_keys WHERE is_revoked = 0 ORDER BY created_at DESC" + ) + rows = cur.fetchall() or [] + + out: List[Dict[str, Any]] = [] + for r in rows: + created_dt = datetime.fromtimestamp(r[5]).strftime("%Y-%m-%d %H:%M") if r[5] else None + expires_dt = datetime.fromtimestamp(r[6]).strftime("%Y-%m-%d %H:%M") if r[6] else None + last_used_dt = datetime.fromtimestamp(r[7]).strftime("%Y-%m-%d %H:%M") if r[7] else None + out.append({ + "id": r[0], + "name": r[1], + "key_prefix": r[2], + "key_suffix": r[3], + "scope": r[4], + "created_at": created_dt, + "expires_at": expires_dt, + "last_used_at": last_used_dt, + }) + return out + + +def revoke_api_key(key_id: str) -> bool: + """Revoke an API key by ID.""" + if not AUTH_ENABLED: + raise AuthDisabledError("Auth not enabled") + _ensure_db() + + key_id = (key_id or "").strip() + if not key_id: + raise ValueError("Key ID is required") + + with _db_connection() as conn: + with conn: + cur = conn.cursor() + cur.execute("UPDATE api_keys SET is_revoked = 1 WHERE id = ?", (key_id,)) + return cur.rowcount > 0 + + +def validate_api_key(raw_key: str) -> Optional[Dict[str, Any]]: + """Validate an API key and return its details if valid. + + Also updates last_used_at timestamp on successful validation. + """ + if not AUTH_ENABLED: + raise AuthDisabledError("Auth not enabled") + _ensure_db() + + raw_key = (raw_key or "").strip() + if not raw_key: + return None + + key_hash = _hash_api_key(raw_key) + now_ts = int(datetime.now().timestamp()) + + with _db_connection() as conn: + cur = conn.cursor() + cur.execute( + "SELECT id, name, scope, expires_at FROM api_keys " + "WHERE key_hash = ? 
AND is_revoked = 0", + (key_hash,), + ) + row = cur.fetchone() + + if not row: + return None + + key_id, name, scope, expires_at = row + + # Check expiration + if expires_at and expires_at < now_ts: + return None + + # Update last_used_at + with conn: + conn.execute( + "UPDATE api_keys SET last_used_at = ? WHERE id = ?", + (now_ts, key_id), + ) + + return { + "id": key_id, + "name": name, + "scope": scope, + } diff --git a/scripts/ctx_cli/commands/reset.py b/scripts/ctx_cli/commands/reset.py index 3e8642cf..c09e3483 100644 --- a/scripts/ctx_cli/commands/reset.py +++ b/scripts/ctx_cli/commands/reset.py @@ -147,10 +147,11 @@ def _download_file(url: str, dest: Path, description: str) -> bool: def reset( - mode: str = "dual", + mode: str = "mcp", skip_build: bool = False, skip_model: bool = False, skip_tokenizer: bool = False, + db_reset: bool = False, model_url: Optional[str] = None, model_path: Optional[str] = None, tokenizer_url: Optional[str] = None, @@ -163,14 +164,14 @@ def reset( recreates indexes, and starts services in the specified mode. 
Modes: - dual: Both SSE and HTTP MCPs (default, most comprehensive) - mcp: HTTP MCPs only (streamable, Codex compatible) - sse: SSE MCPs only (legacy) + (default): HTTP MCPs only (streamable, Codex compatible) + --dual: Both SSE and HTTP MCPs (most comprehensive) + --sse: SSE MCPs only (legacy) Examples: - ctx reset # Full reset with dual mode - ctx reset --mcp # HTTP MCPs only - ctx reset --sse # SSE MCPs only + ctx reset # Full reset with HTTP MCPs (default) + ctx reset --dual # Both SSE and HTTP MCPs + ctx reset --sse # SSE MCPs only (legacy) ctx reset --skip-model # Skip model download ctx reset --skip-build # Skip container rebuild """ @@ -193,22 +194,35 @@ def reset( if neo4j_enabled: compose_cmd.extend(["-f", "docker-compose.yml", "-f", "docker-compose.neo4j.yml"]) + # Check if Redis is enabled (CODEBASE_STATE_BACKEND=redis) + redis_enabled = os.environ.get("CODEBASE_STATE_BACKEND", "").strip().lower() == "redis" + + # Check if learning reranker is enabled + rerank_learning_enabled = os.environ.get("RERANK_LEARNING", "1").strip().lower() in ("1", "true", "yes") + # Determine which containers to build/start based on mode - if mode == "mcp": - # HTTP MCPs only (Codex compatible) + upload_service for remote sync - build_containers = ["indexer", "mcp_http", "mcp_indexer_http", "watcher", "upload_service"] - start_containers = ["mcp_http", "mcp_indexer_http", "watcher", "upload_service"] - mode_desc = "HTTP MCPs only (streamable)" - elif mode == "sse": - # SSE MCPs only (legacy) + # Default is HTTP-only; SSE only starts when explicitly requested with --sse + if mode == "sse": + # SSE MCPs only (legacy, must be explicitly requested) build_containers = ["indexer", "mcp", "mcp_indexer", "watcher"] start_containers = ["mcp", "mcp_indexer", "watcher"] mode_desc = "SSE MCPs only (legacy)" - else: - # Dual mode (default) + elif mode == "dual": + # Dual mode (both SSE and HTTP) build_containers = ["indexer", "mcp", "mcp_indexer", "mcp_http", "mcp_indexer_http", 
"watcher", "upload_service"] start_containers = ["mcp", "mcp_indexer", "mcp_http", "mcp_indexer_http", "watcher", "upload_service"] mode_desc = "Dual mode (SSE + HTTP)" + else: + # HTTP MCPs only (default, Codex compatible) + upload_service for remote sync + build_containers = ["indexer", "mcp_http", "mcp_indexer_http", "watcher", "upload_service"] + start_containers = ["mcp_http", "mcp_indexer_http", "watcher", "upload_service"] + mode_desc = "HTTP MCPs (streamable)" + + # Add learning_worker if rerank learning is enabled + if rerank_learning_enabled: + build_containers.append("learning_worker") + start_containers.append("learning_worker") + mode_desc += " + Learning Reranker" # Add llamacpp container if needed if llamacpp_needed: @@ -223,11 +237,16 @@ def reset( start_containers.insert(0, "neo4j") # Start neo4j first (other services depend on it) mode_desc += " + Neo4j" + # Add Redis indicator to mode description (Redis is a dependency, not a built container) + if redis_enabled: + mode_desc += " + Redis" + _print_panel( f"[cyan]Mode:[/cyan] {mode_desc}\n" f"[cyan]Skip Build:[/cyan] {skip_build}\n" f"[cyan]Skip Model:[/cyan] {skip_model}\n" - f"[cyan]Skip Tokenizer:[/cyan] {skip_tokenizer}", + f"[cyan]Skip Tokenizer:[/cyan] {skip_tokenizer}\n" + f"[cyan]DB Reset:[/cyan] {db_reset}", title="Development Environment Reset", border_style="yellow" ) @@ -236,11 +255,17 @@ def reset( step = 0 try: - # Step 1: Stop all services and remove volumes + # Step 1: Stop all services (and optionally reset database volumes) step += 1 _print(f"\n[bold][{step}/{steps_total}] Stopping services...[/bold]") - _run_cmd(compose_cmd + ["down", "-v", "--remove-orphans"], "Stopping all containers", check=False) - _print("[green]✓[/green] Services stopped") + if db_reset: + # Full reset including database volumes (qdrant, redis, neo4j) + _run_cmd(compose_cmd + ["down", "-v", "--remove-orphans"], "Stopping all containers and removing volumes", check=False) + _print("[green]✓[/green] Services 
stopped and database volumes removed") + else: + # Stop services but preserve database volumes + _run_cmd(compose_cmd + ["down", "--remove-orphans"], "Stopping all containers", check=False) + _print("[green]✓[/green] Services stopped (database volumes preserved)") # Step 2: Build containers (unless skipped) step += 1 @@ -252,9 +277,11 @@ def reset( else: _print(f"\n[bold][{step}/{steps_total}] Skipping container build[/bold]") - # Step 3: Start Qdrant (and Neo4j if enabled) and wait + # Step 3: Start Qdrant, Redis (if enabled), and Neo4j (if enabled) and wait step += 1 db_services = ["qdrant"] + if redis_enabled: + db_services.append("redis") if neo4j_enabled: db_services.append("neo4j") _print(f"\n[bold][{step}/{steps_total}] Starting {', '.join(db_services)}...[/bold]") @@ -413,25 +440,31 @@ def register_command(subparsers): "This is equivalent to the Makefile's reset-dev targets.", epilog=""" Modes: - dual (default) Both SSE and HTTP MCPs (most comprehensive) - mcp HTTP MCPs only (streamable, Codex compatible) - sse SSE MCPs only (legacy) + (default) HTTP MCPs only (streamable, Codex compatible) + --dual Both SSE and HTTP MCPs (most comprehensive) + --sse SSE MCPs only (legacy) Examples: - ctx reset # Full reset with dual mode - ctx reset --mcp # HTTP MCPs only (streamable) + ctx reset # Full reset with HTTP MCPs (default) + ctx reset --dual # Both SSE and HTTP MCPs ctx reset --sse # SSE MCPs only (legacy) + ctx reset --db-reset # Reset database volumes (Qdrant, Redis, Neo4j) ctx reset --skip-model # Skip llama model download ctx reset --skip-build # Skip container rebuild (faster) """ ) - # Mode selection (mutually exclusive) + # Mode selection (mutually exclusive among modes) mode_group = parser.add_mutually_exclusive_group() mode_group.add_argument( "--mcp", action="store_true", - help="HTTP MCPs only (streamable, Codex compatible)" + help="HTTP MCPs only (streamable, Codex compatible) - this is the default" + ) + mode_group.add_argument( + "--dual", + 
action="store_true", + help="Both SSE and HTTP MCPs (most comprehensive)" ) mode_group.add_argument( "--sse", @@ -439,6 +472,13 @@ def register_command(subparsers): help="SSE MCPs only (legacy)" ) + # Database reset option (can be combined with any mode) + parser.add_argument( + "--db-reset", + action="store_true", + help="Reset database volumes (Qdrant, Redis, Neo4j)" + ) + # Skip options parser.add_argument( "--skip-build", @@ -476,19 +516,21 @@ def register_command(subparsers): def run_reset(args): """Wrapper to call reset function with argparse args.""" - # Determine mode - if args.mcp: - mode = "mcp" + # Determine mode (default is HTTP-only, no SSE) + if args.dual: + mode = "dual" elif args.sse: mode = "sse" else: - mode = "dual" + # --mcp or no flag = HTTP MCPs only (default) + mode = "mcp" return reset( mode=mode, skip_build=args.skip_build, skip_model=args.skip_model, skip_tokenizer=args.skip_tokenizer, + db_reset=args.db_reset, model_url=args.model_url, model_path=args.model_path, tokenizer_url=args.tokenizer_url, diff --git a/scripts/ctx_cli/commands/status.py b/scripts/ctx_cli/commands/status.py index c2b76205..dd22dc59 100644 --- a/scripts/ctx_cli/commands/status.py +++ b/scripts/ctx_cli/commands/status.py @@ -576,7 +576,12 @@ def print_status_table( points_count = qdrant_status_info.get("count", "Unknown") if isinstance(points_count, int): points_count = f"{points_count:,}" - last_indexed = format_timestamp(qdrant_status_info.get("last_indexed")) + # qdrant_status returns last_ingested_at as {"unix": int, "iso": str} + last_ingested = qdrant_status_info.get("last_ingested_at") + if isinstance(last_ingested, dict): + last_indexed = format_timestamp(last_ingested.get("iso")) + else: + last_indexed = format_timestamp(qdrant_status_info.get("last_indexed")) elif collection_info: collection_name = collection_info.get("collection", "Unknown") points_count = collection_info.get("points_count", collection_info.get("count", "Unknown")) @@ -640,16 +645,27 @@ def 
print_status_table( # Model warmup status if warmup_info: - embedding_ready = warmup_info.get("embedding_ready", False) - reranker_ready = warmup_info.get("reranker_ready", False) - if embedding_ready and reranker_ready: + # warmup_info returns status: "warm"|"warming"|"cold"|"failed" + # and embedding_ms/reranker_ms when available + warmup_status = warmup_info.get("status", "cold") + embedding_ready = warmup_info.get("embedding_ready", warmup_info.get("embedding_ms") is not None) + reranker_ready = warmup_info.get("reranker_ready", warmup_info.get("reranker_ms") is not None) + if warmup_status == "warm" or (embedding_ready and reranker_ready): models_text = Text() models_text.append("● ", style="green") models_text.append("Ready", style="green") - else: + elif warmup_status == "warming": models_text = Text() models_text.append("◐ ", style="yellow") models_text.append("Loading", style="yellow") + elif warmup_status == "failed": + models_text = Text() + models_text.append("✗ ", style="red") + models_text.append("Failed", style="red") + else: + models_text = Text() + models_text.append("○ ", style="dim") + models_text.append("Cold", style="dim") table.add_row("Models", models_text) # Indexing progress @@ -714,10 +730,22 @@ def print_status_table( print(f"│ Last index: {last_indexed}" + " " * (46 - len(f"Last index: {last_indexed}") - 2) + "│") if warmup_info: - embedding_ready = warmup_info.get("embedding_ready", False) - reranker_ready = warmup_info.get("reranker_ready", False) - models_symbol = "✓" if (embedding_ready and reranker_ready) else "⚠" - models_text = "Ready" if (embedding_ready and reranker_ready) else "Loading" + # warmup_info returns status: "warm"|"warming"|"cold"|"failed" + warmup_status = warmup_info.get("status", "cold") + embedding_ready = warmup_info.get("embedding_ready", warmup_info.get("embedding_ms") is not None) + reranker_ready = warmup_info.get("reranker_ready", warmup_info.get("reranker_ms") is not None) + if warmup_status == "warm" or 
(embedding_ready and reranker_ready): + models_symbol = "✓" + models_text = "Ready" + elif warmup_status == "warming": + models_symbol = "⚠" + models_text = "Loading" + elif warmup_status == "failed": + models_symbol = "✗" + models_text = "Failed" + else: + models_symbol = "○" + models_text = "Cold" print(f"│ Models: {models_symbol} {models_text}" + " " * (46 - len(f"Models: {models_symbol} {models_text}") - 2) + "│") if workspace_info: @@ -766,9 +794,11 @@ def print_status_table( print("│ " + "Model".ljust(25) + " Status".ljust(23) + "│") print("├" + "─" * 50 + "┤") + # Derive ready status from timing values when explicit booleans not present + warmup_status = warmup_info.get("status", "cold") models = [ - ("Embedding", warmup_info.get("embedding_ready", False)), - ("Reranker", warmup_info.get("reranker_ready", False)), + ("Embedding", warmup_info.get("embedding_ready", warmup_info.get("embedding_ms") is not None)), + ("Reranker", warmup_info.get("reranker_ready", warmup_info.get("reranker_ms") is not None)), ("Decoder", warmup_info.get("decoder_ready", False)), ] diff --git a/scripts/indexing_admin.py b/scripts/indexing_admin.py index 1e8abf5f..500b33d4 100644 --- a/scripts/indexing_admin.py +++ b/scripts/indexing_admin.py @@ -91,6 +91,51 @@ ) +def get_neo4j_status() -> Dict[str, Any]: + """ + Get Neo4j plugin status. 
+ + Returns: + Dict with: enabled, healthy, version, checks, error + """ + neo4j_enabled = os.environ.get("NEO4J_GRAPH", "").strip().lower() in ("1", "true", "yes", "on") + if not neo4j_enabled: + return {"enabled": False, "healthy": None, "version": None, "checks": {}} + + try: + from plugins.neo4j_graph.plugin import register_plugin + manifest = register_plugin() + if manifest.health_check: + health = manifest.health_check() + return { + "enabled": True, + "healthy": health.get("healthy", False), + "version": health.get("version") or manifest.version, + "checks": health.get("checks", {}), + } + else: + return { + "enabled": True, + "healthy": False, + "version": manifest.version, + "checks": {"error": "Health check not available"}, + } + except ImportError as e: + return { + "enabled": True, + "healthy": False, + "version": None, + "checks": {"import_error": str(e)}, + } + except Exception as e: + return { + "enabled": True, + "healthy": False, + "version": None, + "checks": {"error": str(e)}, + } + + def _staging_enabled() -> bool: return bool(is_staging_enabled() if callable(is_staging_enabled) else False) @@ -154,37 +199,98 @@ def _copy_repo_state_for_clone( logger.warning("[staging] failed to retarget cache.json for %s: %s", clone_repo_name, exc) +# Cache with TTL support _COLLECTION_SCHEMA_CACHE: Dict[str, Dict[str, Any]] = {} +_COLLECTION_SCHEMA_CACHE_TS: Dict[str, float] = {} # Track cache timestamps +_COLLECTION_SCHEMA_CACHE_TTL = float(os.environ.get("SCHEMA_CACHE_TTL_SECS", "300")) # 5 min default _SNAPSHOT_REFRESHED: Set[str] = set() _MAPPING_INDEX_CACHE: Dict[str, Any] = {"ts": 0.0, "work_dir": "", "value": {}} +# Shared Qdrant client pool for reduced connection overhead +_QDRANT_CLIENT_POOL: Dict[str, "QdrantClient"] = {} +_QDRANT_CLIENT_POOL_LOCK = __import__("threading").Lock() -def _probe_collection_schema(collection: str) -> Optional[Dict[str, Any]]: - if not collection or QdrantClient is None: - return None - cached = 
_COLLECTION_SCHEMA_CACHE.get(collection) - if cached: - return cached - qdrant_url = os.environ.get("QDRANT_URL", "http://qdrant:6333") - api_key = os.environ.get("QDRANT_API_KEY") or None +def _get_shared_qdrant_client(url: Optional[str] = None, api_key: Optional[str] = None) -> Optional["QdrantClient"]: + """Get or create a shared Qdrant client from the pool.""" if QdrantClient is None: - return + return None + + qdrant_url = url or os.environ.get("QDRANT_URL", "http://qdrant:6333") + qdrant_key = api_key or os.environ.get("QDRANT_API_KEY") or None + pool_key = f"{qdrant_url}|{qdrant_key or ''}" + with _QDRANT_CLIENT_POOL_LOCK: + if pool_key in _QDRANT_CLIENT_POOL: + return _QDRANT_CLIENT_POOL[pool_key] + + try: + client = QdrantClient( + url=qdrant_url, + api_key=qdrant_key, + timeout=float(os.environ.get("QDRANT_TIMEOUT", "60")), + ) + _QDRANT_CLIENT_POOL[pool_key] = client + return client + except Exception as e: + logger.debug(f"Failed to create shared Qdrant client: {e}") + return None + + +def _is_schema_cache_valid(collection: str) -> bool: + """Check if cached schema is still valid (within TTL).""" + if collection not in _COLLECTION_SCHEMA_CACHE: + return False + cache_ts = _COLLECTION_SCHEMA_CACHE_TS.get(collection, 0.0) + return (time.time() - cache_ts) < _COLLECTION_SCHEMA_CACHE_TTL + + +def _invalidate_schema_cache(collection: Optional[str] = None) -> None: + """Invalidate schema cache for a collection or all collections.""" + if collection: + _COLLECTION_SCHEMA_CACHE.pop(collection, None) + _COLLECTION_SCHEMA_CACHE_TS.pop(collection, None) + else: + _COLLECTION_SCHEMA_CACHE.clear() + _COLLECTION_SCHEMA_CACHE_TS.clear() + + +def list_qdrant_collections() -> List[Dict[str, Any]]: + """List all collections directly from Qdrant (no auth required). + + Returns a list of dicts with 'qdrant_collection' key for compatibility + with build_admin_collections_view. 
+ """ + client = _get_shared_qdrant_client() + if client is None: + return [] try: - client = QdrantClient(url=qdrant_url, api_key=api_key) + collections_response = client.get_collections() + collections = getattr(collections_response, "collections", []) or [] + return [{"qdrant_collection": c.name} for c in collections if hasattr(c, "name")] except Exception as e: - logger.debug(f"Failed to connect to Qdrant for schema probe: {e}") + logger.debug(f"Failed to list Qdrant collections: {e}") + return [] + + +def _probe_collection_schema(collection: str, force_refresh: bool = False) -> Optional[Dict[str, Any]]: + """Probe collection schema with TTL-aware caching and shared client pool.""" + if not collection or QdrantClient is None: + return None + + # Check TTL-aware cache first (unless force refresh) + if not force_refresh and _is_schema_cache_valid(collection): + return _COLLECTION_SCHEMA_CACHE.get(collection) + + # Use shared client from pool + client = _get_shared_qdrant_client() + if client is None: return None try: info = client.get_collection(collection_name=collection) except Exception as e: logger.debug(f"Failed to get collection info for '{collection}': {e}") - try: - client.close() - except Exception as close_e: - logger.debug(f"Suppressed exception during close: {close_e}") return None try: @@ -231,16 +337,14 @@ def _probe_collection_schema(collection: str) -> Optional[Dict[str, Any]]: "sparse_vectors": sparse_vectors, "payload_indexes": getattr(getattr(info.config, "params", None), "payload_indexes", None), } + # Store in cache with timestamp for TTL _COLLECTION_SCHEMA_CACHE[collection] = schema + _COLLECTION_SCHEMA_CACHE_TS[collection] = time.time() return schema except Exception as e: logger.debug(f"Failed to build schema for collection '{collection}': {e}") return None - finally: - try: - client.close() - except Exception as e: - logger.debug(f"Suppressed exception: {e}") + # Note: Don't close client - it's from the shared pool def 
_filter_snapshot_only_recreate_keys( diff --git a/scripts/ingest/cli.py b/scripts/ingest/cli.py index 38b4320c..5a034797 100644 --- a/scripts/ingest/cli.py +++ b/scripts/ingest/cli.py @@ -209,7 +209,17 @@ def main(): load_dotenv(Path(__file__).parent.parent.parent / ".env") except ImportError: pass # python-dotenv not installed, rely on exported env vars - + + # Backend migration: detect file<->redis switch and migrate state if needed + try: + from scripts.workspace_state import detect_and_migrate_backend + ws_root = Path(os.environ.get("WORKSPACE_PATH") or os.environ.get("WATCH_ROOT") or "/work") + migrated = detect_and_migrate_backend(ws_root) + if migrated is not None: + print(f"[backend_migration] Migrated {migrated} items to new backend") + except Exception as e: + logger.warning(f"Backend migration check failed (continuing): {e}") + args = parse_args() # Map CLI overrides to env so downstream helpers pick them up diff --git a/scripts/ingest/config.py b/scripts/ingest/config.py index d9f5f69e..ecf3629d 100644 --- a/scripts/ingest/config.py +++ b/scripts/ingest/config.py @@ -287,6 +287,8 @@ def logical_repo_reuse_enabled() -> bool: # type: ignore[no-redef] indexing_lock, file_indexing_lock, is_file_locked, + set_indexing_started, + set_indexing_progress, ) except ImportError: # State integration is optional; continue if not available @@ -308,3 +310,5 @@ def logical_repo_reuse_enabled() -> bool: # type: ignore[no-redef] indexing_lock = None # type: ignore file_indexing_lock = None # type: ignore is_file_locked = None # type: ignore + set_indexing_started = None # type: ignore + set_indexing_progress = None # type: ignore diff --git a/scripts/ingest/pipeline.py b/scripts/ingest/pipeline.py index 405dc30b..0fe9e893 100644 --- a/scripts/ingest/pipeline.py +++ b/scripts/ingest/pipeline.py @@ -10,15 +10,29 @@ import logging import os import sys -import hashlib import time + +import xxhash import multiprocessing from concurrent.futures import ThreadPoolExecutor, 
as_completed +from contextlib import contextmanager from pathlib import Path from typing import List, Dict, Any, Optional, TYPE_CHECKING logger = logging.getLogger(__name__) +# Import lock contention handling for skip-and-retry logic during parallel indexing +try: + from scripts.workspace_state import LockContendedException, enable_lock_skip_on_contention +except ImportError: + # Fallback if not available + class LockContendedException(Exception): + pass + + @contextmanager + def enable_lock_skip_on_contention(): + yield + from qdrant_client import QdrantClient, models from scripts.ingest.config import ( @@ -51,6 +65,8 @@ get_workspace_state, compare_symbol_changes, file_indexing_lock, + set_indexing_started, + set_indexing_progress, ) from scripts.ingest.exclusions import ( iter_files, @@ -702,7 +718,7 @@ def _index_single_file_inner( language = detect_language(file_path) is_text_like = _is_text_like_language(language) - file_hash = hashlib.sha1(text.encode("utf-8", errors="ignore")).hexdigest() + file_hash = xxhash.xxh64(text.encode("utf-8", errors="ignore")).hexdigest() repo_tag = repo_name_for_cache or _detect_repo_name_from_path(file_path) @@ -1465,6 +1481,14 @@ def index_repo( if log_progress: print(f"[index] Found {total_files} files to process under {root}") + # Track progress in workspace state for live UI updates + workspace_path = str(root) + started_at = time.strftime("%Y-%m-%dT%H:%M:%S") + try: + set_indexing_started(workspace_path, total_files) + except Exception as e: + logger.debug(f"Failed to set indexing started: {e}") + # Parallel file processing configuration # INDEX_WORKERS=0 or 1 means sequential (default for safety) # INDEX_WORKERS=N uses N threads (recommended: 4-8 for I/O-bound indexing) @@ -1478,8 +1502,12 @@ def index_repo( # Cap at reasonable max to avoid overwhelming Qdrant max_workers = min(index_workers, 16) if index_workers > 1 else 1 - def _index_file_task(file_path: Path) -> tuple[Path, Optional[Exception]]: - """Task for parallel 
file indexing. Returns (path, error_or_none).""" + def _index_file_task(file_path: Path) -> tuple[Path, Optional[Exception], bool]: + """Task for parallel file indexing. + + Returns: + (path, error_or_none, was_skipped_due_to_lock) + """ per_file_repo = ( root_repo_for_cache if root_repo_for_cache is not None @@ -1498,37 +1526,85 @@ def _index_file_task(file_path: Path) -> tuple[Path, Optional[Exception]]: allowed_vectors=allowed_vectors, allowed_sparse=allowed_sparse, ) - return (file_path, None) + return (file_path, None, False) + except LockContendedException: + # Lock contention - mark for retry + return (file_path, None, True) except Exception as e: - return (file_path, e) + return (file_path, e, False) files_processed = 0 errors = [] + retry_queue: list[Path] = [] if max_workers > 1: - # Parallel processing with ThreadPoolExecutor + # Parallel processing with ThreadPoolExecutor and skip-and-retry for lock contention if log_progress: print(f"[index] Using {max_workers} parallel workers") - with ThreadPoolExecutor(max_workers=max_workers) as executor: - futures = {executor.submit(_index_file_task, fp): fp for fp in files} - for future in as_completed(futures): - files_processed += 1 - file_path, error = future.result() + + # Enable skip-on-contention mode for parallel phase + # This makes Redis locks fail fast instead of blocking, allowing other files to proceed + with enable_lock_skip_on_contention(): + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = {executor.submit(_index_file_task, fp): fp for fp in files} + last_progress_update = 0 + for future in as_completed(futures): + files_processed += 1 + file_path, error, was_skipped = future.result() + if was_skipped: + # Lock contention - queue for retry + retry_queue.append(file_path) + elif error: + errors.append((file_path, error)) + print(f"Error indexing {file_path}: {error}") + if log_progress and (files_processed % 25 == 0 or files_processed == total_files): + print(f"[index] 
{files_processed}/{total_files} files processed") + # Update progress in workspace state every 10 files for live UI + if files_processed - last_progress_update >= 10 or files_processed == total_files: + last_progress_update = files_processed + try: + set_indexing_progress( + workspace_path, started_at, files_processed, total_files, + str(file_path) if file_path else None + ) + except Exception as e: + logger.debug(f"Failed to update indexing progress: {e}") + + # Retry phase: process skipped files sequentially (no lock contention mode) + # This runs OUTSIDE the enable_lock_skip_on_contention context, so locks wait normally + if retry_queue: + if log_progress: + print(f"[index] Retrying {len(retry_queue)} files skipped due to lock contention") + for file_path in retry_queue: + _, error, was_skipped = _index_file_task(file_path) if error: errors.append((file_path, error)) print(f"Error indexing {file_path}: {error}") - if log_progress and (files_processed % 25 == 0 or files_processed == total_files): - print(f"[index] {files_processed}/{total_files} files processed") + # If still skipped on retry, that's a real issue - count as error + if was_skipped: + errors.append((file_path, LockContendedException("Lock still contended after retry"))) + logger.warning(f"Lock still contended after retry for {file_path}") else: - # Sequential processing (original behavior) + # Sequential processing (original behavior) - no retry needed + last_progress_update = 0 for file_path in iterator: files_processed += 1 - file_path, error = _index_file_task(file_path) + file_path, error, _ = _index_file_task(file_path) if error: errors.append((file_path, error)) print(f"Error indexing {file_path}: {error}") if log_progress and (files_processed % 25 == 0 or files_processed == total_files): print(f"[index] {files_processed}/{total_files} files processed") + # Update progress in workspace state every 10 files for live UI + if files_processed - last_progress_update >= 10 or files_processed == 
total_files: + last_progress_update = files_processed + try: + set_indexing_progress( + workspace_path, started_at, files_processed, total_files, + str(file_path) if file_path else None + ) + except Exception as e: + logger.debug(f"Failed to update indexing progress: {e}") if errors and log_progress: print(f"[index] Completed with {len(errors)} errors out of {total_files} files") @@ -1590,7 +1666,7 @@ def process_file_with_smart_reindexing( except Exception: file_path = Path(fp) - file_hash = hashlib.sha1(text.encode("utf-8", errors="ignore")).hexdigest() + file_hash = xxhash.xxh64(text.encode("utf-8", errors="ignore")).hexdigest() # FAST PATH: Check if file hash is unchanged - skip entire processing if so # This avoids AST parsing for unchanged files (P1 optimization) diff --git a/scripts/ingest/qdrant.py b/scripts/ingest/qdrant.py index 26ed9038..8a899216 100644 --- a/scripts/ingest/qdrant.py +++ b/scripts/ingest/qdrant.py @@ -10,7 +10,8 @@ import logging import os import time -import hashlib + +import xxhash from pathlib import Path from typing import List, Dict, Any, Optional @@ -971,7 +972,7 @@ def flush_upserts(client: QdrantClient, collection: str) -> None: def hash_id(text: str, path: str, start: int, end: int) -> int: """Generate a stable hash ID for a chunk.""" - h = hashlib.sha1( + h = xxhash.xxh64( f"{path}:{start}-{end}\n{text}".encode("utf-8", errors="ignore") ).hexdigest() return int(h[:16], 16) diff --git a/scripts/mcp_indexer_server.py b/scripts/mcp_indexer_server.py index 934f2274..4e36c5c9 100644 --- a/scripts/mcp_indexer_server.py +++ b/scripts/mcp_indexer_server.py @@ -2532,7 +2532,8 @@ async def code_search( RETURNS: Same schema as repo_search. 
""" # If include_memories is requested, delegate to context_search for blending - if include_memories: + # Coerce to bool first to handle string 'false'/'0' from some clients + if _coerce_bool(include_memories, default=False): return await context_search( query=query, limit=limit, @@ -3284,7 +3285,18 @@ async def neo4j_graph_query( _log_level_str = os.environ.get("LOG_LEVEL", "INFO").upper() _log_level = getattr(_logging, _log_level_str, _logging.INFO) _logging.getLogger().setLevel(_log_level) - + + # Backend migration: detect file<->redis switch and migrate state if needed + try: + from scripts.workspace_state import detect_and_migrate_backend + from pathlib import Path as _Path + _ws_root = _Path(os.environ.get("WORKSPACE_PATH") or os.environ.get("WATCH_ROOT") or "/work") + migrated = detect_and_migrate_backend(_ws_root) + if migrated is not None: + logger.info(f"[backend_migration] Migrated {migrated} items to new backend") + except Exception as e: + logger.warning(f"Backend migration check failed (continuing): {e}") + # Startup logging with configuration info logger.info("=" * 60) logger.info("MCP Indexer Server starting...") diff --git a/scripts/upload_service.py b/scripts/upload_service.py index fc8e66fa..ac2c2c23 100644 --- a/scripts/upload_service.py +++ b/scripts/upload_service.py @@ -42,7 +42,7 @@ import uvicorn from fastapi import FastAPI, File, UploadFile, Form, HTTPException, Request, status -from fastapi.responses import JSONResponse, RedirectResponse +from fastapi.responses import JSONResponse, RedirectResponse, StreamingResponse from fastapi.middleware.cors import CORSMiddleware from urllib.parse import urlencode @@ -55,6 +55,7 @@ from scripts.indexing_admin import ( build_admin_collections_view, + list_qdrant_collections, resolve_collection_root, spawn_ingest_code, recreate_collection_qdrant, @@ -85,6 +86,9 @@ list_collection_acl, grant_collection_access, revoke_collection_access, + create_api_key, + list_api_keys, + revoke_api_key, ) try: @@ 
-115,11 +119,13 @@ def _admin_ui_unavailable(*args, **kwargs): start_staging_rebuild, activate_staging_rebuild, abort_staging_rebuild, + get_neo4j_status, ) except ImportError: start_staging_rebuild = None # type: ignore activate_staging_rebuild = None # type: ignore abort_staging_rebuild = None # type: ignore + get_neo4j_status = None # type: ignore # Import existing workspace state and indexing functions try: @@ -379,7 +385,8 @@ def _get_valid_session_record(request: Request) -> Optional[Dict[str, Any]]: def _require_admin_session(request: Request) -> Dict[str, Any]: if not AUTH_ENABLED: - raise HTTPException(status_code=404, detail="Auth disabled") + # Allow access when auth is disabled (demo/dev mode) + return {"user_id": "demo", "role": "admin"} record = _get_valid_session_record(request) if record is None: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated") @@ -647,12 +654,24 @@ async def auth_validate(payload: AuthValidateRequest): @app.get("/admin") async def admin_root(request: Request): + # Check for demo session cookie first (works in demo mode) + candidate = _get_session_candidate_from_request(request) + session_id = candidate.get("session_id") or "" + if not AUTH_ENABLED: - raise HTTPException(status_code=404, detail="Auth disabled") + # Demo mode: if we have a demo session cookie, go to dashboard + if session_id.startswith("demo-"): + return RedirectResponse(url="/admin/acl", status_code=302) + # Otherwise show login page for demo credentials + return RedirectResponse(url="/admin/login", status_code=302) + try: users_exist = has_any_users() except AuthDisabledError: - raise HTTPException(status_code=404, detail="Auth disabled") + # Fallback to demo mode behavior + if session_id.startswith("demo-"): + return RedirectResponse(url="/admin/acl", status_code=302) + return RedirectResponse(url="/admin/login", status_code=302) except Exception as e: logger.error(f"[upload_service] Failed to inspect user state for admin 
UI: {e}") raise HTTPException(status_code=500, detail="Failed to inspect user state") @@ -660,7 +679,6 @@ async def admin_root(request: Request): if not users_exist: return RedirectResponse(url="/admin/bootstrap", status_code=302) - candidate = _get_session_candidate_from_request(request) record = _get_valid_session_record(request) if record is None: return RedirectResponse(url="/admin/login", status_code=302) @@ -732,8 +750,7 @@ async def admin_bootstrap_submit( @app.get("/admin/login") async def admin_login_form(request: Request): - if not AUTH_ENABLED: - raise HTTPException(status_code=404, detail="Auth disabled") + # Show login page even in demo mode so users can enter demo credentials return render_admin_login(request) @@ -743,11 +760,31 @@ async def admin_login_submit( username: str = Form(...), password: str = Form(...), ): + # Demo mode: accept admin/admin when auth is disabled if not AUTH_ENABLED: - raise HTTPException(status_code=404, detail="Auth disabled") + if username == "admin" and password == "admin": + # Create a demo session cookie and redirect + import uuid + demo_session_id = f"demo-{uuid.uuid4().hex}" + resp = RedirectResponse(url="/admin/acl", status_code=302) + _set_admin_session_cookie(resp, demo_session_id) + return resp + return render_admin_login( + request=request, + error="Invalid credentials (demo mode: use admin/admin)", + status_code=401, + ) + try: user = authenticate_user(username, password) except AuthDisabledError: + # Fallback to demo mode + if username == "admin" and password == "admin": + import uuid + demo_session_id = f"demo-{uuid.uuid4().hex}" + resp = RedirectResponse(url="/admin/acl", status_code=302) + _set_admin_session_cookie(resp, demo_session_id) + return resp raise HTTPException(status_code=404, detail="Auth disabled") except Exception as e: logger.error(f"[upload_service] Error authenticating user for admin UI: {e}") @@ -781,12 +818,26 @@ async def admin_logout(): @app.get("/admin/acl") async def 
admin_acl_page(request: Request): _require_admin_session(request) + api_keys = [] try: - users = list_users() - collections = list_collections(include_deleted=False) - grants = list_collection_acl() + if AUTH_ENABLED: + users = list_users() + collections = list_collections(include_deleted=False) + grants = list_collection_acl() + try: + api_keys = list_api_keys() + except Exception as e: + logger.debug(f"[upload_service] Failed to load API keys: {e}") + else: + # Auth disabled - get collections directly from Qdrant + users = [] + collections = list_qdrant_collections() + grants = [] except AuthDisabledError: - raise HTTPException(status_code=404, detail="Auth disabled") + # Fallback: get collections directly from Qdrant + users = [] + collections = list_qdrant_collections() + grants = [] except Exception as e: logger.error(f"[upload_service] Failed to load admin UI data: {e}") raise HTTPException(status_code=500, detail="Failed to load admin data") @@ -810,11 +861,21 @@ async def admin_acl_page(request: Request): level = "success" flash = {"message": message, "level": level} + # Get Neo4j status + neo4j_status = {} + if callable(get_neo4j_status): + try: + neo4j_status = get_neo4j_status() + except Exception as e: + logger.debug(f"[upload_service] Failed to get Neo4j status: {e}") + resp = render_admin_acl( request, users=users, collections=enriched, grants=grants, + api_keys=api_keys, + neo4j_status=neo4j_status, deletion_enabled=ADMIN_COLLECTION_DELETE_ENABLED, work_dir=WORK_DIR, refresh_ms=ADMIN_COLLECTION_REFRESH_MS, @@ -859,6 +920,48 @@ async def admin_collections_status(request: Request): return JSONResponse({"collections": enriched}) +@app.get("/admin/collections/stream") +async def admin_collections_stream(request: Request): + """SSE endpoint for real-time collection status updates.""" + _require_admin_session(request) + + async def event_generator(): + """Generate SSE events for collection status updates.""" + last_data = None + while True: + # Check if 
client disconnected + if await request.is_disconnected(): + break + + try: + collections = list_collections(include_deleted=False) + enriched = await asyncio.to_thread( + lambda: build_admin_collections_view(collections=collections, work_dir=WORK_DIR) + ) + + # Only send if data changed + current_data = json.dumps(enriched, default=str) + if current_data != last_data: + last_data = current_data + yield f"data: {json.dumps({'type': 'full', 'collections': enriched}, default=str)}\n\n" + + except Exception as e: + yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n" + + # Poll interval (2 seconds for SSE) + await asyncio.sleep(2) + + return StreamingResponse( + event_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + ) + + @app.post("/admin/collections/reindex") async def admin_reindex_collection( request: Request, @@ -1372,6 +1475,53 @@ async def admin_acl_revoke( return RedirectResponse(url="/admin/acl", status_code=302) +@app.post("/admin/keys") +async def admin_create_api_key( + request: Request, + name: str = Form(...), + scope: str = Form("read"), + expires: str = Form("never"), +): + _require_admin_session(request) + try: + key_info = create_api_key(name=name, scope=scope, expires_in=expires) + # Flash the new key to the user (only shown once) + new_key = key_info.get("key", "") + return render_admin_acl( + request, + users=list_users() if AUTH_ENABLED else [], + collections=build_admin_collections_view( + collections=list_collections(include_deleted=False) if AUTH_ENABLED else list_qdrant_collections(), + work_dir=WORK_DIR, + ), + grants=list_collection_acl() if AUTH_ENABLED else [], + api_keys=list_api_keys() if AUTH_ENABLED else [], + deletion_enabled=ADMIN_COLLECTION_DELETE_ENABLED, + work_dir=WORK_DIR, + refresh_ms=ADMIN_COLLECTION_REFRESH_MS, + flash={"message": f"API key created! 
Copy it now (shown only once): {new_key}", "level": "success"}, + ) + except AuthDisabledError: + raise HTTPException(status_code=404, detail="Auth disabled") + except Exception as e: + return render_admin_error(request, title="Create API Key Failed", message=str(e), back_href="/admin/acl") + + +@app.post("/admin/keys/revoke") +async def admin_revoke_api_key( + request: Request, + key_id: str = Form(...), +): + _require_admin_session(request) + try: + revoke_api_key(key_id=key_id) + except AuthDisabledError: + raise HTTPException(status_code=404, detail="Auth disabled") + except Exception as e: + return render_admin_error(request, title="Revoke API Key Failed", message=str(e), back_href="/admin/acl") + return RedirectResponse(url="/admin/acl", status_code=302) + + @app.post("/auth/users", response_model=AuthUserCreateResponse) async def auth_create_user(payload: AuthUserCreateRequest, request: Request): try: diff --git a/scripts/watch_index.py b/scripts/watch_index.py index 91aad268..759c636a 100644 --- a/scripts/watch_index.py +++ b/scripts/watch_index.py @@ -111,6 +111,15 @@ def main() -> None: _start_health_server() _watcher_started_at = datetime.now(timezone.utc).isoformat() + # Backend migration: detect file<->redis switch and migrate state if needed + try: + from scripts.workspace_state import detect_and_migrate_backend + migrated = detect_and_migrate_backend(ROOT) + if migrated is not None: + print(f"[backend_migration] Migrated {migrated} items to new backend") + except Exception as e: + logger.warning(f"Backend migration check failed (continuing): {e}") + # Resolve collection name from workspace state before any client/state ops try: from scripts.workspace_state import get_collection_name_with_staging as _get_coll diff --git a/scripts/workspace_state.py b/scripts/workspace_state.py index ea63506a..55f2fa5c 100644 --- a/scripts/workspace_state.py +++ b/scripts/workspace_state.py @@ -19,6 +19,7 @@ import uuid import subprocess import hashlib +import xxhash 
from datetime import datetime from pathlib import Path from typing import Dict, Any, Optional, List, Literal, TypedDict @@ -41,6 +42,12 @@ _REDIS_CLIENT = None _REDIS_CLIENT_LOCK = threading.Lock() +# Global flag to enable skip-if-contended behavior for Redis locks. +# When set, locks that are contended will raise LockContendedException +# instead of waiting, allowing the caller to retry later. +# Uses threading.Event for thread-safe cross-thread visibility in ThreadPoolExecutor. +_lock_skip_contended_event = threading.Event() + def _redis_state_enabled() -> bool: backend = str(os.environ.get("CODEBASE_STATE_BACKEND", "") or "").strip().lower() @@ -54,6 +61,11 @@ def _redis_state_enabled() -> bool: return str(raw).strip().lower() in {"1", "true", "yes", "on"} +def _get_current_backend() -> str: + """Return the current backend name: 'redis' or 'file'.""" + return "redis" if _redis_state_enabled() else "file" + + def _redis_state_active() -> bool: if not _redis_state_enabled(): return False @@ -66,7 +78,7 @@ def _redis_prefix() -> str: def _redis_key_for_path(kind: str, path: Path) -> str: raw = str(path) - digest = hashlib.md5(raw.encode("utf-8")).hexdigest() + digest = xxhash.xxh64(raw.encode("utf-8")).hexdigest() return f"{_redis_prefix()}:{kind}:{digest}" @@ -232,8 +244,62 @@ def _redis_delete(kind: str, path: Path) -> bool: return False +class LockContendedException(Exception): + """Raised when a Redis lock cannot be acquired and skip_if_contended=True.""" + pass + + @contextmanager -def _redis_lock(kind: str, path: Path): +def enable_lock_skip_on_contention(): + """Context manager to enable skip-if-contended behavior for Redis locks. + + When active, any Redis lock contention will raise LockContendedException + instead of waiting, allowing the caller to retry the operation later. + This is useful for parallel processing where one file's lock shouldn't + block processing of other files. 
+ + This uses a threading.Event for cross-thread visibility, so worker threads + in a ThreadPoolExecutor will see the flag. + + Example: + with enable_lock_skip_on_contention(): + with ThreadPoolExecutor() as executor: + # Worker threads will see the skip flag + futures = [executor.submit(index_file, f) for f in files] + """ + _lock_skip_contended_event.set() + try: + yield + finally: + _lock_skip_contended_event.clear() + + +def _should_skip_if_contended() -> bool: + """Check if skip-if-contended mode is enabled (thread-safe, cross-thread visible).""" + return _lock_skip_contended_event.is_set() + + +@contextmanager +def _redis_lock(kind: str, path: Path, *, skip_if_contended: bool = False): + """Acquire a Redis distributed lock for the given resource. + + Args: + kind: Type of resource ("state", "cache", "symbols") + path: Path to the resource being locked + skip_if_contended: If True, raise LockContendedException after a few quick + attempts instead of waiting the full timeout. Useful for parallel processing + where the caller can retry later. Also enabled while the process-wide + enable_lock_skip_on_contention() context manager is active.
+ + Yields: + None when lock is acquired (or Redis not available) + + Raises: + LockContendedException: When skip_if_contended=True and lock is contended + """ + # Check the process-wide skip flag (threading.Event) as well as the explicit parameter + skip_mode = skip_if_contended or _should_skip_if_contended() + client = _get_redis_client() if client is None: yield
attempts, proceeding without lock") + yield + return + + logger.debug(f"Redis lock acquired for {lock_key} (attempts={attempts}, ttl={ttl_ms}ms)") try: yield finally: @@ -281,6 +362,261 @@ def _redis_lock(kind: str, path: Path): logger.warning(f"Redis lock release failed for {lock_key}: {e}") +# --------------------------------------------------------------------------- +# Backend Migration: file <-> redis +# --------------------------------------------------------------------------- + +_BACKEND_MARKER_FILENAME = ".backend_marker" +_migration_lock = threading.Lock() +_migration_done = False + + +def _get_backend_marker_path(workspace_root: Optional[Path] = None) -> Path: + """Get path to the backend marker file.""" + if workspace_root is None: + workspace_root = Path(os.environ.get("WORKSPACE_PATH") or os.environ.get("WATCH_ROOT") or "/work") + return workspace_root / ".codebase" / _BACKEND_MARKER_FILENAME + + +def _read_backend_marker(workspace_root: Optional[Path] = None) -> Optional[str]: + """Read the last known backend from the marker file.""" + marker_path = _get_backend_marker_path(workspace_root) + if not marker_path.exists(): + return None + try: + content = marker_path.read_text(encoding="utf-8").strip() + if content in {"redis", "file"}: + return content + except Exception as e: + logger.debug(f"Failed to read backend marker: {e}") + return None + + +def _write_backend_marker(backend: str, workspace_root: Optional[Path] = None) -> None: + """Write the current backend to the marker file.""" + marker_path = _get_backend_marker_path(workspace_root) + try: + marker_path.parent.mkdir(parents=True, exist_ok=True) + marker_path.write_text(backend + "\n", encoding="utf-8") + try: + os.chmod(marker_path, 0o664) + except PermissionError: + pass + except Exception as e: + logger.warning(f"Failed to write backend marker: {e}") + + +def _migrate_file_to_redis(workspace_root: Path) -> int: + """Migrate all state/cache/symbols from files to Redis. 
+ + Returns the count of migrated items. + """ + if not _redis_state_enabled(): + logger.warning("Cannot migrate to Redis: Redis backend not enabled") + return 0 + + client = _get_redis_client() + if client is None: + logger.warning("Cannot migrate to Redis: Redis client not available") + return 0 + + migrated = 0 + codebase_dir = workspace_root / ".codebase" + + if not codebase_dir.exists(): + logger.info(f"No .codebase directory found at {codebase_dir}, nothing to migrate") + return 0 + + logger.info(f"Starting file->Redis migration from {codebase_dir}") + + # Migrate state.json files + for state_file in codebase_dir.rglob("state.json"): + try: + with open(state_file, "r", encoding="utf-8-sig") as f: + state = json.load(f) + if isinstance(state, dict): + # Use the file path as the key basis (same as normal operation) + if _redis_set_json("state", state_file, state): + migrated += 1 + logger.debug(f"Migrated state: {state_file}") + except Exception as e: + logger.warning(f"Failed to migrate state file {state_file}: {e}") + + # Migrate cache.json files + for cache_file in codebase_dir.rglob("cache.json"): + try: + with open(cache_file, "r", encoding="utf-8-sig") as f: + cache = json.load(f) + if isinstance(cache, dict): + if _redis_set_json("cache", cache_file, cache): + migrated += 1 + logger.debug(f"Migrated cache: {cache_file}") + except Exception as e: + logger.warning(f"Failed to migrate cache file {cache_file}: {e}") + + # Migrate symbols/*.json files + for symbols_dir in codebase_dir.rglob("symbols"): + if not symbols_dir.is_dir(): + continue + for symbol_file in symbols_dir.glob("*.json"): + try: + with open(symbol_file, "r", encoding="utf-8-sig") as f: + symbols = json.load(f) + if isinstance(symbols, dict): + if _redis_set_json("symbols", symbol_file, symbols): + migrated += 1 + logger.debug(f"Migrated symbols: {symbol_file}") + except Exception as e: + logger.warning(f"Failed to migrate symbol file {symbol_file}: {e}") + + logger.info(f"File->Redis 
migration complete: {migrated} items migrated") + return migrated + + +def _migrate_redis_to_file(workspace_root: Path) -> int: + """Migrate all state/cache/symbols from Redis to files. + + Returns the count of migrated items. + """ + # We need to temporarily bypass the Redis check to read from Redis + # even though the current backend is set to 'file' + try: + import redis as redis_pkg + except ImportError: + logger.warning("Cannot migrate from Redis: redis package not available") + return 0 + + url = os.environ.get("CODEBASE_STATE_REDIS_URL") or os.environ.get("REDIS_URL") or "redis://redis:6379/0" + try: + client = redis_pkg.Redis.from_url(url, decode_responses=True, socket_timeout=2.0) + client.ping() + except Exception as e: + logger.warning(f"Cannot migrate from Redis: connection failed: {e}") + return 0 + + migrated = 0 + prefix = _redis_prefix() + + logger.info(f"Starting Redis->file migration to {workspace_root}") + + codebase_dir = workspace_root / ".codebase" + codebase_dir.mkdir(parents=True, exist_ok=True) + + # Scan and migrate each kind + for kind in ("state", "cache", "symbols"): + pattern = f"{prefix}:{kind}:*" + try: + for key in client.scan_iter(match=pattern, count=200): + try: + raw = client.get(key) + if not raw: + continue + obj = json.loads(raw) + if not isinstance(obj, dict): + continue + + # Determine file path from the object's stored metadata or derive it + # For state/cache, we need to reconstruct the path + # The key is a hash of the path, so we need to store path in the object + file_path_str = obj.get("_source_path") or obj.get("workspace_path") + if not file_path_str: + # Fallback: write to default location based on kind + if kind == "state": + file_path = codebase_dir / "state.json" + elif kind == "cache": + file_path = codebase_dir / "cache.json" + else: + # For symbols, use a hash-based name + key_hash = key.split(":")[-1] if ":" in key else xxhash.xxh64(key.encode()).hexdigest() + symbols_dir = codebase_dir / "symbols" + 
def detect_and_migrate_backend(workspace_root: Optional[Path] = None) -> Optional[int]:
    """Detect a codebase-state backend switch and trigger migration if needed.

    Call this on startup from indexer/watcher/MCP servers. Compares the
    currently configured backend ("file" or "redis") against the marker
    recording the backend used on the previous run; when they differ, data is
    migrated across backends and the marker is updated.

    Args:
        workspace_root: Workspace directory holding the backend marker.
            Defaults to the WORKSPACE_PATH or WATCH_ROOT env var, else "/work".

    Returns:
        - None if no migration needed
        - int count of migrated items if migration occurred
    """
    global _migration_done

    # Guard: run at most once per process, even with concurrent callers.
    with _migration_lock:
        if _migration_done:
            return None
        # Flagged up-front so a failing attempt is not retried by every
        # subsequent caller in this process.
        _migration_done = True

    # Operators may disable auto-migration entirely via the environment.
    skip_flag = os.environ.get("CODEBASE_STATE_SKIP_MIGRATION", "").strip().lower()
    if skip_flag in ("1", "true", "yes"):
        logger.info("Backend migration skipped (CODEBASE_STATE_SKIP_MIGRATION=1)")
        return None

    if workspace_root is None:
        env_root = os.environ.get("WORKSPACE_PATH") or os.environ.get("WATCH_ROOT") or "/work"
        workspace_root = Path(env_root)

    current_backend = _get_current_backend()
    last_backend = _read_backend_marker(workspace_root)

    logger.info(f"Backend check: current={current_backend}, last={last_backend}")

    if last_backend is None:
        # First run or marker missing - just set the marker
        _write_backend_marker(current_backend, workspace_root)
        logger.info(f"Backend marker initialized: {current_backend}")
        return None

    if last_backend == current_backend:
        # No change
        return None

    # Backend has changed - migrate!
    logger.info(f"Backend switch detected: {last_backend} -> {current_backend}")

    # Dispatch table keyed on (previous, current) backend pair; any other
    # combination (unknown marker value, etc.) migrates nothing.
    migrators = {
        ("file", "redis"): _migrate_file_to_redis,
        ("redis", "file"): _migrate_redis_to_file,
    }
    migrate_fn = migrators.get((last_backend, current_backend))
    migrated = migrate_fn(workspace_root) if migrate_fn else 0

    # Record the new backend only after the migration attempt completes.
    _write_backend_marker(current_backend, workspace_root)

    return migrated


def _get_file_lock_path(file_path: str) -> Path:
    """Get the lock file path for a given file."""
    # Hash the path so the lock filename is short and filesystem-safe
    # regardless of separators, drive letters, or path length.
    digest = xxhash.xxh64(file_path.encode()).hexdigest()
    return _FILE_LOCKS_DIR / f"{digest[:16]}.lock"
| id | username | role |
|---|---|---|
| {{ u.id }} | -{{ u.username }} | -{{ u.role }} | -
| (none) | ||
Manage your indexed codebases
+- WARNING: Collection deletion is enabled on this server. Deleting a collection can remove server-side files under {{ work_dir }}. - If {{ work_dir }} is a bind mount of your only copy of code, you can lose data. Practical guidance: Standard compose: treat /work as “possibly my real repo”. Keep CTXCE_ADMIN_COLLECTION_DELETE_ENABLED=0 unless you really mean it. -
- {% else %} -Collection deletion is disabled by server configuration.
+| id | -qdrant_collection | -indexing_status | -mapping | -applied hash | -pending hash | -current hash | -actions | -||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| {{ c.id }} | -
- {{ c.qdrant_collection }}
- {% if c.container_path %}
- {{ c.container_path }}
- {% endif %}
- {% if c.repo_name %}
- repo: {{ c.repo_name }}
- {% endif %}
- |
-
- {{ c.indexing_state or "idle" }}
- {% if c.indexing_started_at %}
- started {{ c.indexing_started_at }}
- {% endif %}
- {% if c.progress_files_processed is not none %}
-
- {{ c.progress_files_processed }}
- {% if c.progress_total_files %}
- / {{ c.progress_total_files }}
- {% endif %}
- files
-
- {% endif %}
- {% if c.progress_current_file %}
- {{ c.progress_current_file }}
- {% endif %}
- |
-
- {% if c.has_mapping %}
- ok ({{ c.mapping_count }})
- {% else %}
- missing
- {% endif %}
- {% if c.needs_recreate %}
-
- maintenance needed — recreate
+
+
|
+ ||||||||||||||||||||||||
| No collections found | |||||||||||||||||||||||||||