diff --git a/requirements.txt b/requirements.txt index 3c3a6714..fd28cbab 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,6 +9,7 @@ tokenizers orjson redis>=5.0.0 xxhash>=3.0.0 +lz4>=4.0.0 # Tree-sitter 0.25+ with individual language packages tree_sitter>=0.25.0 tree_sitter_python>=0.23.0 diff --git a/scripts/indexing_admin.py b/scripts/indexing_admin.py index 500b33d4..52a939ae 100644 --- a/scripts/indexing_admin.py +++ b/scripts/indexing_admin.py @@ -202,7 +202,16 @@ def _copy_repo_state_for_clone( # Cache with TTL support _COLLECTION_SCHEMA_CACHE: Dict[str, Dict[str, Any]] = {} _COLLECTION_SCHEMA_CACHE_TS: Dict[str, float] = {} # Track cache timestamps -_COLLECTION_SCHEMA_CACHE_TTL = float(os.environ.get("SCHEMA_CACHE_TTL_SECS", "300")) # 5 min default +try: + _ttl_raw = float(os.environ.get("SCHEMA_CACHE_TTL_SECS", "300")) + # Validate TTL is finite and positive + import math + if not math.isfinite(_ttl_raw) or _ttl_raw <= 0: + _COLLECTION_SCHEMA_CACHE_TTL = 300.0 + else: + _COLLECTION_SCHEMA_CACHE_TTL = _ttl_raw +except (ValueError, TypeError): + _COLLECTION_SCHEMA_CACHE_TTL = 300.0 # 5 min default _SNAPSHOT_REFRESHED: Set[str] = set() _MAPPING_INDEX_CACHE: Dict[str, Any] = {"ts": 0.0, "work_dir": "", "value": {}} diff --git a/scripts/upload_service.py b/scripts/upload_service.py index ac2c2c23..341f2cd5 100644 --- a/scripts/upload_service.py +++ b/scripts/upload_service.py @@ -908,9 +908,13 @@ async def admin_acl_grant( async def admin_collections_status(request: Request): _require_admin_session(request) try: - collections = list_collections(include_deleted=False) + if AUTH_ENABLED: + collections = list_collections(include_deleted=False) + else: + # Demo mode: get collections directly from Qdrant + collections = list_qdrant_collections() except AuthDisabledError: - raise HTTPException(status_code=404, detail="Auth disabled") + collections = list_qdrant_collections() except Exception: raise HTTPException(status_code=500, detail="Failed to load collections") @@ -934,7 +938,11 @@ async def event_generator(): break try: - collections = list_collections(include_deleted=False) + if AUTH_ENABLED: + collections = list_collections(include_deleted=False) + else: + # Demo mode: get collections directly from Qdrant + collections = list_qdrant_collections() enriched = await asyncio.to_thread( lambda: build_admin_collections_view(collections=collections, work_dir=WORK_DIR) ) @@ -945,6 +953,19 @@ async def event_generator(): last_data = current_data yield f"data: {json.dumps({'type': 'full', 'collections': enriched}, default=str)}\n\n" + except AuthDisabledError: + # Fallback to Qdrant collections + try: + collections = list_qdrant_collections() + enriched = await asyncio.to_thread( + lambda: build_admin_collections_view(collections=collections, work_dir=WORK_DIR) + ) + current_data = json.dumps(enriched, default=str) + if current_data != last_data: + last_data = current_data + yield f"data: {json.dumps({'type': 'full', 'collections': enriched}, default=str)}\n\n" + except Exception as e: + yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n" except Exception as e: yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n" diff --git a/scripts/watch_index_core/processor.py b/scripts/watch_index_core/processor.py index 685b688f..4f03d78f 100644 --- a/scripts/watch_index_core/processor.py +++ b/scripts/watch_index_core/processor.py @@ -2,7 +2,6 @@ from __future__ import annotations -import hashlib import logging import os import subprocess @@ -10,6 +9,7 @@ from datetime import datetime from pathlib import Path from typing import Dict, List, Optional +import xxhash import scripts.ingest_code as idx from scripts.workspace_state import ( @@ -128,7 +128,7 @@ def _maybe_handle_staging_file( if not (is_staging_enabled() and state_env and collection): return False - _text, file_hash = _read_text_and_sha1(path) + _text, file_hash = _read_text_and_hash(path) if file_hash: try: cached_hash = get_cached_file_hash(str(path), repo_name) if repo_name else None @@ -348,17 +348,15 @@ def _process_paths( logger.debug(f"Suppressed exception: {e}") -def _read_text_and_sha1(path: Path) -> tuple[Optional[str], str]: +def _read_text_and_hash(path: Path) -> tuple[Optional[str], str]: + """Read file text and compute xxhash64 for consistency with pipeline.""" try: text = path.read_text(encoding="utf-8", errors="ignore") except Exception: text = None if not text: return text, "" - try: - file_hash = hashlib.sha1(text.encode("utf-8", errors="ignore")).hexdigest() - except Exception: - file_hash = "" + file_hash = xxhash.xxh64(text.encode("utf-8", errors="ignore")).hexdigest() return text, file_hash @@ -378,7 +376,7 @@ def _run_indexing_strategy( except Exception as e: logger.debug(f"Suppressed exception: {e}") - text, file_hash = _read_text_and_sha1(path) + text, file_hash = _read_text_and_hash(path) ok = False if text is not None: try: diff --git a/scripts/workspace_state.py b/scripts/workspace_state.py index 55f2fa5c..14fcf105 100644 --- a/scripts/workspace_state.py +++ b/scripts/workspace_state.py @@ -144,20 +144,66 @@ def _redis_retry(fn, retries: int = 2, delay: float = 0.1): raise last_err # type: ignore +# LZ4 compression for Redis state storage +# Provides ~50-70% memory reduction with minimal CPU overhead +_LZ4_AVAILABLE = False +try: + import lz4.frame as lz4_frame + _LZ4_AVAILABLE = True +except ImportError: + lz4_frame = None # type: ignore + +# Prefix to identify LZ4-compressed values in Redis +_LZ4_PREFIX = b"LZ4:" + +def _redis_compress(data: bytes) -> bytes: + """Compress data with LZ4 if available, return with prefix.""" + if not _LZ4_AVAILABLE or lz4_frame is None: + return data + try: + compressed = lz4_frame.compress(data, compression_level=0) # fastest + # Only use compression if it actually saves space + if len(compressed) + len(_LZ4_PREFIX) < len(data): + return _LZ4_PREFIX + compressed + return data + except Exception: + return data + +def _redis_decompress(data: bytes) -> bytes: + """Decompress LZ4 data if prefixed, otherwise return as-is.""" + if not data: + return data + if data.startswith(_LZ4_PREFIX): + if not _LZ4_AVAILABLE or lz4_frame is None: + logger.warning("LZ4 compressed data found but lz4 not available") + return data + try: + return lz4_frame.decompress(data[len(_LZ4_PREFIX):]) + except Exception as e: + logger.debug(f"LZ4 decompress failed: {e}") + return data + return data + + def _redis_get_json(kind: str, path: Path) -> Optional[Dict[str, Any]]: client = _get_redis_client() if client is None: return None key = _redis_key_for_path(kind, path) try: - raw = _redis_retry(lambda: client.get(key)) + # Get raw bytes for decompression + raw = _redis_retry(lambda: client.execute_command("GET", key)) except Exception as e: logger.debug(f"Redis get failed for {key}: {e}") return None if not raw: return None try: - obj = json.loads(raw) + # Handle both string and bytes responses + if isinstance(raw, str): + raw = raw.encode("utf-8") + decompressed = _redis_decompress(raw) + obj = json.loads(decompressed.decode("utf-8")) except Exception as e: logger.debug(f"Redis JSON decode failed for {key}: {e}") return None @@ -172,12 +218,13 @@ def _redis_set_json(kind: str, path: Path, obj: Dict[str, Any]) -> bool: return False key = _redis_key_for_path(kind, path) try: - payload = json.dumps(obj, ensure_ascii=False) + payload = json.dumps(obj, ensure_ascii=False).encode("utf-8") + compressed = _redis_compress(payload) except Exception as e: - logger.debug(f"Failed to JSON serialize redis payload for {key}: {e}") + logger.debug(f"Failed to serialize/compress redis payload for {key}: {e}") return False try: - _redis_retry(lambda: client.set(key, payload)) + _redis_retry(lambda: client.execute_command("SET", key, compressed)) return True except Exception as e: logger.debug(f"Redis set failed for {key}: {e}") diff --git a/tests/test_watch_index_cache.py b/tests/test_watch_index_cache.py index c5065af1..0a5df87b 100644 --- a/tests/test_watch_index_cache.py +++ b/tests/test_watch_index_cache.py @@ -38,10 +38,10 @@ def test_processor_handles_cache_read_errors(monkeypatch, tmp_path): test_file = tmp_path / "test.txt" test_file.write_text("test content", encoding="utf-8") - # Exercise _read_text_and_sha1 - should handle errors gracefully and return content + hash - text, sha1 = proc_mod._read_text_and_sha1(test_file) + # Exercise _read_text_and_hash - should handle errors gracefully and return content + hash + text, file_hash = proc_mod._read_text_and_hash(test_file) assert text == "test content" - assert sha1 is not None and len(sha1) == 40 # SHA1 hex length + assert file_hash is not None and len(file_hash) == 16 # xxhash64 hex length def test_handler_move_event_handles_cache_errors(monkeypatch, tmp_path):