From 035a5529f484f5af53b8e01d6fb59e2082bf48cf Mon Sep 17 00:00:00 2001 From: John Donalson Date: Sun, 25 Jan 2026 17:39:27 -0500 Subject: [PATCH 01/14] Update mcp_indexer_server.py --- scripts/mcp_indexer_server.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/scripts/mcp_indexer_server.py b/scripts/mcp_indexer_server.py index 934f2274..173aafe8 100644 --- a/scripts/mcp_indexer_server.py +++ b/scripts/mcp_indexer_server.py @@ -2532,7 +2532,8 @@ async def code_search( RETURNS: Same schema as repo_search. """ # If include_memories is requested, delegate to context_search for blending - if include_memories: + # Coerce to bool first to handle string 'false'/'0' from some clients + if _coerce_bool(include_memories, default=False): return await context_search( query=query, limit=limit, From bfb4713b28aae300b4e9d1b771db2a1e7de18509 Mon Sep 17 00:00:00 2001 From: John Donalson Date: Sun, 25 Jan 2026 17:40:03 -0500 Subject: [PATCH 02/14] Improve status table handling for model and index info Enhanced the status table to better handle 'last_ingested_at' timestamps and provide more granular model warmup statuses, including 'warm', 'warming', 'cold', and 'failed'. This improves clarity and accuracy in status reporting. 
--- scripts/ctx_cli/commands/status.py | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/scripts/ctx_cli/commands/status.py b/scripts/ctx_cli/commands/status.py index c2b76205..fd0dad74 100644 --- a/scripts/ctx_cli/commands/status.py +++ b/scripts/ctx_cli/commands/status.py @@ -576,7 +576,12 @@ def print_status_table( points_count = qdrant_status_info.get("count", "Unknown") if isinstance(points_count, int): points_count = f"{points_count:,}" - last_indexed = format_timestamp(qdrant_status_info.get("last_indexed")) + # qdrant_status returns last_ingested_at as {"unix": int, "iso": str} + last_ingested = qdrant_status_info.get("last_ingested_at") + if isinstance(last_ingested, dict): + last_indexed = format_timestamp(last_ingested.get("iso")) + else: + last_indexed = format_timestamp(qdrant_status_info.get("last_indexed")) elif collection_info: collection_name = collection_info.get("collection", "Unknown") points_count = collection_info.get("points_count", collection_info.get("count", "Unknown")) @@ -640,16 +645,27 @@ def print_status_table( # Model warmup status if warmup_info: - embedding_ready = warmup_info.get("embedding_ready", False) - reranker_ready = warmup_info.get("reranker_ready", False) - if embedding_ready and reranker_ready: + # warmup_info returns status: "warm"|"warming"|"cold"|"failed" + # and embedding_ms/reranker_ms when available + warmup_status = warmup_info.get("status", "cold") + embedding_ready = warmup_info.get("embedding_ready", warmup_info.get("embedding_ms") is not None) + reranker_ready = warmup_info.get("reranker_ready", warmup_info.get("reranker_ms") is not None) + if warmup_status == "warm" or (embedding_ready and reranker_ready): models_text = Text() models_text.append("● ", style="green") models_text.append("Ready", style="green") - else: + elif warmup_status == "warming": models_text = Text() models_text.append("◐ ", style="yellow") models_text.append("Loading", style="yellow") + elif 
warmup_status == "failed": + models_text = Text() + models_text.append("✗ ", style="red") + models_text.append("Failed", style="red") + else: + models_text = Text() + models_text.append("○ ", style="dim") + models_text.append("Cold", style="dim") table.add_row("Models", models_text) # Indexing progress From 92d864a4ab5f55c881c4dce5696e929da6520160 Mon Sep 17 00:00:00 2001 From: John Donalson Date: Sun, 25 Jan 2026 17:40:17 -0500 Subject: [PATCH 03/14] Improve model warmup status display in status table Enhanced the logic for displaying model warmup status by considering the 'status' field from warmup_info and providing more granular symbols and text for 'Ready', 'Loading', 'Failed', and 'Cold' states. This makes the status output more informative and accurate. --- scripts/ctx_cli/commands/status.py | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/scripts/ctx_cli/commands/status.py b/scripts/ctx_cli/commands/status.py index fd0dad74..fb80e11f 100644 --- a/scripts/ctx_cli/commands/status.py +++ b/scripts/ctx_cli/commands/status.py @@ -730,10 +730,22 @@ def print_status_table( print(f"│ Last index: {last_indexed}" + " " * (46 - len(f"Last index: {last_indexed}") - 2) + "│") if warmup_info: - embedding_ready = warmup_info.get("embedding_ready", False) - reranker_ready = warmup_info.get("reranker_ready", False) - models_symbol = "✓" if (embedding_ready and reranker_ready) else "⚠" - models_text = "Ready" if (embedding_ready and reranker_ready) else "Loading" + # warmup_info returns status: "warm"|"warming"|"cold"|"failed" + warmup_status = warmup_info.get("status", "cold") + embedding_ready = warmup_info.get("embedding_ready", warmup_info.get("embedding_ms") is not None) + reranker_ready = warmup_info.get("reranker_ready", warmup_info.get("reranker_ms") is not None) + if warmup_status == "warm" or (embedding_ready and reranker_ready): + models_symbol = "✓" + models_text = "Ready" + elif warmup_status == "warming": + models_symbol = 
"⚠" + models_text = "Loading" + elif warmup_status == "failed": + models_symbol = "✗" + models_text = "Failed" + else: + models_symbol = "○" + models_text = "Cold" print(f"│ Models: {models_symbol} {models_text}" + " " * (46 - len(f"Models: {models_symbol} {models_text}") - 2) + "│") if workspace_info: From 5417403b401d1f4aff7adeb044d1e7de3d72dd0c Mon Sep 17 00:00:00 2001 From: John Donalson Date: Sun, 25 Jan 2026 17:41:00 -0500 Subject: [PATCH 04/14] Update status.py --- scripts/ctx_cli/commands/status.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/scripts/ctx_cli/commands/status.py b/scripts/ctx_cli/commands/status.py index fb80e11f..dd22dc59 100644 --- a/scripts/ctx_cli/commands/status.py +++ b/scripts/ctx_cli/commands/status.py @@ -794,9 +794,11 @@ def print_status_table( print("│ " + "Model".ljust(25) + " Status".ljust(23) + "│") print("├" + "─" * 50 + "┤") + # Derive ready status from timing values when explicit booleans not present + warmup_status = warmup_info.get("status", "cold") models = [ - ("Embedding", warmup_info.get("embedding_ready", False)), - ("Reranker", warmup_info.get("reranker_ready", False)), + ("Embedding", warmup_info.get("embedding_ready", warmup_info.get("embedding_ms") is not None)), + ("Reranker", warmup_info.get("reranker_ready", warmup_info.get("reranker_ms") is not None)), ("Decoder", warmup_info.get("decoder_ready", False)), ] From 69e531c8ceae1126ff062fcb55ca55f07b154ddf Mon Sep 17 00:00:00 2001 From: John Donalson Date: Sun, 25 Jan 2026 17:55:19 -0500 Subject: [PATCH 05/14] Add Redis integration and improve reset command options Introduces a Redis service to docker-compose for use as a cache/state backend, updates environment variables for Redis configuration, and ensures relevant services depend on Redis. Enhances the ctx_cli reset command with a --db-reset option to reset database volumes, refines mode selection (defaulting to HTTP MCPs), and updates help text and logic for clarity and flexibility. 
--- docker-compose.yml | 42 ++++++++++++ scripts/ctx_cli/commands/reset.py | 106 +++++++++++++++++++++--------- 2 files changed, 116 insertions(+), 32 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 0f14cda6..a3b2cb72 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -6,6 +6,24 @@ version: '3.8' services: + # Redis cache/state backend + redis: + image: redis:7-alpine + container_name: redis-cache + ports: + - "6379:6379" + command: ["redis-server", "--appendonly", "yes", "--maxmemory", "256mb", "--maxmemory-policy", "allkeys-lru"] + volumes: + - redis_data:/data + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 5s + retries: 5 + restart: unless-stopped + networks: + - dev-remote-network + # Qdrant vector database - same as base compose qdrant: image: qdrant/qdrant:latest @@ -102,6 +120,7 @@ services: command: ["sh", "-c", "mkdir -p /tmp/huggingface/hub /tmp/huggingface/transformers /tmp/huggingface/fastembed && exec python /app/scripts/mcp_indexer_server.py"] depends_on: - qdrant + - redis env_file: - .env environment: @@ -172,6 +191,10 @@ services: - RERANKER_WEIGHTS_DIR=/tmp/rerank_weights - RERANK_EVENTS_DIR=/tmp/rerank_events - RERANK_EVENTS_ENABLED=${RERANK_EVENTS_ENABLED:-1} + # Redis state backend + - CODEBASE_STATE_BACKEND=${CODEBASE_STATE_BACKEND:-file} + - CODEBASE_STATE_REDIS_URL=${CODEBASE_STATE_REDIS_URL:-redis://redis:6379/0} + - CODEBASE_STATE_REDIS_PREFIX=${CODEBASE_STATE_REDIS_PREFIX:-context-engine:codebase} ports: - "${FASTMCP_INDEXER_PORT:-8001}:8001" - "18001:18001" @@ -316,6 +339,7 @@ services: command: ["sh", "-c", "mkdir -p /tmp/huggingface/hub /tmp/huggingface/transformers /tmp/huggingface/fastembed && exec python /app/scripts/mcp_indexer_server.py"] depends_on: - qdrant + - redis env_file: - .env environment: @@ -378,6 +402,10 @@ services: - RERANKER_WEIGHTS_DIR=/tmp/rerank_weights - RERANK_EVENTS_DIR=/tmp/rerank_events - RERANK_EVENTS_ENABLED=${RERANK_EVENTS_ENABLED:-1} + # 
Redis state backend + - CODEBASE_STATE_BACKEND=${CODEBASE_STATE_BACKEND:-file} + - CODEBASE_STATE_REDIS_URL=${CODEBASE_STATE_REDIS_URL:-redis://redis:6379/0} + - CODEBASE_STATE_REDIS_PREFIX=${CODEBASE_STATE_REDIS_PREFIX:-context-engine:codebase} ports: - "${FASTMCP_INDEXER_HTTP_PORT:-8003}:8001" - "${FASTMCP_INDEXER_HTTP_HEALTH_PORT:-18003}:18001" @@ -415,6 +443,7 @@ services: user: "1000:1000" depends_on: - qdrant + - redis env_file: - .env environment: @@ -459,6 +488,10 @@ services: - PSEUDO_DEFER_TO_WORKER=${PSEUDO_DEFER_TO_WORKER:-1} # Parallel indexing - number of worker threads (default: 4, use -1 for CPU count) - INDEX_WORKERS=${INDEX_WORKERS:-4} + # Redis state backend + - CODEBASE_STATE_BACKEND=${CODEBASE_STATE_BACKEND:-file} + - CODEBASE_STATE_REDIS_URL=${CODEBASE_STATE_REDIS_URL:-redis://redis:6379/0} + - CODEBASE_STATE_REDIS_PREFIX=${CODEBASE_STATE_REDIS_PREFIX:-context-engine:codebase} volumes: - workspace_pvc:/work:rw - codebase_pvc:/work/.codebase:rw @@ -477,6 +510,7 @@ services: user: "1000:1000" depends_on: - qdrant + - redis env_file: - .env environment: @@ -524,6 +558,10 @@ services: - NEO4J_GRAPH=${NEO4J_GRAPH:-} # Defer pseudo-tag generation - watcher runs backfill worker thread - PSEUDO_DEFER_TO_WORKER=${PSEUDO_DEFER_TO_WORKER:-1} + # Redis state backend + - CODEBASE_STATE_BACKEND=${CODEBASE_STATE_BACKEND:-file} + - CODEBASE_STATE_REDIS_URL=${CODEBASE_STATE_REDIS_URL:-redis://redis:6379/0} + - CODEBASE_STATE_REDIS_PREFIX=${CODEBASE_STATE_REDIS_PREFIX:-context-engine:codebase} volumes: - workspace_pvc:/work:rw - codebase_pvc:/work/.codebase:rw @@ -702,6 +740,10 @@ volumes: rerank_events: driver: local + # Redis data persistence + redis_data: + driver: local + # Custom network for service discovery networks: dev-remote-network: diff --git a/scripts/ctx_cli/commands/reset.py b/scripts/ctx_cli/commands/reset.py index 3e8642cf..c09e3483 100644 --- a/scripts/ctx_cli/commands/reset.py +++ b/scripts/ctx_cli/commands/reset.py @@ -147,10 +147,11 @@ def 
_download_file(url: str, dest: Path, description: str) -> bool: def reset( - mode: str = "dual", + mode: str = "mcp", skip_build: bool = False, skip_model: bool = False, skip_tokenizer: bool = False, + db_reset: bool = False, model_url: Optional[str] = None, model_path: Optional[str] = None, tokenizer_url: Optional[str] = None, @@ -163,14 +164,14 @@ def reset( recreates indexes, and starts services in the specified mode. Modes: - dual: Both SSE and HTTP MCPs (default, most comprehensive) - mcp: HTTP MCPs only (streamable, Codex compatible) - sse: SSE MCPs only (legacy) + (default): HTTP MCPs only (streamable, Codex compatible) + --dual: Both SSE and HTTP MCPs (most comprehensive) + --sse: SSE MCPs only (legacy) Examples: - ctx reset # Full reset with dual mode - ctx reset --mcp # HTTP MCPs only - ctx reset --sse # SSE MCPs only + ctx reset # Full reset with HTTP MCPs (default) + ctx reset --dual # Both SSE and HTTP MCPs + ctx reset --sse # SSE MCPs only (legacy) ctx reset --skip-model # Skip model download ctx reset --skip-build # Skip container rebuild """ @@ -193,22 +194,35 @@ def reset( if neo4j_enabled: compose_cmd.extend(["-f", "docker-compose.yml", "-f", "docker-compose.neo4j.yml"]) + # Check if Redis is enabled (CODEBASE_STATE_BACKEND=redis) + redis_enabled = os.environ.get("CODEBASE_STATE_BACKEND", "").strip().lower() == "redis" + + # Check if learning reranker is enabled + rerank_learning_enabled = os.environ.get("RERANK_LEARNING", "1").strip().lower() in ("1", "true", "yes") + # Determine which containers to build/start based on mode - if mode == "mcp": - # HTTP MCPs only (Codex compatible) + upload_service for remote sync - build_containers = ["indexer", "mcp_http", "mcp_indexer_http", "watcher", "upload_service"] - start_containers = ["mcp_http", "mcp_indexer_http", "watcher", "upload_service"] - mode_desc = "HTTP MCPs only (streamable)" - elif mode == "sse": - # SSE MCPs only (legacy) + # Default is HTTP-only; SSE only starts when explicitly requested 
with --sse + if mode == "sse": + # SSE MCPs only (legacy, must be explicitly requested) build_containers = ["indexer", "mcp", "mcp_indexer", "watcher"] start_containers = ["mcp", "mcp_indexer", "watcher"] mode_desc = "SSE MCPs only (legacy)" - else: - # Dual mode (default) + elif mode == "dual": + # Dual mode (both SSE and HTTP) build_containers = ["indexer", "mcp", "mcp_indexer", "mcp_http", "mcp_indexer_http", "watcher", "upload_service"] start_containers = ["mcp", "mcp_indexer", "mcp_http", "mcp_indexer_http", "watcher", "upload_service"] mode_desc = "Dual mode (SSE + HTTP)" + else: + # HTTP MCPs only (default, Codex compatible) + upload_service for remote sync + build_containers = ["indexer", "mcp_http", "mcp_indexer_http", "watcher", "upload_service"] + start_containers = ["mcp_http", "mcp_indexer_http", "watcher", "upload_service"] + mode_desc = "HTTP MCPs (streamable)" + + # Add learning_worker if rerank learning is enabled + if rerank_learning_enabled: + build_containers.append("learning_worker") + start_containers.append("learning_worker") + mode_desc += " + Learning Reranker" # Add llamacpp container if needed if llamacpp_needed: @@ -223,11 +237,16 @@ def reset( start_containers.insert(0, "neo4j") # Start neo4j first (other services depend on it) mode_desc += " + Neo4j" + # Add Redis indicator to mode description (Redis is a dependency, not a built container) + if redis_enabled: + mode_desc += " + Redis" + _print_panel( f"[cyan]Mode:[/cyan] {mode_desc}\n" f"[cyan]Skip Build:[/cyan] {skip_build}\n" f"[cyan]Skip Model:[/cyan] {skip_model}\n" - f"[cyan]Skip Tokenizer:[/cyan] {skip_tokenizer}", + f"[cyan]Skip Tokenizer:[/cyan] {skip_tokenizer}\n" + f"[cyan]DB Reset:[/cyan] {db_reset}", title="Development Environment Reset", border_style="yellow" ) @@ -236,11 +255,17 @@ def reset( step = 0 try: - # Step 1: Stop all services and remove volumes + # Step 1: Stop all services (and optionally reset database volumes) step += 1 _print(f"\n[bold][{step}/{steps_total}] 
Stopping services...[/bold]") - _run_cmd(compose_cmd + ["down", "-v", "--remove-orphans"], "Stopping all containers", check=False) - _print("[green]✓[/green] Services stopped") + if db_reset: + # Full reset including database volumes (qdrant, redis, neo4j) + _run_cmd(compose_cmd + ["down", "-v", "--remove-orphans"], "Stopping all containers and removing volumes", check=False) + _print("[green]✓[/green] Services stopped and database volumes removed") + else: + # Stop services but preserve database volumes + _run_cmd(compose_cmd + ["down", "--remove-orphans"], "Stopping all containers", check=False) + _print("[green]✓[/green] Services stopped (database volumes preserved)") # Step 2: Build containers (unless skipped) step += 1 @@ -252,9 +277,11 @@ def reset( else: _print(f"\n[bold][{step}/{steps_total}] Skipping container build[/bold]") - # Step 3: Start Qdrant (and Neo4j if enabled) and wait + # Step 3: Start Qdrant, Redis (if enabled), and Neo4j (if enabled) and wait step += 1 db_services = ["qdrant"] + if redis_enabled: + db_services.append("redis") if neo4j_enabled: db_services.append("neo4j") _print(f"\n[bold][{step}/{steps_total}] Starting {', '.join(db_services)}...[/bold]") @@ -413,25 +440,31 @@ def register_command(subparsers): "This is equivalent to the Makefile's reset-dev targets.", epilog=""" Modes: - dual (default) Both SSE and HTTP MCPs (most comprehensive) - mcp HTTP MCPs only (streamable, Codex compatible) - sse SSE MCPs only (legacy) + (default) HTTP MCPs only (streamable, Codex compatible) + --dual Both SSE and HTTP MCPs (most comprehensive) + --sse SSE MCPs only (legacy) Examples: - ctx reset # Full reset with dual mode - ctx reset --mcp # HTTP MCPs only (streamable) + ctx reset # Full reset with HTTP MCPs (default) + ctx reset --dual # Both SSE and HTTP MCPs ctx reset --sse # SSE MCPs only (legacy) + ctx reset --db-reset # Reset database volumes (Qdrant, Redis, Neo4j) ctx reset --skip-model # Skip llama model download ctx reset --skip-build # Skip 
container rebuild (faster) """ ) - # Mode selection (mutually exclusive) + # Mode selection (mutually exclusive among modes) mode_group = parser.add_mutually_exclusive_group() mode_group.add_argument( "--mcp", action="store_true", - help="HTTP MCPs only (streamable, Codex compatible)" + help="HTTP MCPs only (streamable, Codex compatible) - this is the default" + ) + mode_group.add_argument( + "--dual", + action="store_true", + help="Both SSE and HTTP MCPs (most comprehensive)" ) mode_group.add_argument( "--sse", @@ -439,6 +472,13 @@ def register_command(subparsers): help="SSE MCPs only (legacy)" ) + # Database reset option (can be combined with any mode) + parser.add_argument( + "--db-reset", + action="store_true", + help="Reset database volumes (Qdrant, Redis, Neo4j)" + ) + # Skip options parser.add_argument( "--skip-build", @@ -476,19 +516,21 @@ def register_command(subparsers): def run_reset(args): """Wrapper to call reset function with argparse args.""" - # Determine mode - if args.mcp: - mode = "mcp" + # Determine mode (default is HTTP-only, no SSE) + if args.dual: + mode = "dual" elif args.sse: mode = "sse" else: - mode = "dual" + # --mcp or no flag = HTTP MCPs only (default) + mode = "mcp" return reset( mode=mode, skip_build=args.skip_build, skip_model=args.skip_model, skip_tokenizer=args.skip_tokenizer, + db_reset=args.db_reset, model_url=args.model_url, model_path=args.model_path, tokenizer_url=args.tokenizer_url, From 3f9ac196e5a138461bc1bb2f4638870f4781092c Mon Sep 17 00:00:00 2001 From: John Donalson Date: Sun, 25 Jan 2026 18:42:25 -0500 Subject: [PATCH 06/14] Add backend migration and improve admin dashboard Implements automatic migration between file and Redis backends for workspace state, triggered on backend switch and controlled via environment variables. 
Adds backend migration logic to startup of key scripts, updates .env.example with migration options, and enhances the admin dashboard UI for collections, users, and access control with improved layout, tab navigation, and real-time collection status streaming. Also improves Qdrant schema caching and connection pooling, and updates ignore files for new artifacts. --- .env.example | 2 + .gitignore | 4 + .qdrantignore | 31 + docker-compose.yml | 2 +- scripts/indexing_admin.py | 83 +- scripts/ingest/cli.py | 12 +- scripts/mcp_indexer_server.py | 13 +- scripts/upload_service.py | 44 +- scripts/watch_index.py | 9 + scripts/workspace_state.py | 260 +++++++ templates/admin/acl.html | 972 +++++++++++++----------- templates/admin/base.html | 725 +++++++++++++++++- templates/admin/bootstrap.html | 249 +++++- templates/admin/error.html | 23 +- templates/admin/login.html | 238 +++++- tests/test_workspace_state_migration.py | 267 +++++++ 16 files changed, 2382 insertions(+), 552 deletions(-) create mode 100644 tests/test_workspace_state_migration.py diff --git a/.env.example b/.env.example index 562d84b6..30ebc3eb 100644 --- a/.env.example +++ b/.env.example @@ -161,6 +161,7 @@ SEMANTIC_EXPANSION_CACHE_TTL=3600 # Codebase state backend (for .codebase/state.json, .codebase/cache.json, symbols) # Set CODEBASE_STATE_BACKEND=redis to use Redis (recommended for K8s) +# When switching backends, existing state is automatically migrated on startup. 
# CODEBASE_STATE_BACKEND=file # CODEBASE_STATE_REDIS_ENABLED=0 # CODEBASE_STATE_REDIS_URL=redis://redis:6379/0 @@ -169,6 +170,7 @@ SEMANTIC_EXPANSION_CACHE_TTL=3600 # CODEBASE_STATE_REDIS_LOCK_WAIT_MS=2000 # CODEBASE_STATE_REDIS_SOCKET_TIMEOUT=2 # CODEBASE_STATE_REDIS_CONNECT_TIMEOUT=2 +# CODEBASE_STATE_SKIP_MIGRATION=0 # Set to 1 to disable auto-migration on backend switch # Query Optimization (adaptive HNSW_EF tuning for 2x faster simple queries) QUERY_OPTIMIZER_ADAPTIVE=1 diff --git a/.gitignore b/.gitignore index 9cc39e63..ffcd5f4e 100644 --- a/.gitignore +++ b/.gitignore @@ -65,3 +65,7 @@ ctx_config.json /deploy/eks-cdk /deploy/eks-cdk-PATHFUL .env +.contextstream/config.json +.contextstream/ignore +.cursorrules +GEMINI.md diff --git a/.qdrantignore b/.qdrantignore index dde3a243..e0b4b3c2 100644 --- a/.qdrantignore +++ b/.qdrantignore @@ -4,3 +4,34 @@ # Dev workspace contains uploaded client workspaces - indexed separately /dev-workspace + +# CDK build artifacts (large, auto-generated) +/deploy/eks-cdk*/cdk.out/** +/deploy/eks-cdk*/**/*.js +/deploy/eks-cdk*/**/*.d.ts +/deploy/eks-cdk*/node_modules/** + +# Node modules +/node_modules +**/node_modules + +# Python virtual environments +/.venv* +/venv + +# Model binaries +/models +*.onnx +*.bin +*.safetensors + +# Test fixtures +/test-repos +/.selftest_repo + +# IDE/Editor +/.idea +/.vscode + +# ContextStream local cache +/.contextstream diff --git a/docker-compose.yml b/docker-compose.yml index a3b2cb72..e3513da5 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -216,7 +216,7 @@ services: command: ["sh", "-c", "mkdir -p /tmp/huggingface/hub /tmp/huggingface/transformers /tmp/huggingface/fastembed && exec python /app/scripts/learning_reranker_worker.py --daemon"] depends_on: - qdrant - - mcp_indexer + - redis env_file: - .env environment: diff --git a/scripts/indexing_admin.py b/scripts/indexing_admin.py index 1e8abf5f..4b855cda 100644 --- a/scripts/indexing_admin.py +++ b/scripts/indexing_admin.py @@ 
-154,37 +154,80 @@ def _copy_repo_state_for_clone( logger.warning("[staging] failed to retarget cache.json for %s: %s", clone_repo_name, exc) +# Cache with TTL support _COLLECTION_SCHEMA_CACHE: Dict[str, Dict[str, Any]] = {} +_COLLECTION_SCHEMA_CACHE_TS: Dict[str, float] = {} # Track cache timestamps +_COLLECTION_SCHEMA_CACHE_TTL = float(os.environ.get("SCHEMA_CACHE_TTL_SECS", "300")) # 5 min default _SNAPSHOT_REFRESHED: Set[str] = set() _MAPPING_INDEX_CACHE: Dict[str, Any] = {"ts": 0.0, "work_dir": "", "value": {}} +# Shared Qdrant client pool for reduced connection overhead +_QDRANT_CLIENT_POOL: Dict[str, "QdrantClient"] = {} +_QDRANT_CLIENT_POOL_LOCK = __import__("threading").Lock() -def _probe_collection_schema(collection: str) -> Optional[Dict[str, Any]]: + +def _get_shared_qdrant_client(url: Optional[str] = None, api_key: Optional[str] = None) -> Optional["QdrantClient"]: + """Get or create a shared Qdrant client from the pool.""" + if QdrantClient is None: + return None + + qdrant_url = url or os.environ.get("QDRANT_URL", "http://qdrant:6333") + qdrant_key = api_key or os.environ.get("QDRANT_API_KEY") or None + pool_key = f"{qdrant_url}|{qdrant_key or ''}" + + with _QDRANT_CLIENT_POOL_LOCK: + if pool_key in _QDRANT_CLIENT_POOL: + return _QDRANT_CLIENT_POOL[pool_key] + + try: + client = QdrantClient( + url=qdrant_url, + api_key=qdrant_key, + timeout=float(os.environ.get("QDRANT_TIMEOUT", "60")), + ) + _QDRANT_CLIENT_POOL[pool_key] = client + return client + except Exception as e: + logger.debug(f"Failed to create shared Qdrant client: {e}") + return None + + +def _is_schema_cache_valid(collection: str) -> bool: + """Check if cached schema is still valid (within TTL).""" + if collection not in _COLLECTION_SCHEMA_CACHE: + return False + cache_ts = _COLLECTION_SCHEMA_CACHE_TS.get(collection, 0.0) + return (time.time() - cache_ts) < _COLLECTION_SCHEMA_CACHE_TTL + + +def _invalidate_schema_cache(collection: Optional[str] = None) -> None: + """Invalidate schema 
cache for a collection or all collections.""" + if collection: + _COLLECTION_SCHEMA_CACHE.pop(collection, None) + _COLLECTION_SCHEMA_CACHE_TS.pop(collection, None) + else: + _COLLECTION_SCHEMA_CACHE.clear() + _COLLECTION_SCHEMA_CACHE_TS.clear() + + +def _probe_collection_schema(collection: str, force_refresh: bool = False) -> Optional[Dict[str, Any]]: + """Probe collection schema with TTL-aware caching and shared client pool.""" if not collection or QdrantClient is None: return None - cached = _COLLECTION_SCHEMA_CACHE.get(collection) - if cached: - return cached - qdrant_url = os.environ.get("QDRANT_URL", "http://qdrant:6333") - api_key = os.environ.get("QDRANT_API_KEY") or None - if QdrantClient is None: - return + # Check TTL-aware cache first (unless force refresh) + if not force_refresh and _is_schema_cache_valid(collection): + return _COLLECTION_SCHEMA_CACHE.get(collection) - try: - client = QdrantClient(url=qdrant_url, api_key=api_key) - except Exception as e: - logger.debug(f"Failed to connect to Qdrant for schema probe: {e}") + # Use shared client from pool + client = _get_shared_qdrant_client() + if client is None: return None try: info = client.get_collection(collection_name=collection) except Exception as e: logger.debug(f"Failed to get collection info for '{collection}': {e}") - try: - client.close() - except Exception as close_e: - logger.debug(f"Suppressed exception during close: {close_e}") return None try: @@ -231,16 +274,14 @@ def _probe_collection_schema(collection: str) -> Optional[Dict[str, Any]]: "sparse_vectors": sparse_vectors, "payload_indexes": getattr(getattr(info.config, "params", None), "payload_indexes", None), } + # Store in cache with timestamp for TTL _COLLECTION_SCHEMA_CACHE[collection] = schema + _COLLECTION_SCHEMA_CACHE_TS[collection] = time.time() return schema except Exception as e: logger.debug(f"Failed to build schema for collection '{collection}': {e}") return None - finally: - try: - client.close() - except Exception as e: - 
logger.debug(f"Suppressed exception: {e}") + # Note: Don't close client - it's from the shared pool def _filter_snapshot_only_recreate_keys( diff --git a/scripts/ingest/cli.py b/scripts/ingest/cli.py index 38b4320c..5a034797 100644 --- a/scripts/ingest/cli.py +++ b/scripts/ingest/cli.py @@ -209,7 +209,17 @@ def main(): load_dotenv(Path(__file__).parent.parent.parent / ".env") except ImportError: pass # python-dotenv not installed, rely on exported env vars - + + # Backend migration: detect file<->redis switch and migrate state if needed + try: + from scripts.workspace_state import detect_and_migrate_backend + ws_root = Path(os.environ.get("WORKSPACE_PATH") or os.environ.get("WATCH_ROOT") or "/work") + migrated = detect_and_migrate_backend(ws_root) + if migrated is not None: + print(f"[backend_migration] Migrated {migrated} items to new backend") + except Exception as e: + logger.warning(f"Backend migration check failed (continuing): {e}") + args = parse_args() # Map CLI overrides to env so downstream helpers pick them up diff --git a/scripts/mcp_indexer_server.py b/scripts/mcp_indexer_server.py index 173aafe8..4e36c5c9 100644 --- a/scripts/mcp_indexer_server.py +++ b/scripts/mcp_indexer_server.py @@ -3285,7 +3285,18 @@ async def neo4j_graph_query( _log_level_str = os.environ.get("LOG_LEVEL", "INFO").upper() _log_level = getattr(_logging, _log_level_str, _logging.INFO) _logging.getLogger().setLevel(_log_level) - + + # Backend migration: detect file<->redis switch and migrate state if needed + try: + from scripts.workspace_state import detect_and_migrate_backend + from pathlib import Path as _Path + _ws_root = _Path(os.environ.get("WORKSPACE_PATH") or os.environ.get("WATCH_ROOT") or "/work") + migrated = detect_and_migrate_backend(_ws_root) + if migrated is not None: + logger.info(f"[backend_migration] Migrated {migrated} items to new backend") + except Exception as e: + logger.warning(f"Backend migration check failed (continuing): {e}") + # Startup logging with 
configuration info logger.info("=" * 60) logger.info("MCP Indexer Server starting...") diff --git a/scripts/upload_service.py b/scripts/upload_service.py index fc8e66fa..3ca6cf56 100644 --- a/scripts/upload_service.py +++ b/scripts/upload_service.py @@ -42,7 +42,7 @@ import uvicorn from fastapi import FastAPI, File, UploadFile, Form, HTTPException, Request, status -from fastapi.responses import JSONResponse, RedirectResponse +from fastapi.responses import JSONResponse, RedirectResponse, StreamingResponse from fastapi.middleware.cors import CORSMiddleware from urllib.parse import urlencode @@ -859,6 +859,48 @@ async def admin_collections_status(request: Request): return JSONResponse({"collections": enriched}) +@app.get("/admin/collections/stream") +async def admin_collections_stream(request: Request): + """SSE endpoint for real-time collection status updates.""" + _require_admin_session(request) + + async def event_generator(): + """Generate SSE events for collection status updates.""" + last_data = None + while True: + # Check if client disconnected + if await request.is_disconnected(): + break + + try: + collections = list_collections(include_deleted=False) + enriched = await asyncio.to_thread( + lambda: build_admin_collections_view(collections=collections, work_dir=WORK_DIR) + ) + + # Only send if data changed + current_data = json.dumps(enriched, default=str) + if current_data != last_data: + last_data = current_data + yield f"data: {json.dumps({'type': 'full', 'collections': enriched}, default=str)}\n\n" + + except Exception as e: + yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n" + + # Poll interval (2 seconds for SSE) + await asyncio.sleep(2) + + return StreamingResponse( + event_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + ) + + @app.post("/admin/collections/reindex") async def admin_reindex_collection( request: Request, diff 
--git a/scripts/watch_index.py b/scripts/watch_index.py index 91aad268..759c636a 100644 --- a/scripts/watch_index.py +++ b/scripts/watch_index.py @@ -111,6 +111,15 @@ def main() -> None: _start_health_server() _watcher_started_at = datetime.now(timezone.utc).isoformat() + # Backend migration: detect file<->redis switch and migrate state if needed + try: + from scripts.workspace_state import detect_and_migrate_backend + migrated = detect_and_migrate_backend(ROOT) + if migrated is not None: + print(f"[backend_migration] Migrated {migrated} items to new backend") + except Exception as e: + logger.warning(f"Backend migration check failed (continuing): {e}") + # Resolve collection name from workspace state before any client/state ops try: from scripts.workspace_state import get_collection_name_with_staging as _get_coll diff --git a/scripts/workspace_state.py b/scripts/workspace_state.py index ea63506a..70943951 100644 --- a/scripts/workspace_state.py +++ b/scripts/workspace_state.py @@ -54,6 +54,11 @@ def _redis_state_enabled() -> bool: return str(raw).strip().lower() in {"1", "true", "yes", "on"} +def _get_current_backend() -> str: + """Return the current backend name: 'redis' or 'file'.""" + return "redis" if _redis_state_enabled() else "file" + + def _redis_state_active() -> bool: if not _redis_state_enabled(): return False @@ -281,6 +286,261 @@ def _redis_lock(kind: str, path: Path): logger.warning(f"Redis lock release failed for {lock_key}: {e}") +# --------------------------------------------------------------------------- +# Backend Migration: file <-> redis +# --------------------------------------------------------------------------- + +_BACKEND_MARKER_FILENAME = ".backend_marker" +_migration_lock = threading.Lock() +_migration_done = False + + +def _get_backend_marker_path(workspace_root: Optional[Path] = None) -> Path: + """Get path to the backend marker file.""" + if workspace_root is None: + workspace_root = Path(os.environ.get("WORKSPACE_PATH") or 
os.environ.get("WATCH_ROOT") or "/work") + return workspace_root / ".codebase" / _BACKEND_MARKER_FILENAME + + +def _read_backend_marker(workspace_root: Optional[Path] = None) -> Optional[str]: + """Read the last known backend from the marker file.""" + marker_path = _get_backend_marker_path(workspace_root) + if not marker_path.exists(): + return None + try: + content = marker_path.read_text(encoding="utf-8").strip() + if content in {"redis", "file"}: + return content + except Exception as e: + logger.debug(f"Failed to read backend marker: {e}") + return None + + +def _write_backend_marker(backend: str, workspace_root: Optional[Path] = None) -> None: + """Write the current backend to the marker file.""" + marker_path = _get_backend_marker_path(workspace_root) + try: + marker_path.parent.mkdir(parents=True, exist_ok=True) + marker_path.write_text(backend + "\n", encoding="utf-8") + try: + os.chmod(marker_path, 0o664) + except PermissionError: + pass + except Exception as e: + logger.warning(f"Failed to write backend marker: {e}") + + +def _migrate_file_to_redis(workspace_root: Path) -> int: + """Migrate all state/cache/symbols from files to Redis. + + Returns the count of migrated items. 
+ """ + if not _redis_state_enabled(): + logger.warning("Cannot migrate to Redis: Redis backend not enabled") + return 0 + + client = _get_redis_client() + if client is None: + logger.warning("Cannot migrate to Redis: Redis client not available") + return 0 + + migrated = 0 + codebase_dir = workspace_root / ".codebase" + + if not codebase_dir.exists(): + logger.info(f"No .codebase directory found at {codebase_dir}, nothing to migrate") + return 0 + + logger.info(f"Starting file->Redis migration from {codebase_dir}") + + # Migrate state.json files + for state_file in codebase_dir.rglob("state.json"): + try: + with open(state_file, "r", encoding="utf-8-sig") as f: + state = json.load(f) + if isinstance(state, dict): + # Use the file path as the key basis (same as normal operation) + if _redis_set_json("state", state_file, state): + migrated += 1 + logger.debug(f"Migrated state: {state_file}") + except Exception as e: + logger.warning(f"Failed to migrate state file {state_file}: {e}") + + # Migrate cache.json files + for cache_file in codebase_dir.rglob("cache.json"): + try: + with open(cache_file, "r", encoding="utf-8-sig") as f: + cache = json.load(f) + if isinstance(cache, dict): + if _redis_set_json("cache", cache_file, cache): + migrated += 1 + logger.debug(f"Migrated cache: {cache_file}") + except Exception as e: + logger.warning(f"Failed to migrate cache file {cache_file}: {e}") + + # Migrate symbols/*.json files + for symbols_dir in codebase_dir.rglob("symbols"): + if not symbols_dir.is_dir(): + continue + for symbol_file in symbols_dir.glob("*.json"): + try: + with open(symbol_file, "r", encoding="utf-8-sig") as f: + symbols = json.load(f) + if isinstance(symbols, dict): + if _redis_set_json("symbols", symbol_file, symbols): + migrated += 1 + logger.debug(f"Migrated symbols: {symbol_file}") + except Exception as e: + logger.warning(f"Failed to migrate symbol file {symbol_file}: {e}") + + logger.info(f"File->Redis migration complete: {migrated} items 
migrated") + return migrated + + +def _migrate_redis_to_file(workspace_root: Path) -> int: + """Migrate all state/cache/symbols from Redis to files. + + Returns the count of migrated items. + """ + # We need to temporarily bypass the Redis check to read from Redis + # even though the current backend is set to 'file' + try: + import redis as redis_pkg + except ImportError: + logger.warning("Cannot migrate from Redis: redis package not available") + return 0 + + url = os.environ.get("CODEBASE_STATE_REDIS_URL") or os.environ.get("REDIS_URL") or "redis://redis:6379/0" + try: + client = redis_pkg.Redis.from_url(url, decode_responses=True, socket_timeout=2.0) + client.ping() + except Exception as e: + logger.warning(f"Cannot migrate from Redis: connection failed: {e}") + return 0 + + migrated = 0 + prefix = _redis_prefix() + + logger.info(f"Starting Redis->file migration to {workspace_root}") + + codebase_dir = workspace_root / ".codebase" + codebase_dir.mkdir(parents=True, exist_ok=True) + + # Scan and migrate each kind + for kind in ("state", "cache", "symbols"): + pattern = f"{prefix}:{kind}:*" + try: + for key in client.scan_iter(match=pattern, count=200): + try: + raw = client.get(key) + if not raw: + continue + obj = json.loads(raw) + if not isinstance(obj, dict): + continue + + # Determine file path from the object's stored metadata or derive it + # For state/cache, we need to reconstruct the path + # The key is a hash of the path, so we need to store path in the object + file_path_str = obj.get("_source_path") or obj.get("workspace_path") + if not file_path_str: + # Fallback: write to default location based on kind + if kind == "state": + file_path = codebase_dir / "state.json" + elif kind == "cache": + file_path = codebase_dir / "cache.json" + else: + # For symbols, use a hash-based name + key_hash = key.split(":")[-1] if ":" in key else hashlib.md5(key.encode()).hexdigest() + symbols_dir = codebase_dir / "symbols" + symbols_dir.mkdir(parents=True, 
exist_ok=True) + file_path = symbols_dir / f"{key_hash[:16]}.json" + else: + file_path = Path(file_path_str) + # Ensure it's under the workspace + if not str(file_path).startswith(str(workspace_root)): + # Reconstruct relative path under workspace + if kind == "state": + file_path = codebase_dir / "state.json" + elif kind == "cache": + file_path = codebase_dir / "cache.json" + else: + continue + + # Write to file + file_path.parent.mkdir(parents=True, exist_ok=True) + with open(file_path, "w", encoding="utf-8") as f: + json.dump(obj, f, indent=2, ensure_ascii=False) + try: + os.chmod(file_path, 0o664) + except PermissionError: + pass + migrated += 1 + logger.debug(f"Migrated {kind}: {key} -> {file_path}") + except Exception as e: + logger.warning(f"Failed to migrate Redis key {key}: {e}") + except Exception as e: + logger.warning(f"Failed to scan Redis keys for {kind}: {e}") + + logger.info(f"Redis->file migration complete: {migrated} items migrated") + return migrated + + +def detect_and_migrate_backend(workspace_root: Optional[Path] = None) -> Optional[int]: + """Detect backend switch and trigger migration if needed. + + Call this on startup from indexer/watcher/MCP servers. 
+ + Returns: + - None if no migration needed + - int count of migrated items if migration occurred + """ + global _migration_done + + # Skip if migration already done in this process + with _migration_lock: + if _migration_done: + return None + _migration_done = True + + # Skip if explicitly disabled + if os.environ.get("CODEBASE_STATE_SKIP_MIGRATION", "").strip().lower() in {"1", "true", "yes"}: + logger.info("Backend migration skipped (CODEBASE_STATE_SKIP_MIGRATION=1)") + return None + + if workspace_root is None: + workspace_root = Path(os.environ.get("WORKSPACE_PATH") or os.environ.get("WATCH_ROOT") or "/work") + + current_backend = _get_current_backend() + last_backend = _read_backend_marker(workspace_root) + + logger.info(f"Backend check: current={current_backend}, last={last_backend}") + + if last_backend is None: + # First run or marker missing - just set the marker + _write_backend_marker(current_backend, workspace_root) + logger.info(f"Backend marker initialized: {current_backend}") + return None + + if last_backend == current_backend: + # No change + return None + + # Backend has changed - migrate! + logger.info(f"Backend switch detected: {last_backend} -> {current_backend}") + + migrated = 0 + if last_backend == "file" and current_backend == "redis": + migrated = _migrate_file_to_redis(workspace_root) + elif last_backend == "redis" and current_backend == "file": + migrated = _migrate_redis_to_file(workspace_root) + + # Update the marker + _write_backend_marker(current_backend, workspace_root) + + return migrated + + def is_staging_enabled() -> bool: raw = os.environ.get("CTXCE_STAGING_ENABLED", "") v = (raw or "").strip().lower() diff --git a/templates/admin/acl.html b/templates/admin/acl.html index 952a0ce9..11cdda0b 100644 --- a/templates/admin/acl.html +++ b/templates/admin/acl.html @@ -1,474 +1,570 @@ {% extends "admin/base.html" %} {% block content %} -
-

Users

- - - - {% if users and users|length > 0 %} - {% for u in users %} - - - - - - {% endfor %} - {% else %} - - {% endif %} -
idusernamerole
{{ u.id }}{{ u.username }}{{ u.role }}
(none)
- -

Create User

-
-

-

- -

- -
+ -
-

Collections

+ +
+ + + +
+ + +
+
+
+
+

Collections

+

Manage your indexed codebases

+
+
+ {{ collections|length if collections else 0 }} collections +
+
{% if deletion_enabled %} -

- WARNING: Collection deletion is enabled on this server. Deleting a collection can remove server-side files under {{ work_dir }}. - If {{ work_dir }} is a bind mount of your only copy of code, you can lose data. Practical guidance: Standard compose: treat /work as “possibly my real repo”. Keep CTXCE_ADMIN_COLLECTION_DELETE_ENABLED=0 unless you really mean it. -

- {% else %} -

Collection deletion is disabled by server configuration.

+
+
+ + Collection deletion enabled. Deleting may remove files under {{ work_dir }}. +
+
{% endif %} - - - - - - - - - - - - - - - {% if collections and collections|length > 0 %} - {% for c in collections %} - - - - - + + {% endfor %} + {% else %} + + {% endif %} + +
idqdrant_collectionindexing_statusmappingapplied hashpending hashcurrent hashactions
{{ c.id }} -
{{ c.qdrant_collection }}
- {% if c.container_path %} -
{{ c.container_path }}
- {% endif %} - {% if c.repo_name %} -
repo: {{ c.repo_name }}
- {% endif %} -
-
{{ c.indexing_state or "idle" }}
- {% if c.indexing_started_at %} -
started {{ c.indexing_started_at }}
- {% endif %} - {% if c.progress_files_processed is not none %} -
- {{ c.progress_files_processed }} - {% if c.progress_total_files %} - / {{ c.progress_total_files }} - {% endif %} - files -
- {% endif %} - {% if c.progress_current_file %} -
{{ c.progress_current_file }}
- {% endif %} -
- {% if c.has_mapping %} - ok ({{ c.mapping_count }}) - {% else %} - missing - {% endif %} - {% if c.needs_recreate %} -
- maintenance needed — recreate +
+ + + + + + + + + + + + + {% if collections and collections|length > 0 %} + {% for c in collections %} + + + + + + + - - - - - - {% endfor %} - {% else %} - - {% endif %} - -
CollectionStatusProgressConfigHashesActions
+
{{ c.qdrant_collection }}
+ {% if c.container_path %} +
{{ c.container_path }}
+ {% endif %} + {% if c.repo_name %} +
{{ c.repo_name }}
+ {% endif %} +
+ {% if c.indexing_state == 'indexing' %} + Indexing + {% elif c.indexing_state == 'error' %} + Error + {% else %} + Idle + {% endif %} + {% if c.indexing_started_at %} +
{{ c.indexing_started_at }}
+ {% endif %} +
+ {% if c.progress_files_processed is not none and c.progress_total_files %} + {% set pct = ((c.progress_files_processed / c.progress_total_files) * 100)|round|int if c.progress_total_files > 0 else 0 %} +
+
+
+
{{ c.progress_files_processed }} / {{ c.progress_total_files }} files ({{ pct }}%)
+ {% elif c.progress_files_processed is not none %} +
{{ c.progress_files_processed }} files
+ {% else %} + + {% endif %} + {% if c.progress_current_file %} +
{{ c.progress_current_file }}
+ {% endif %} +
+ {% if c.has_mapping %} + OK ({{ c.mapping_count }}) + {% else %} + Missing + {% endif %} + {% if c.needs_recreate %} +
+ Needs Recreate {% if c.drift_recreate_keys %} -
- {% for key in c.drift_recreate_keys %} - {{ key }} - {% endfor %} -
+
{{ c.drift_recreate_keys|join(', ') }}
{% endif %}
- {% elif c.needs_snapshot_refresh %} -
- config applied — refresh snapshot - {% if c.snapshot_only_recreate_keys %} -
- {% for key in c.snapshot_only_recreate_keys %} - {{ key }} - {% endfor %} -
- {% endif %} - {% if c.snapshot_refresh_triggered %} -
env snapshot refreshed automatically
- {% endif %} + {% elif c.needs_snapshot_refresh %} +
+ Refresh Snapshot
- {% elif c.needs_reindex_only %} -
- config drift — reindex - {% if c.drift_reindex_keys %} -
- {% for key in c.drift_reindex_keys %} - {{ key }} - {% endfor %} -
+ {% elif c.needs_reindex_only %} +
+ Needs Reindex +
+ {% endif %} +
+
{{ (c.applied_indexing_hash or '—')[:8] }}
+ {% if c.pending_indexing_hash %} +
{{ c.pending_indexing_hash[:8] }}
+ {% endif %} +
+
+ {% if c.has_mapping %} +
+ + +
+
+ + +
+ {% endif %} + {% if deletion_enabled %} +
+ + +
{% endif %}
- {% endif %} -
{{ c.applied_indexing_hash }} - {% if c.pending_indexing_hash %} - {{ c.pending_indexing_hash }} - {% else %} - - {% endif %} - {{ c.current_indexing_hash }} - {% if c.has_mapping %} -
- - -
-
- - -
- {% endif %} - {% if deletion_enabled %} -
- - - -
- {% endif %} -
(none)
+
No collections found
+
+
+ + - + // Keyboard shortcuts + document.addEventListener('keydown', (e) => { + // Ctrl/Cmd + R to refresh (prevent default) + if ((e.ctrlKey || e.metaKey) && e.key === 'r') { + e.preventDefault(); + refreshAll(); + } + }); +})(); + {% endblock %} diff --git a/templates/admin/base.html b/templates/admin/base.html index acddc334..6597bd93 100644 --- a/templates/admin/base.html +++ b/templates/admin/base.html @@ -1,49 +1,708 @@ - + {{ title }} -
-

{{ title }}

- +
- {% if flash and flash.message %} -
{{ flash.message }}
- {% endif %} +
+ {% block content %}{% endblock %} +
+ +
CTXCE Admin UI
+ + +
+ + + + + diff --git a/templates/admin/bootstrap.html b/templates/admin/bootstrap.html index 34b05382..231aca3c 100644 --- a/templates/admin/bootstrap.html +++ b/templates/admin/bootstrap.html @@ -1,27 +1,222 @@ -{% extends "admin/base.html" %} - -{% block content %} -
-

Bootstrap Admin User

- - {% if error %} -

{{ error }}

- {% endif %} - -
- -

- -

- -
- -

Only available when no users exist yet.

-
-{% endblock %} + + + + + + {{ title }} + + + +
+
+ +

Bootstrap Admin

+

Create your first admin account

+
+ +
+
+ + + + + No users exist yet. Create the first admin account. +
+ + {% if error %} +
+ + + + + + {{ error }} +
+ {% endif %} + +
+
+ + +
+
+ + +
+ +
+
+ + +
+ + diff --git a/templates/admin/error.html b/templates/admin/error.html index 850f623b..024349fb 100644 --- a/templates/admin/error.html +++ b/templates/admin/error.html @@ -1,9 +1,24 @@ {% extends "admin/base.html" %} {% block content %} -
-

{{ title }}

-

{{ message }}

-

Back

+
+
+ + + + +
+ +

{{ title }}

+

{{ message }}

+ + + + + + + Go Back + +
{% endblock %} diff --git a/templates/admin/login.html b/templates/admin/login.html index 1b94be36..3451e533 100644 --- a/templates/admin/login.html +++ b/templates/admin/login.html @@ -1,25 +1,213 @@ -{% extends "admin/base.html" %} - -{% block content %} -
-

Login

- - {% if error %} -

{{ error }}

- {% endif %} - -
- -

- -

- -
-
-{% endblock %} + + + + + + {{ title }} + + + + + + diff --git a/tests/test_workspace_state_migration.py b/tests/test_workspace_state_migration.py new file mode 100644 index 00000000..97c0ba0b --- /dev/null +++ b/tests/test_workspace_state_migration.py @@ -0,0 +1,267 @@ +#!/usr/bin/env python3 +""" +Tests for backend migration in workspace_state.py. + +Tests cover: +- Backend detection (_get_current_backend) +- Marker file operations (_read_backend_marker, _write_backend_marker) +- File to Redis migration (_migrate_file_to_redis) +- Redis to file migration (_migrate_redis_to_file) +- Migration detection and orchestration (detect_and_migrate_backend) +- Skip migration flag +- Idempotency and edge cases +""" +import importlib +import json +import os +import threading +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +pytestmark = pytest.mark.unit + + +# ============================================================================ +# Fixture: Isolated workspace_state import +# ============================================================================ +@pytest.fixture +def ws_module(monkeypatch, tmp_path): + """ + Import workspace_state with isolated environment and temp workspace. + Returns the reloaded module. 
+ """ + ws_root = tmp_path / "work" + ws_root.mkdir(parents=True, exist_ok=True) + codebase_dir = ws_root / ".codebase" + codebase_dir.mkdir(parents=True, exist_ok=True) + + monkeypatch.setenv("WORKSPACE_PATH", str(ws_root)) + monkeypatch.setenv("WATCH_ROOT", str(ws_root)) + monkeypatch.delenv("MULTI_REPO_MODE", raising=False) + monkeypatch.delenv("CODEBASE_STATE_BACKEND", raising=False) + monkeypatch.delenv("CODEBASE_STATE_REDIS_ENABLED", raising=False) + monkeypatch.delenv("CODEBASE_STATE_SKIP_MIGRATION", raising=False) + + ws = importlib.import_module("scripts.workspace_state") + ws = importlib.reload(ws) + + # Reset migration flag for each test + ws._migration_done = False + + return ws + + +@pytest.fixture +def ws_root(tmp_path): + """Create a temp workspace root.""" + ws_root = tmp_path / "work" + ws_root.mkdir(parents=True, exist_ok=True) + return ws_root + + +# ============================================================================ +# Tests: Backend Detection +# ============================================================================ +class TestGetCurrentBackend: + """Tests for _get_current_backend function.""" + + def test_default_is_file(self, ws_module, monkeypatch): + """Default backend is 'file' when no env vars set.""" + monkeypatch.delenv("CODEBASE_STATE_BACKEND", raising=False) + monkeypatch.delenv("CODEBASE_STATE_REDIS_ENABLED", raising=False) + ws = importlib.reload(ws_module) + assert ws._get_current_backend() == "file" + + def test_redis_backend_explicit(self, ws_module, monkeypatch): + """Backend is 'redis' when explicitly set.""" + monkeypatch.setenv("CODEBASE_STATE_BACKEND", "redis") + ws = importlib.reload(ws_module) + assert ws._get_current_backend() == "redis" + + def test_redis_enabled_flag(self, ws_module, monkeypatch): + """Backend is 'redis' when REDIS_ENABLED=1.""" + monkeypatch.delenv("CODEBASE_STATE_BACKEND", raising=False) + monkeypatch.setenv("CODEBASE_STATE_REDIS_ENABLED", "1") + ws = importlib.reload(ws_module) + assert 
ws._get_current_backend() == "redis" + + def test_file_backend_explicit(self, ws_module, monkeypatch): + """Backend is 'file' when explicitly set.""" + monkeypatch.setenv("CODEBASE_STATE_BACKEND", "file") + ws = importlib.reload(ws_module) + assert ws._get_current_backend() == "file" + + def test_filesystem_backend_alias(self, ws_module, monkeypatch): + """Backend 'filesystem' is treated as 'file'.""" + monkeypatch.setenv("CODEBASE_STATE_BACKEND", "filesystem") + ws = importlib.reload(ws_module) + assert ws._get_current_backend() == "file" + + +# ============================================================================ +# Tests: Marker File Operations +# ============================================================================ +class TestBackendMarker: + """Tests for backend marker file operations.""" + + def test_get_marker_path(self, ws_module, ws_root): + """Marker path is correct.""" + marker_path = ws_module._get_backend_marker_path(ws_root) + expected = ws_root / ".codebase" / ".backend_marker" + assert marker_path == expected + + def test_read_marker_missing(self, ws_module, ws_root): + """Reading missing marker returns None.""" + result = ws_module._read_backend_marker(ws_root) + assert result is None + + def test_write_and_read_marker(self, ws_module, ws_root): + """Can write and read marker.""" + (ws_root / ".codebase").mkdir(parents=True, exist_ok=True) + ws_module._write_backend_marker("redis", ws_root) + result = ws_module._read_backend_marker(ws_root) + assert result == "redis" + + def test_write_and_read_file_marker(self, ws_module, ws_root): + """Can write and read 'file' marker.""" + (ws_root / ".codebase").mkdir(parents=True, exist_ok=True) + ws_module._write_backend_marker("file", ws_root) + result = ws_module._read_backend_marker(ws_root) + assert result == "file" + + def test_invalid_marker_content(self, ws_module, ws_root): + """Invalid marker content returns None.""" + codebase_dir = ws_root / ".codebase" + 
codebase_dir.mkdir(parents=True, exist_ok=True) + marker_path = codebase_dir / ".backend_marker" + marker_path.write_text("invalid_backend\n") + result = ws_module._read_backend_marker(ws_root) + assert result is None + + def test_marker_creates_directory(self, ws_module, tmp_path): + """Marker write creates .codebase directory if missing.""" + ws_root = tmp_path / "new_workspace" + ws_root.mkdir() + ws_module._write_backend_marker("redis", ws_root) + marker_path = ws_root / ".codebase" / ".backend_marker" + assert marker_path.exists() + assert marker_path.read_text().strip() == "redis" + + +# ============================================================================ +# Tests: detect_and_migrate_backend +# ============================================================================ +class TestDetectAndMigrateBackend: + """Tests for detect_and_migrate_backend orchestration.""" + + def test_first_run_no_migration(self, ws_module, ws_root, monkeypatch): + """First run (no marker) initializes marker, no migration.""" + monkeypatch.setenv("CODEBASE_STATE_BACKEND", "file") + ws = importlib.reload(ws_module) + ws._migration_done = False + + result = ws.detect_and_migrate_backend(ws_root) + + assert result is None # No migration occurred + # Marker should be created + marker = ws._read_backend_marker(ws_root) + assert marker == "file" + + def test_same_backend_no_migration(self, ws_module, ws_root, monkeypatch): + """Same backend (no switch) returns None.""" + monkeypatch.setenv("CODEBASE_STATE_BACKEND", "file") + ws = importlib.reload(ws_module) + ws._migration_done = False + + # Set marker to current backend + ws._write_backend_marker("file", ws_root) + + result = ws.detect_and_migrate_backend(ws_root) + + assert result is None # No migration needed + + def test_skip_migration_flag(self, ws_module, ws_root, monkeypatch): + """CODEBASE_STATE_SKIP_MIGRATION=1 skips migration.""" + monkeypatch.setenv("CODEBASE_STATE_SKIP_MIGRATION", "1") + ws = importlib.reload(ws_module) 
+ ws._migration_done = False + + # Set marker to different backend to trigger migration + ws._write_backend_marker("redis", ws_root) + + result = ws.detect_and_migrate_backend(ws_root) + + assert result is None # Migration skipped + + def test_idempotency_same_process(self, ws_module, ws_root, monkeypatch): + """Migration only runs once per process.""" + monkeypatch.setenv("CODEBASE_STATE_BACKEND", "file") + ws = importlib.reload(ws_module) + + # First call + ws._migration_done = False + result1 = ws.detect_and_migrate_backend(ws_root) + + # Second call should skip (migration_done = True) + result2 = ws.detect_and_migrate_backend(ws_root) + + assert result2 is None # Skipped due to flag + + def test_file_to_redis_switch_detected(self, ws_module, ws_root, monkeypatch): + """File to Redis switch is detected and triggers migration.""" + # Start with file backend marker + codebase_dir = ws_root / ".codebase" + codebase_dir.mkdir(parents=True, exist_ok=True) + ws_module._write_backend_marker("file", ws_root) + + # Create a state.json file to migrate + state_file = codebase_dir / "state.json" + state_file.write_text(json.dumps({"collection": "test-coll"})) + + # Switch to Redis backend + monkeypatch.setenv("CODEBASE_STATE_BACKEND", "redis") + monkeypatch.setenv("CODEBASE_STATE_REDIS_URL", "redis://localhost:6379/0") + ws = importlib.reload(ws_module) + ws._migration_done = False + + # Mock Redis client + mock_client = MagicMock() + mock_client.ping.return_value = True + mock_client.set.return_value = True + + with patch.object(ws, "_get_redis_client", return_value=mock_client): + result = ws.detect_and_migrate_backend(ws_root) + + # Migration should occur (returns count of migrated items) + assert result is not None + assert result >= 1 # At least state.json migrated + + def test_redis_to_file_switch_detected(self, ws_module, ws_root, monkeypatch): + """Redis to file switch is detected and triggers migration.""" + # Start with redis backend marker + codebase_dir = 
ws_root / ".codebase" + codebase_dir.mkdir(parents=True, exist_ok=True) + ws_module._write_backend_marker("redis", ws_root) + + # Switch to file backend + monkeypatch.setenv("CODEBASE_STATE_BACKEND", "file") + ws = importlib.reload(ws_module) + ws._migration_done = False + + # Mock Redis with some data + mock_client = MagicMock() + mock_client.ping.return_value = True + mock_client.scan_iter.return_value = iter([ + "context-engine:codebase:state:abc123" + ]) + mock_client.get.return_value = json.dumps({"collection": "test-coll"}) + + with patch("redis.Redis.from_url", return_value=mock_client): + result = ws.detect_and_migrate_backend(ws_root) + + # Migration should occur + assert result is not None + assert result >= 1 + From 7e2890c68b9c6dabf28ec77ac65cfef7338a4d81 Mon Sep 17 00:00:00 2001 From: John Donalson Date: Sun, 25 Jan 2026 19:01:11 -0500 Subject: [PATCH 07/14] Switch from hashlib to xxhash for faster hashing Replaced hashlib usage with xxhash in pipeline, qdrant, and workspace_state scripts to improve hashing performance. Also updated upload_service to allow admin access and dashboard redirects when authentication is disabled, enhancing demo and development mode usability. 
--- scripts/ingest/pipeline.py | 7 ++++--- scripts/ingest/qdrant.py | 5 +++-- scripts/upload_service.py | 17 +++++++++++------ scripts/workspace_state.py | 12 ++++++------ 4 files changed, 24 insertions(+), 17 deletions(-) diff --git a/scripts/ingest/pipeline.py b/scripts/ingest/pipeline.py index 405dc30b..c6be18c4 100644 --- a/scripts/ingest/pipeline.py +++ b/scripts/ingest/pipeline.py @@ -10,8 +10,9 @@ import logging import os import sys -import hashlib import time + +import xxhash import multiprocessing from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path @@ -702,7 +703,7 @@ def _index_single_file_inner( language = detect_language(file_path) is_text_like = _is_text_like_language(language) - file_hash = hashlib.sha1(text.encode("utf-8", errors="ignore")).hexdigest() + file_hash = xxhash.xxh64(text.encode("utf-8", errors="ignore")).hexdigest() repo_tag = repo_name_for_cache or _detect_repo_name_from_path(file_path) @@ -1590,7 +1591,7 @@ def process_file_with_smart_reindexing( except Exception: file_path = Path(fp) - file_hash = hashlib.sha1(text.encode("utf-8", errors="ignore")).hexdigest() + file_hash = xxhash.xxh64(text.encode("utf-8", errors="ignore")).hexdigest() # FAST PATH: Check if file hash is unchanged - skip entire processing if so # This avoids AST parsing for unchanged files (P1 optimization) diff --git a/scripts/ingest/qdrant.py b/scripts/ingest/qdrant.py index 26ed9038..8a899216 100644 --- a/scripts/ingest/qdrant.py +++ b/scripts/ingest/qdrant.py @@ -10,7 +10,8 @@ import logging import os import time -import hashlib + +import xxhash from pathlib import Path from typing import List, Dict, Any, Optional @@ -971,7 +972,7 @@ def flush_upserts(client: QdrantClient, collection: str) -> None: def hash_id(text: str, path: str, start: int, end: int) -> int: """Generate a stable hash ID for a chunk.""" - h = hashlib.sha1( + h = xxhash.xxh64( f"{path}:{start}-{end}\n{text}".encode("utf-8", errors="ignore") ).hexdigest() 
return int(h[:16], 16) diff --git a/scripts/upload_service.py b/scripts/upload_service.py index 3ca6cf56..7b0c4604 100644 --- a/scripts/upload_service.py +++ b/scripts/upload_service.py @@ -379,7 +379,8 @@ def _get_valid_session_record(request: Request) -> Optional[Dict[str, Any]]: def _require_admin_session(request: Request) -> Dict[str, Any]: if not AUTH_ENABLED: - raise HTTPException(status_code=404, detail="Auth disabled") + # Allow access when auth is disabled (demo/dev mode) + return {"user_id": "demo", "role": "admin"} record = _get_valid_session_record(request) if record is None: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated") @@ -648,11 +649,12 @@ async def auth_validate(payload: AuthValidateRequest): @app.get("/admin") async def admin_root(request: Request): if not AUTH_ENABLED: - raise HTTPException(status_code=404, detail="Auth disabled") + # Auth disabled - go directly to dashboard (demo mode) + return RedirectResponse(url="/admin/acl", status_code=302) try: users_exist = has_any_users() except AuthDisabledError: - raise HTTPException(status_code=404, detail="Auth disabled") + return RedirectResponse(url="/admin/acl", status_code=302) except Exception as e: logger.error(f"[upload_service] Failed to inspect user state for admin UI: {e}") raise HTTPException(status_code=500, detail="Failed to inspect user state") @@ -782,11 +784,14 @@ async def admin_logout(): async def admin_acl_page(request: Request): _require_admin_session(request) try: - users = list_users() + users = list_users() if AUTH_ENABLED else [] collections = list_collections(include_deleted=False) - grants = list_collection_acl() + grants = list_collection_acl() if AUTH_ENABLED else [] except AuthDisabledError: - raise HTTPException(status_code=404, detail="Auth disabled") + # Auth disabled - still show collections, just no users/grants + users = [] + collections = list_collections(include_deleted=False) + grants = [] except Exception as e: 
logger.error(f"[upload_service] Failed to load admin UI data: {e}") raise HTTPException(status_code=500, detail="Failed to load admin data") diff --git a/scripts/workspace_state.py b/scripts/workspace_state.py index 70943951..f7269486 100644 --- a/scripts/workspace_state.py +++ b/scripts/workspace_state.py @@ -19,6 +19,7 @@ import uuid import subprocess import hashlib +import xxhash from datetime import datetime from pathlib import Path from typing import Dict, Any, Optional, List, Literal, TypedDict @@ -71,7 +72,7 @@ def _redis_prefix() -> str: def _redis_key_for_path(kind: str, path: Path) -> str: raw = str(path) - digest = hashlib.md5(raw.encode("utf-8")).hexdigest() + digest = xxhash.xxh64(raw.encode("utf-8")).hexdigest() return f"{_redis_prefix()}:{kind}:{digest}" @@ -270,7 +271,7 @@ def _redis_lock(kind: str, path: Path): logger.info(f"Redis lock not acquired for {lock_key} after {attempts} attempts, proceeding without lock") yield return - logger.info(f"Redis lock acquired for {lock_key} (attempts={attempts}, ttl={ttl_ms}ms)") + logger.debug(f"Redis lock acquired for {lock_key} (attempts={attempts}, ttl={ttl_ms}ms)") try: yield finally: @@ -451,7 +452,7 @@ def _migrate_redis_to_file(workspace_root: Path) -> int: file_path = codebase_dir / "cache.json" else: # For symbols, use a hash-based name - key_hash = key.split(":")[-1] if ":" in key else hashlib.md5(key.encode()).hexdigest() + key_hash = key.split(":")[-1] if ":" in key else xxhash.xxh64(key.encode()).hexdigest() symbols_dir = codebase_dir / "symbols" symbols_dir.mkdir(parents=True, exist_ok=True) file_path = symbols_dir / f"{key_hash[:16]}.json" @@ -1001,8 +1002,7 @@ def _cross_process_lock(lock_path: Path): def _get_file_lock_path(file_path: str) -> Path: """Get the lock file path for a given file.""" # Use hash of file path to avoid filesystem path issues - import hashlib - path_hash = hashlib.md5(file_path.encode()).hexdigest()[:16] + path_hash = xxhash.xxh64(file_path.encode()).hexdigest()[:16] 
return _FILE_LOCKS_DIR / f"{path_hash}.lock" @@ -2764,7 +2764,7 @@ def _get_symbol_cache_path(file_path: str) -> Path: try: fp = _normalize_cache_key_path(file_path) # Create symbol cache using file hash to handle renames - file_hash = hashlib.md5(fp.encode('utf-8')).hexdigest()[:8] + file_hash = xxhash.xxh64(fp.encode('utf-8')).hexdigest()[:8] if is_multi_repo_mode(): repo_name = _detect_repo_name_from_path(Path(file_path)) if repo_name: From 243beb6032ae85a36790cdf05a045148116c498e Mon Sep 17 00:00:00 2001 From: John Donalson Date: Sun, 25 Jan 2026 19:32:52 -0500 Subject: [PATCH 08/14] Enhance admin UI styles and Qdrant collection listing Revamps admin HTML templates with premium glassmorphism, animated backgrounds, and Inter font for improved aesthetics and UX. Adds list_qdrant_collections to indexing_admin.py and updates upload_service.py to use it for collection listing when auth is disabled, ensuring admin ACL page works without authentication. --- scripts/indexing_admin.py | 18 ++ scripts/upload_service.py | 17 +- templates/admin/base.html | 561 ++++++++++++++++++++++++--------- templates/admin/bootstrap.html | 137 +++++--- templates/admin/login.html | 130 +++++--- 5 files changed, 622 insertions(+), 241 deletions(-) diff --git a/scripts/indexing_admin.py b/scripts/indexing_admin.py index 4b855cda..df9f95eb 100644 --- a/scripts/indexing_admin.py +++ b/scripts/indexing_admin.py @@ -210,6 +210,24 @@ def _invalidate_schema_cache(collection: Optional[str] = None) -> None: _COLLECTION_SCHEMA_CACHE_TS.clear() +def list_qdrant_collections() -> List[Dict[str, Any]]: + """List all collections directly from Qdrant (no auth required). + + Returns a list of dicts with 'qdrant_collection' key for compatibility + with build_admin_collections_view. 
+ """ + client = _get_shared_qdrant_client() + if client is None: + return [] + try: + collections_response = client.get_collections() + collections = getattr(collections_response, "collections", []) or [] + return [{"qdrant_collection": c.name} for c in collections if hasattr(c, "name")] + except Exception as e: + logger.debug(f"Failed to list Qdrant collections: {e}") + return [] + + def _probe_collection_schema(collection: str, force_refresh: bool = False) -> Optional[Dict[str, Any]]: """Probe collection schema with TTL-aware caching and shared client pool.""" if not collection or QdrantClient is None: diff --git a/scripts/upload_service.py b/scripts/upload_service.py index 7b0c4604..3186d8dd 100644 --- a/scripts/upload_service.py +++ b/scripts/upload_service.py @@ -55,6 +55,7 @@ from scripts.indexing_admin import ( build_admin_collections_view, + list_qdrant_collections, resolve_collection_root, spawn_ingest_code, recreate_collection_qdrant, @@ -784,13 +785,19 @@ async def admin_logout(): async def admin_acl_page(request: Request): _require_admin_session(request) try: - users = list_users() if AUTH_ENABLED else [] - collections = list_collections(include_deleted=False) - grants = list_collection_acl() if AUTH_ENABLED else [] + if AUTH_ENABLED: + users = list_users() + collections = list_collections(include_deleted=False) + grants = list_collection_acl() + else: + # Auth disabled - get collections directly from Qdrant + users = [] + collections = list_qdrant_collections() + grants = [] except AuthDisabledError: - # Auth disabled - still show collections, just no users/grants + # Fallback: get collections directly from Qdrant users = [] - collections = list_collections(include_deleted=False) + collections = list_qdrant_collections() grants = [] except Exception as e: logger.error(f"[upload_service] Failed to load admin UI data: {e}") diff --git a/templates/admin/base.html b/templates/admin/base.html index 6597bd93..2102ce76 100644 --- a/templates/admin/base.html 
+++ b/templates/admin/base.html @@ -4,58 +4,127 @@ {{ title }} + + + diff --git a/templates/admin/bootstrap.html b/templates/admin/bootstrap.html index 231aca3c..cbbad49b 100644 --- a/templates/admin/bootstrap.html +++ b/templates/admin/bootstrap.html @@ -4,33 +4,38 @@ {{ title }} + + + diff --git a/templates/admin/login.html b/templates/admin/login.html index 3451e533..f8be9eaa 100644 --- a/templates/admin/login.html +++ b/templates/admin/login.html @@ -4,32 +4,35 @@ {{ title }} + + + From 50e459dc7449317f3c0f527b478d31246514ec9a Mon Sep 17 00:00:00 2001 From: John Donalson Date: Sun, 25 Jan 2026 20:02:48 -0500 Subject: [PATCH 09/14] Add skip-and-retry logic for Redis lock contention Introduces LockContendedException and enable_lock_skip_on_contention context manager to allow parallel file indexing to skip files with contended Redis locks and retry them sequentially. Improves pipeline.py to handle lock contention gracefully during parallel processing, reducing bottlenecks and improving throughput. 
--- scripts/ingest/pipeline.py | 71 ++++++++++++++++++++++------ scripts/workspace_state.py | 97 +++++++++++++++++++++++++++++++++----- 2 files changed, 143 insertions(+), 25 deletions(-) diff --git a/scripts/ingest/pipeline.py b/scripts/ingest/pipeline.py index c6be18c4..5e5aac36 100644 --- a/scripts/ingest/pipeline.py +++ b/scripts/ingest/pipeline.py @@ -15,11 +15,24 @@ import xxhash import multiprocessing from concurrent.futures import ThreadPoolExecutor, as_completed +from contextlib import contextmanager from pathlib import Path from typing import List, Dict, Any, Optional, TYPE_CHECKING logger = logging.getLogger(__name__) +# Import lock contention handling for skip-and-retry logic during parallel indexing +try: + from scripts.workspace_state import LockContendedException, enable_lock_skip_on_contention +except ImportError: + # Fallback if not available + class LockContendedException(Exception): + pass + + @contextmanager + def enable_lock_skip_on_contention(): + yield + from qdrant_client import QdrantClient, models from scripts.ingest.config import ( @@ -1479,8 +1492,12 @@ def index_repo( # Cap at reasonable max to avoid overwhelming Qdrant max_workers = min(index_workers, 16) if index_workers > 1 else 1 - def _index_file_task(file_path: Path) -> tuple[Path, Optional[Exception]]: - """Task for parallel file indexing. Returns (path, error_or_none).""" + def _index_file_task(file_path: Path) -> tuple[Path, Optional[Exception], bool]: + """Task for parallel file indexing. 
+ + Returns: + (path, error_or_none, was_skipped_due_to_lock) + """ per_file_repo = ( root_repo_for_cache if root_repo_for_cache is not None @@ -1499,32 +1516,58 @@ def _index_file_task(file_path: Path) -> tuple[Path, Optional[Exception]]: allowed_vectors=allowed_vectors, allowed_sparse=allowed_sparse, ) - return (file_path, None) + return (file_path, None, False) + except LockContendedException: + # Lock contention - mark for retry + return (file_path, None, True) except Exception as e: - return (file_path, e) + return (file_path, e, False) files_processed = 0 errors = [] + retry_queue: list[Path] = [] if max_workers > 1: - # Parallel processing with ThreadPoolExecutor + # Parallel processing with ThreadPoolExecutor and skip-and-retry for lock contention if log_progress: print(f"[index] Using {max_workers} parallel workers") - with ThreadPoolExecutor(max_workers=max_workers) as executor: - futures = {executor.submit(_index_file_task, fp): fp for fp in files} - for future in as_completed(futures): - files_processed += 1 - file_path, error = future.result() + + # Enable skip-on-contention mode for parallel phase + # This makes Redis locks fail fast instead of blocking, allowing other files to proceed + with enable_lock_skip_on_contention(): + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = {executor.submit(_index_file_task, fp): fp for fp in files} + for future in as_completed(futures): + files_processed += 1 + file_path, error, was_skipped = future.result() + if was_skipped: + # Lock contention - queue for retry + retry_queue.append(file_path) + elif error: + errors.append((file_path, error)) + print(f"Error indexing {file_path}: {error}") + if log_progress and (files_processed % 25 == 0 or files_processed == total_files): + print(f"[index] {files_processed}/{total_files} files processed") + + # Retry phase: process skipped files sequentially (no lock contention mode) + # This runs OUTSIDE the enable_lock_skip_on_contention context, so 
locks wait normally + if retry_queue: + if log_progress: + print(f"[index] Retrying {len(retry_queue)} files skipped due to lock contention") + for file_path in retry_queue: + _, error, was_skipped = _index_file_task(file_path) if error: errors.append((file_path, error)) print(f"Error indexing {file_path}: {error}") - if log_progress and (files_processed % 25 == 0 or files_processed == total_files): - print(f"[index] {files_processed}/{total_files} files processed") + # If still skipped on retry, that's a real issue - count as error + if was_skipped: + errors.append((file_path, LockContendedException("Lock still contended after retry"))) + logger.warning(f"Lock still contended after retry for {file_path}") else: - # Sequential processing (original behavior) + # Sequential processing (original behavior) - no retry needed for file_path in iterator: files_processed += 1 - file_path, error = _index_file_task(file_path) + file_path, error, _ = _index_file_task(file_path) if error: errors.append((file_path, error)) print(f"Error indexing {file_path}: {error}") diff --git a/scripts/workspace_state.py b/scripts/workspace_state.py index f7269486..55f2fa5c 100644 --- a/scripts/workspace_state.py +++ b/scripts/workspace_state.py @@ -42,6 +42,12 @@ _REDIS_CLIENT = None _REDIS_CLIENT_LOCK = threading.Lock() +# Global flag to enable skip-if-contended behavior for Redis locks. +# When set, locks that are contended will raise LockContendedException +# instead of waiting, allowing the caller to retry later. +# Uses threading.Event for thread-safe cross-thread visibility in ThreadPoolExecutor. 
+_lock_skip_contended_event = threading.Event() + def _redis_state_enabled() -> bool: backend = str(os.environ.get("CODEBASE_STATE_BACKEND", "") or "").strip().lower() @@ -238,8 +244,62 @@ def _redis_delete(kind: str, path: Path) -> bool: return False +class LockContendedException(Exception): + """Raised when a Redis lock cannot be acquired and skip_if_contended=True.""" + pass + + @contextmanager -def _redis_lock(kind: str, path: Path): +def enable_lock_skip_on_contention(): + """Context manager to enable skip-if-contended behavior for Redis locks. + + When active, any Redis lock contention will raise LockContendedException + instead of waiting, allowing the caller to retry the operation later. + This is useful for parallel processing where one file's lock shouldn't + block processing of other files. + + This uses a threading.Event for cross-thread visibility, so worker threads + in a ThreadPoolExecutor will see the flag. + + Example: + with enable_lock_skip_on_contention(): + with ThreadPoolExecutor() as executor: + # Worker threads will see the skip flag + futures = [executor.submit(index_file, f) for f in files] + """ + _lock_skip_contended_event.set() + try: + yield + finally: + _lock_skip_contended_event.clear() + + +def _should_skip_if_contended() -> bool: + """Check if skip-if-contended mode is enabled (thread-safe, cross-thread visible).""" + return _lock_skip_contended_event.is_set() + + +@contextmanager +def _redis_lock(kind: str, path: Path, *, skip_if_contended: bool = False): + """Acquire a Redis distributed lock for the given resource. + + Args: + kind: Type of resource ("state", "cache", "symbols") + path: Path to the resource being locked + skip_if_contended: If True, raise LockContendedException after a few quick + attempts instead of waiting the full timeout. Useful for parallel processing + where the caller can retry later. Also enabled if the process-global + enable_lock_skip_on_contention() context manager is active. 
+ + Yields: + None when lock is acquired (or Redis not available) + + Raises: + LockContendedException: When skip mode is active (skip_if_contended=True + or enable_lock_skip_on_contention() is entered) and the lock is contended + """ + # Check the process-global skip flag as well as the explicit parameter + skip_mode = skip_if_contended or _should_skip_if_contended() + client = _get_redis_client() if client is None: yield @@ -250,14 +310,23 @@ def _redis_lock(kind: str, path: Path): ttl_ms = int(os.environ.get("CODEBASE_STATE_REDIS_LOCK_TTL_MS", "5000") or 5000) except Exception: ttl_ms = 5000 - try: - wait_ms = int(os.environ.get("CODEBASE_STATE_REDIS_LOCK_WAIT_MS", "2000") or 2000) - except Exception: - wait_ms = 2000 - deadline = time.time() + (wait_ms / 1000.0) + + # For skip_if_contended mode, try just a few times quickly (100-200ms max) + # For normal mode, wait up to REDIS_LOCK_WAIT_MS (default 2000ms) + if skip_mode: + max_attempts = 4 + sleep_interval = 0.05 # 50ms between attempts = 200ms max + else: + try: + wait_ms = int(os.environ.get("CODEBASE_STATE_REDIS_LOCK_WAIT_MS", "2000") or 2000) + except Exception: + wait_ms = 2000 + max_attempts = int(wait_ms / 50) + 1 # 50ms per attempt + sleep_interval = 0.05 + acquired = False attempts = 0 - while time.time() < deadline: + for _ in range(max_attempts): attempts += 1 try: if client.set(lock_key, token, nx=True, px=ttl_ms): @@ -266,11 +335,17 @@ except Exception as e: logger.warning(f"Redis lock set failed for {lock_key}: {e}") break - time.sleep(0.05) + time.sleep(sleep_interval) + if not acquired: - logger.info(f"Redis lock not acquired for {lock_key} after {attempts} attempts, proceeding without lock") - yield - return + if skip_mode: + logger.debug(f"Redis lock contended for {lock_key}, skipping (attempts={attempts})") + raise LockContendedException(f"Lock contended: {lock_key}") + else: + logger.info(f"Redis lock not acquired for {lock_key} after {attempts} attempts, proceeding without lock") + yield + return + logger.debug(f"Redis lock acquired for 
{lock_key} (attempts={attempts}, ttl={ttl_ms}ms)") try: yield From 7ce1a543c0aa0c5f9825b9b3c4601d95f48709d6 Mon Sep 17 00:00:00 2001 From: John Donalson Date: Sun, 25 Jan 2026 20:15:25 -0500 Subject: [PATCH 10/14] Add live indexing progress and improve admin UI demo mode Implements workspace state updates for live indexing progress in pipeline.py, enabling UI updates. Enhances admin login and dashboard behavior for demo mode, including support for demo sessions and clearer credential hints. Updates ACL table to show collection IDs and indexing hashes for improved visibility. --- scripts/ingest/pipeline.py | 32 ++++++++++++++++++++++++++ scripts/upload_service.py | 40 ++++++++++++++++++++++++++++----- templates/admin/acl.html | 46 +++++++++++++++++++++++++++++++++----- templates/admin/login.html | 3 +++ 4 files changed, 111 insertions(+), 10 deletions(-) diff --git a/scripts/ingest/pipeline.py b/scripts/ingest/pipeline.py index 5e5aac36..0fe9e893 100644 --- a/scripts/ingest/pipeline.py +++ b/scripts/ingest/pipeline.py @@ -65,6 +65,8 @@ def enable_lock_skip_on_contention(): get_workspace_state, compare_symbol_changes, file_indexing_lock, + set_indexing_started, + set_indexing_progress, ) from scripts.ingest.exclusions import ( iter_files, @@ -1479,6 +1481,14 @@ def index_repo( if log_progress: print(f"[index] Found {total_files} files to process under {root}") + # Track progress in workspace state for live UI updates + workspace_path = str(root) + started_at = time.strftime("%Y-%m-%dT%H:%M:%S") + try: + set_indexing_started(workspace_path, total_files) + except Exception as e: + logger.debug(f"Failed to set indexing started: {e}") + # Parallel file processing configuration # INDEX_WORKERS=0 or 1 means sequential (default for safety) # INDEX_WORKERS=N uses N threads (recommended: 4-8 for I/O-bound indexing) @@ -1537,6 +1547,7 @@ def _index_file_task(file_path: Path) -> tuple[Path, Optional[Exception], bool]: with enable_lock_skip_on_contention(): with 
ThreadPoolExecutor(max_workers=max_workers) as executor: futures = {executor.submit(_index_file_task, fp): fp for fp in files} + last_progress_update = 0 for future in as_completed(futures): files_processed += 1 file_path, error, was_skipped = future.result() @@ -1548,6 +1559,16 @@ def _index_file_task(file_path: Path) -> tuple[Path, Optional[Exception], bool]: print(f"Error indexing {file_path}: {error}") if log_progress and (files_processed % 25 == 0 or files_processed == total_files): print(f"[index] {files_processed}/{total_files} files processed") + # Update progress in workspace state every 10 files for live UI + if files_processed - last_progress_update >= 10 or files_processed == total_files: + last_progress_update = files_processed + try: + set_indexing_progress( + workspace_path, started_at, files_processed, total_files, + str(file_path) if file_path else None + ) + except Exception as e: + logger.debug(f"Failed to update indexing progress: {e}") # Retry phase: process skipped files sequentially (no lock contention mode) # This runs OUTSIDE the enable_lock_skip_on_contention context, so locks wait normally @@ -1565,6 +1586,7 @@ def _index_file_task(file_path: Path) -> tuple[Path, Optional[Exception], bool]: logger.warning(f"Lock still contended after retry for {file_path}") else: # Sequential processing (original behavior) - no retry needed + last_progress_update = 0 for file_path in iterator: files_processed += 1 file_path, error, _ = _index_file_task(file_path) @@ -1573,6 +1595,16 @@ def _index_file_task(file_path: Path) -> tuple[Path, Optional[Exception], bool]: print(f"Error indexing {file_path}: {error}") if log_progress and (files_processed % 25 == 0 or files_processed == total_files): print(f"[index] {files_processed}/{total_files} files processed") + # Update progress in workspace state every 10 files for live UI + if files_processed - last_progress_update >= 10 or files_processed == total_files: + last_progress_update = files_processed + try: + 
set_indexing_progress( + workspace_path, started_at, files_processed, total_files, + str(file_path) if file_path else None + ) + except Exception as e: + logger.debug(f"Failed to update indexing progress: {e}") if errors and log_progress: print(f"[index] Completed with {len(errors)} errors out of {total_files} files") diff --git a/scripts/upload_service.py b/scripts/upload_service.py index 3186d8dd..a44c214c 100644 --- a/scripts/upload_service.py +++ b/scripts/upload_service.py @@ -649,13 +649,24 @@ async def auth_validate(payload: AuthValidateRequest): @app.get("/admin") async def admin_root(request: Request): + # Check for demo session cookie first (works in demo mode) + candidate = _get_session_candidate_from_request(request) + session_id = candidate.get("session_id") or "" + if not AUTH_ENABLED: - # Auth disabled - go directly to dashboard (demo mode) - return RedirectResponse(url="/admin/acl", status_code=302) + # Demo mode: if we have a demo session cookie, go to dashboard + if session_id.startswith("demo-"): + return RedirectResponse(url="/admin/acl", status_code=302) + # Otherwise show login page for demo credentials + return RedirectResponse(url="/admin/login", status_code=302) + try: users_exist = has_any_users() except AuthDisabledError: - return RedirectResponse(url="/admin/acl", status_code=302) + # Fallback to demo mode behavior + if session_id.startswith("demo-"): + return RedirectResponse(url="/admin/acl", status_code=302) + return RedirectResponse(url="/admin/login", status_code=302) except Exception as e: logger.error(f"[upload_service] Failed to inspect user state for admin UI: {e}") raise HTTPException(status_code=500, detail="Failed to inspect user state") @@ -663,7 +674,6 @@ async def admin_root(request: Request): if not users_exist: return RedirectResponse(url="/admin/bootstrap", status_code=302) - candidate = _get_session_candidate_from_request(request) record = _get_valid_session_record(request) if record is None: return 
RedirectResponse(url="/admin/login", status_code=302) @@ -746,11 +756,31 @@ async def admin_login_submit( username: str = Form(...), password: str = Form(...), ): + # Demo mode: accept admin/admin when auth is disabled if not AUTH_ENABLED: - raise HTTPException(status_code=404, detail="Auth disabled") + if username == "admin" and password == "admin": + # Create a demo session cookie and redirect + import uuid + demo_session_id = f"demo-{uuid.uuid4().hex}" + resp = RedirectResponse(url="/admin/acl", status_code=302) + _set_admin_session_cookie(resp, demo_session_id) + return resp + return render_admin_login( + request=request, + error="Invalid credentials (demo mode: use admin/admin)", + status_code=401, + ) + try: user = authenticate_user(username, password) except AuthDisabledError: + # Fallback to demo mode + if username == "admin" and password == "admin": + import uuid + demo_session_id = f"demo-{uuid.uuid4().hex}" + resp = RedirectResponse(url="/admin/acl", status_code=302) + _set_admin_session_cookie(resp, demo_session_id) + return resp raise HTTPException(status_code=404, detail="Auth disabled") except Exception as e: logger.error(f"[upload_service] Error authenticating user for admin UI: {e}") diff --git a/templates/admin/acl.html b/templates/admin/acl.html index 11cdda0b..6e1a9c06 100644 --- a/templates/admin/acl.html +++ b/templates/admin/acl.html @@ -47,11 +47,14 @@

Collections

+ - + + + @@ -59,6 +62,9 @@

Collections

{% if collections and collections|length > 0 %} {% for c in collections %} + + + {% endfor %} {% else %} - + {% endif %}
ID Collection Status Progress ConfigHashesApplied HashPending HashCurrent Hash Actions
+
{{ (c.id or c.workspace_id or '—')[:16] }}...
+
{{ c.qdrant_collection }}
{% if c.container_path %} @@ -121,8 +127,19 @@

Collections

{{ (c.applied_indexing_hash or '—')[:8] }}
+
{% if c.pending_indexing_hash %}
{{ c.pending_indexing_hash[:8] }}
+ {% else %} + + {% endif %} +
+ {% if c.current_indexing_hash or c.indexing_config_hash %} +
{{ (c.current_indexing_hash or c.indexing_config_hash)[:8] }}
+ {% else %} + {% endif %}
@@ -154,7 +171,7 @@

Collections

No collections found
No collections found
@@ -431,8 +448,18 @@

Grant Collect ` : ''; + // Get ID - try different possible field names + const collectionId = c.id || c.workspace_id || ''; + const idDisplay = collectionId ? escapeHtml(collectionId.slice(0, 16)) + '...' : '—'; + + // Get current hash - try different possible field names + const currentHash = c.current_indexing_hash || c.indexing_config_hash || ''; + return ` + +
${idDisplay}
+
${escapeHtml(c.qdrant_collection)}
${c.container_path ? `
${escapeHtml(c.container_path)}
` : ''} @@ -451,8 +478,17 @@

Grant Collect ${configExtra} -
${escapeHtml((c.applied_indexing_hash || '—').slice(0, 8))}
- ${c.pending_indexing_hash ? `
${escapeHtml(c.pending_indexing_hash.slice(0, 8))}
` : ''} +
${escapeHtml((c.applied_indexing_hash || '—').slice(0, 8))}
+ + + ${c.pending_indexing_hash + ? `
${escapeHtml(c.pending_indexing_hash.slice(0, 8))}
` + : ''} + + + ${currentHash + ? `
${escapeHtml(currentHash.slice(0, 8))}
` + : ''}
@@ -516,7 +552,7 @@

Grant Collect function renderAllCollections(collections) { if (collections.length === 0) { - bodyEl.innerHTML = 'No collections found'; + bodyEl.innerHTML = 'No collections found'; } else { bodyEl.innerHTML = collections.map(renderRow).join(''); } diff --git a/templates/admin/login.html b/templates/admin/login.html index f8be9eaa..e0161003 100644 --- a/templates/admin/login.html +++ b/templates/admin/login.html @@ -249,6 +249,9 @@

Admin Login

+
+ Demo mode: use admin / admin +

From 72bb00a831e558c65f84e157f21b0bace419b802 Mon Sep 17 00:00:00 2001 From: John Donalson Date: Sun, 25 Jan 2026 20:25:10 -0500 Subject: [PATCH 11/14] Add API key management UI and improve admin actions Introduces an API Keys tab in the admin ACL page for managing shared access tokens, including creation, listing, and revocation of keys. Refactors collection action buttons for reindex, recreate, and delete to use new labeled action button styles for improved clarity and consistency. Enhances form select styling and adds supporting CSS for new UI elements. Also, the login page is now always shown to allow demo credentials entry. --- scripts/upload_service.py | 3 +- templates/admin/acl.html | 156 ++++++++++++++++++++++++++++++++++---- templates/admin/base.html | 93 +++++++++++++++++++++++ 3 files changed, 237 insertions(+), 15 deletions(-) diff --git a/scripts/upload_service.py b/scripts/upload_service.py index a44c214c..44f62b3c 100644 --- a/scripts/upload_service.py +++ b/scripts/upload_service.py @@ -745,8 +745,7 @@ async def admin_bootstrap_submit( @app.get("/admin/login") async def admin_login_form(request: Request): - if not AUTH_ENABLED: - raise HTTPException(status_code=404, detail="Auth disabled") + # Show login page even in demo mode so users can enter demo credentials return render_admin_login(request) diff --git a/templates/admin/acl.html b/templates/admin/acl.html index 6e1a9c06..60910c6c 100644 --- a/templates/admin/acl.html +++ b/templates/admin/acl.html @@ -19,6 +19,7 @@

Admin Dashboard

+ @@ -143,26 +144,29 @@

Collections

{% endif %} -
+
{% if c.has_mapping %}
-
-
{% endif %} {% if deletion_enabled %}
-
{% endif %} @@ -356,6 +360,115 @@

Grant Collect

+ + +