From bcb61bc58bfa070a7c276035db9d426e9e27b347 Mon Sep 17 00:00:00 2001 From: John Donalson Date: Sun, 25 Jan 2026 22:38:03 -0500 Subject: [PATCH 1/5] Improve collection handling and hashing consistency Added robust parsing for SCHEMA_CACHE_TTL in indexing_admin.py to handle invalid environment values. Updated upload_service.py to better support demo mode and fallback to Qdrant collections when authentication is disabled. Modified watch_index_core/processor.py to use xxhash for file hashing for consistency with the pipeline, with a fallback to hashlib if xxhash is unavailable. --- scripts/indexing_admin.py | 5 ++++- scripts/upload_service.py | 27 ++++++++++++++++++++++++--- scripts/watch_index_core/processor.py | 6 ++++++ 3 files changed, 34 insertions(+), 4 deletions(-) diff --git a/scripts/indexing_admin.py b/scripts/indexing_admin.py index 500b33d4..9d50040c 100644 --- a/scripts/indexing_admin.py +++ b/scripts/indexing_admin.py @@ -202,7 +202,10 @@ def _copy_repo_state_for_clone( # Cache with TTL support _COLLECTION_SCHEMA_CACHE: Dict[str, Dict[str, Any]] = {} _COLLECTION_SCHEMA_CACHE_TS: Dict[str, float] = {} # Track cache timestamps -_COLLECTION_SCHEMA_CACHE_TTL = float(os.environ.get("SCHEMA_CACHE_TTL_SECS", "300")) # 5 min default +try: + _COLLECTION_SCHEMA_CACHE_TTL = float(os.environ.get("SCHEMA_CACHE_TTL_SECS", "300")) +except (ValueError, TypeError): + _COLLECTION_SCHEMA_CACHE_TTL = 300.0 # 5 min default _SNAPSHOT_REFRESHED: Set[str] = set() _MAPPING_INDEX_CACHE: Dict[str, Any] = {"ts": 0.0, "work_dir": "", "value": {}} diff --git a/scripts/upload_service.py b/scripts/upload_service.py index ac2c2c23..341f2cd5 100644 --- a/scripts/upload_service.py +++ b/scripts/upload_service.py @@ -908,9 +908,13 @@ async def admin_acl_grant( async def admin_collections_status(request: Request): _require_admin_session(request) try: - collections = list_collections(include_deleted=False) + if AUTH_ENABLED: + collections = list_collections(include_deleted=False) + else: + # Demo mode: get collections directly from Qdrant + collections = list_qdrant_collections() except AuthDisabledError: - raise HTTPException(status_code=404, detail="Auth disabled") + collections = list_qdrant_collections() except Exception: raise HTTPException(status_code=500, detail="Failed to load collections") @@ -934,7 +938,11 @@ async def event_generator(): break try: - collections = list_collections(include_deleted=False) + if AUTH_ENABLED: + collections = list_collections(include_deleted=False) + else: + # Demo mode: get collections directly from Qdrant + collections = list_qdrant_collections() enriched = await asyncio.to_thread( lambda: build_admin_collections_view(collections=collections, work_dir=WORK_DIR) ) @@ -945,6 +953,19 @@ async def event_generator(): last_data = current_data yield f"data: {json.dumps({'type': 'full', 'collections': enriched}, default=str)}\n\n" + except AuthDisabledError: + # Fallback to Qdrant collections + try: + collections = list_qdrant_collections() + enriched = await asyncio.to_thread( + lambda: build_admin_collections_view(collections=collections, work_dir=WORK_DIR) + ) + current_data = json.dumps(enriched, default=str) + if current_data != last_data: + last_data = current_data + yield f"data: {json.dumps({'type': 'full', 'collections': enriched}, default=str)}\n\n" + except Exception as e: + yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n" except Exception as e: yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n" diff --git a/scripts/watch_index_core/processor.py b/scripts/watch_index_core/processor.py index 685b688f..fd68a73f 100644 --- a/scripts/watch_index_core/processor.py +++ b/scripts/watch_index_core/processor.py @@ -349,6 +349,7 @@ def _process_paths( def _read_text_and_sha1(path: Path) -> tuple[Optional[str], str]: + """Read file text and compute hash. Uses xxhash for consistency with pipeline.""" try: text = path.read_text(encoding="utf-8", errors="ignore") except Exception: @@ -356,6 +357,11 @@ def _read_text_and_sha1(path: Path) -> tuple[Optional[str], str]: if not text: return text, "" try: + # Use xxhash for consistency with scripts/ingest/pipeline.py + import xxhash + file_hash = xxhash.xxh64(text.encode("utf-8", errors="ignore")).hexdigest() + except ImportError: + # Fallback to hashlib if xxhash not available file_hash = hashlib.sha1(text.encode("utf-8", errors="ignore")).hexdigest() except Exception: file_hash = "" From d463bdf54cd028a8f52851a0a836bcf57ac9c761 Mon Sep 17 00:00:00 2001 From: John Donalson Date: Sun, 25 Jan 2026 23:06:43 -0500 Subject: [PATCH 2/5] Improve error handling for cache TTL and file hashing Added validation for SCHEMA_CACHE_TTL_SECS to ensure it is finite and positive, defaulting to 300 seconds if invalid. Enhanced file hash computation to fallback to hashlib.sha1 on any xxhash runtime error, improving robustness. --- scripts/indexing_admin.py | 8 +++++++- scripts/watch_index_core/processor.py | 6 +++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/scripts/indexing_admin.py b/scripts/indexing_admin.py index 9d50040c..52a939ae 100644 --- a/scripts/indexing_admin.py +++ b/scripts/indexing_admin.py @@ -203,7 +203,13 @@ def _copy_repo_state_for_clone( _COLLECTION_SCHEMA_CACHE: Dict[str, Dict[str, Any]] = {} _COLLECTION_SCHEMA_CACHE_TS: Dict[str, float] = {} # Track cache timestamps try: - _COLLECTION_SCHEMA_CACHE_TTL = float(os.environ.get("SCHEMA_CACHE_TTL_SECS", "300")) + _ttl_raw = float(os.environ.get("SCHEMA_CACHE_TTL_SECS", "300")) + # Validate TTL is finite and positive + import math + if not math.isfinite(_ttl_raw) or _ttl_raw <= 0: + _COLLECTION_SCHEMA_CACHE_TTL = 300.0 + else: + _COLLECTION_SCHEMA_CACHE_TTL = _ttl_raw except (ValueError, TypeError): _COLLECTION_SCHEMA_CACHE_TTL = 300.0 # 5 min default _SNAPSHOT_REFRESHED: Set[str] = set() diff --git a/scripts/watch_index_core/processor.py b/scripts/watch_index_core/processor.py index fd68a73f..b9a63219 100644 --- a/scripts/watch_index_core/processor.py +++ b/scripts/watch_index_core/processor.py @@ -364,7 +364,11 @@ def _read_text_and_sha1(path: Path) -> tuple[Optional[str], str]: # Fallback to hashlib if xxhash not available file_hash = hashlib.sha1(text.encode("utf-8", errors="ignore")).hexdigest() except Exception: - file_hash = "" + # Fallback to hashlib on any xxhash runtime error + try: + file_hash = hashlib.sha1(text.encode("utf-8", errors="ignore")).hexdigest() + except Exception: + file_hash = "" return text, file_hash From 01709b3bcf08fe39f9645ec0241bdd2598ef2544 Mon Sep 17 00:00:00 2001 From: John Donalson Date: Sun, 25 Jan 2026 23:10:34 -0500 Subject: [PATCH 3/5] Refactor to use xxhash exclusively for file hashing Removed fallback to hashlib and now require xxhash for hashing file contents in _read_text_and_sha1. This ensures consistency with other scripts and simplifies the code by eliminating error handling for missing xxhash. --- scripts/watch_index_core/processor.py | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/scripts/watch_index_core/processor.py b/scripts/watch_index_core/processor.py index b9a63219..0500d304 100644 --- a/scripts/watch_index_core/processor.py +++ b/scripts/watch_index_core/processor.py @@ -2,7 +2,6 @@ from __future__ import annotations -import hashlib import logging import os import subprocess @@ -11,6 +10,8 @@ from pathlib import Path from typing import Dict, List, Optional +import xxhash + import scripts.ingest_code as idx from scripts.workspace_state import ( _extract_repo_name_from_path, @@ -356,19 +357,7 @@ def _read_text_and_sha1(path: Path) -> tuple[Optional[str], str]: text = None if not text: return text, "" - try: - # Use xxhash for consistency with scripts/ingest/pipeline.py - import xxhash - file_hash = xxhash.xxh64(text.encode("utf-8", errors="ignore")).hexdigest() - except ImportError: - # Fallback to hashlib if xxhash not available - file_hash = hashlib.sha1(text.encode("utf-8", errors="ignore")).hexdigest() - except Exception: - # Fallback to hashlib on any xxhash runtime error - try: - file_hash = hashlib.sha1(text.encode("utf-8", errors="ignore")).hexdigest() - except Exception: - file_hash = "" + file_hash = xxhash.xxh64(text.encode("utf-8", errors="ignore")).hexdigest() return text, file_hash From 4151cb4dc1491f44e966175c044c7e74b520657d Mon Sep 17 00:00:00 2001 From: John Donalson Date: Sun, 25 Jan 2026 23:13:27 -0500 Subject: [PATCH 4/5] Refactor file hash function to use xxhash64 Renamed _read_text_and_sha1 to _read_text_and_hash and updated its implementation and usage to compute xxhash64 instead of SHA1 for consistency with the pipeline. Updated related test to check for the correct hash length. --- scripts/watch_index_core/processor.py | 9 ++++----- tests/test_watch_index_cache.py | 6 +++--- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/scripts/watch_index_core/processor.py b/scripts/watch_index_core/processor.py index 0500d304..4f03d78f 100644 --- a/scripts/watch_index_core/processor.py +++ b/scripts/watch_index_core/processor.py @@ -9,7 +9,6 @@ from datetime import datetime from pathlib import Path from typing import Dict, List, Optional - import xxhash import scripts.ingest_code as idx @@ -129,7 +128,7 @@ def _maybe_handle_staging_file( if not (is_staging_enabled() and state_env and collection): return False - _text, file_hash = _read_text_and_sha1(path) + _text, file_hash = _read_text_and_hash(path) if file_hash: try: cached_hash = get_cached_file_hash(str(path), repo_name) if repo_name else None @@ -349,8 +348,8 @@ def _process_paths( logger.debug(f"Suppressed exception: {e}") -def _read_text_and_sha1(path: Path) -> tuple[Optional[str], str]: - """Read file text and compute hash. Uses xxhash for consistency with pipeline.""" +def _read_text_and_hash(path: Path) -> tuple[Optional[str], str]: + """Read file text and compute xxhash64 for consistency with pipeline.""" try: text = path.read_text(encoding="utf-8", errors="ignore") except Exception: @@ -377,7 +376,7 @@ def _run_indexing_strategy( except Exception as e: logger.debug(f"Suppressed exception: {e}") - text, file_hash = _read_text_and_sha1(path) + text, file_hash = _read_text_and_hash(path) ok = False if text is not None: try: diff --git a/tests/test_watch_index_cache.py b/tests/test_watch_index_cache.py index c5065af1..0a5df87b 100644 --- a/tests/test_watch_index_cache.py +++ b/tests/test_watch_index_cache.py @@ -38,10 +38,10 @@ def test_processor_handles_cache_read_errors(monkeypatch, tmp_path): test_file = tmp_path / "test.txt" test_file.write_text("test content", encoding="utf-8") - # Exercise _read_text_and_sha1 - should handle errors gracefully and return content + hash - text, sha1 = proc_mod._read_text_and_sha1(test_file) + # Exercise _read_text_and_hash - should handle errors gracefully and return content + hash + text, file_hash = proc_mod._read_text_and_hash(test_file) assert text == "test content" - assert sha1 is not None and len(sha1) == 40 # SHA1 hex length + assert file_hash is not None and len(file_hash) == 16 # xxhash64 hex length def test_handler_move_event_handles_cache_errors(monkeypatch, tmp_path): From 0c23a778b5e3e934e27e8d7eacc9724e3479e694 Mon Sep 17 00:00:00 2001 From: John Donalson Date: Sun, 25 Jan 2026 23:22:09 -0500 Subject: [PATCH 5/5] Add LZ4 compression for Redis state storage Introduces optional LZ4 compression for JSON data stored in Redis, reducing memory usage with minimal CPU overhead. Updates requirements.txt to include lz4 as a dependency and modifies workspace_state.py to compress data before storing and decompress on retrieval, falling back gracefully if lz4 is unavailable. --- requirements.txt | 1 + scripts/workspace_state.py | 57 ++++++++++++++++++++++++++++++++++---- 2 files changed, 53 insertions(+), 5 deletions(-) diff --git a/requirements.txt b/requirements.txt index 3c3a6714..fd28cbab 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,6 +9,7 @@ tokenizers orjson redis>=5.0.0 xxhash>=3.0.0 +lz4>=4.0.0 # Tree-sitter 0.25+ with individual language packages tree_sitter>=0.25.0 tree_sitter_python>=0.23.0 diff --git a/scripts/workspace_state.py b/scripts/workspace_state.py index 55f2fa5c..14fcf105 100644 --- a/scripts/workspace_state.py +++ b/scripts/workspace_state.py @@ -144,20 +144,66 @@ def _redis_retry(fn, retries: int = 2, delay: float = 0.1): raise last_err # type: ignore +# LZ4 compression for Redis state storage +# Provides ~50-70% memory reduction with minimal CPU overhead +_LZ4_AVAILABLE = False +try: + import lz4.frame as lz4_frame + _LZ4_AVAILABLE = True +except ImportError: + lz4_frame = None # type: ignore + +# Prefix to identify LZ4-compressed values in Redis +_LZ4_PREFIX = b"LZ4:" + +def _redis_compress(data: bytes) -> bytes: + """Compress data with LZ4 if available, return with prefix.""" + if not _LZ4_AVAILABLE or lz4_frame is None: + return data + try: + compressed = lz4_frame.compress(data, compression_level=0) # fastest + # Only use compression if it actually saves space + if len(compressed) + len(_LZ4_PREFIX) < len(data): + return _LZ4_PREFIX + compressed + return data + except Exception: + return data + +def _redis_decompress(data: bytes) -> bytes: + """Decompress LZ4 data if prefixed, otherwise return as-is.""" + if not data: + return data + if data.startswith(_LZ4_PREFIX): + if not _LZ4_AVAILABLE or lz4_frame is None: + logger.warning("LZ4 compressed data found but lz4 not available") + return data + try: + return lz4_frame.decompress(data[len(_LZ4_PREFIX):]) + except Exception as e: + logger.debug(f"LZ4 decompress failed: {e}") + return data + return data + + def _redis_get_json(kind: str, path: Path) -> Optional[Dict[str, Any]]: client = _get_redis_client() if client is None: return None key = _redis_key_for_path(kind, path) try: - raw = _redis_retry(lambda: client.get(key)) + # Get raw bytes for decompression + raw = _redis_retry(lambda: client.execute_command("GET", key)) except Exception as e: logger.debug(f"Redis get failed for {key}: {e}") return None if not raw: return None try: - obj = json.loads(raw) + # Handle both string and bytes responses + if isinstance(raw, str): + raw = raw.encode("utf-8") + decompressed = _redis_decompress(raw) + obj = json.loads(decompressed.decode("utf-8")) except Exception as e: logger.debug(f"Redis JSON decode failed for {key}: {e}") return None @@ -172,12 +218,13 @@ def _redis_set_json(kind: str, path: Path, obj: Dict[str, Any]) -> bool: return False key = _redis_key_for_path(kind, path) try: - payload = json.dumps(obj, ensure_ascii=False) + payload = json.dumps(obj, ensure_ascii=False).encode("utf-8") + compressed = _redis_compress(payload) except Exception as e: - logger.debug(f"Failed to JSON serialize redis payload for {key}: {e}") + logger.debug(f"Failed to serialize/compress redis payload for {key}: {e}") return False try: - _redis_retry(lambda: client.set(key, payload)) + _redis_retry(lambda: client.execute_command("SET", key, compressed)) return True except Exception as e: logger.debug(f"Redis set failed for {key}: {e}")