Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 72 additions & 9 deletions scripts/ast_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,18 +209,29 @@ class ASTAnalyzer:
- Dependency tracking
- Semantic chunking (preserve boundaries)
- Cross-reference analysis
- Tree cache for parsed ASTs (avoids re-parsing unchanged files)
"""

def __init__(self, use_tree_sitter: bool = True):
def __init__(self, use_tree_sitter: bool = True, use_tree_cache: bool = True):
"""
Initialize AST analyzer.

Args:
use_tree_sitter: Use tree-sitter when available (fallback to ast module)
use_tree_cache: Cache parsed trees for unchanged files (mtime-based invalidation)
"""
self.use_tree_sitter = use_tree_sitter and _TS_AVAILABLE
self._parsers: Dict[str, Any] = {}

# Tree cache for avoiding re-parsing unchanged files
self._tree_cache = None
if use_tree_cache:
try:
from scripts.ingest.tree_cache import get_default_cache
self._tree_cache = get_default_cache()
except ImportError:
logger.debug("TreeCache not available, parsing will not be cached")

# Language support matrix
self.supported_languages = {
"python": {"ast": True, "tree_sitter": True},
Expand All @@ -234,7 +245,49 @@ def __init__(self, use_tree_sitter: bool = True):
"ruby": {"ast": False, "tree_sitter": True},
}

logger.info(f"ASTAnalyzer initialized: tree_sitter={self.use_tree_sitter}")
logger.info(f"ASTAnalyzer initialized: tree_sitter={self.use_tree_sitter}, tree_cache={'enabled' if self._tree_cache else 'disabled'}")

def _parse_with_cache(self, parser: Any, content: str, file_path: str, language: str, content_provided: bool = False) -> Optional[Any]:
"""Parse content with tree-sitter, using cache when available.

Args:
parser: Tree-sitter parser instance
content: Source code content
file_path: Path to the file (used as cache key)
language: Programming language
content_provided: If True, content was explicitly provided (not read from disk),
so skip cache to avoid returning stale tree

Returns:
Parsed tree or None on failure
"""
path = Path(file_path) if file_path else None

# Try to get cached tree (only for real files when content was NOT explicitly provided)
# If content_provided=True, the caller passed in-memory content that may differ from disk
if self._tree_cache and path and path.exists() and not content_provided:
cached_tree = self._tree_cache.get(path)
if cached_tree is not None:
return cached_tree

# Parse the content
try:
tree = parser.parse(content.encode("utf-8"))
except Exception as e:
logger.debug(f"Tree-sitter parse failed for {language}: {e}")
return None

# Cache the result for real files
if self._tree_cache and path and path.exists() and tree is not None:
self._tree_cache.put(path, tree)

return tree

def get_tree_cache_stats(self) -> Dict[str, Any]:
"""Get tree cache statistics for monitoring."""
if self._tree_cache:
return self._tree_cache.get_stats()
return {"enabled": False}

def analyze_file(
self, file_path: str, language: str, content: Optional[str] = None
Expand All @@ -250,6 +303,10 @@ def analyze_file(
Returns:
Dict with symbols, imports, calls, and dependencies
"""
# Track if content was explicitly provided (vs read from disk)
# This affects caching - explicit content may differ from on-disk state
content_provided = content is not None

if content is None:
try:
content = Path(file_path).read_text(encoding="utf-8", errors="ignore")
Expand All @@ -259,7 +316,7 @@ def analyze_file(

# Use language mappings (32 languages, declarative queries)
if _LANGUAGE_MAPPINGS_AVAILABLE and self.use_tree_sitter:
result = self._analyze_with_mapping(content, file_path, language)
result = self._analyze_with_mapping(content, file_path, language, content_provided)
if result and (result.get("symbols") or result.get("imports") or result.get("calls")):
return result

Expand Down Expand Up @@ -438,11 +495,17 @@ def extract_dependencies(

# ---- Language Mappings Analysis (unified, concept-based) ----

def _analyze_with_mapping(self, content: str, file_path: str, language: str) -> Dict[str, Any]:
def _analyze_with_mapping(self, content: str, file_path: str, language: str, content_provided: bool = False) -> Dict[str, Any]:
"""Analyze code using language mappings (concept-based extraction).

This uses the declarative tree-sitter queries from language_mappings
to extract symbols, imports, and calls. Supports 34 languages.

Args:
content: Source code content
file_path: Path to the file
language: Programming language
content_provided: If True, content was explicitly provided (not read from disk)
"""
if not _LANGUAGE_MAPPINGS_AVAILABLE:
return self._empty_analysis()
Expand All @@ -461,12 +524,12 @@ def _analyze_with_mapping(self, content: str, file_path: str, language: str) ->
if not parser:
return self._empty_analysis()

try:
tree = parser.parse(content.encode("utf-8"))
root = tree.root_node
except Exception as e:
logger.debug(f"Tree-sitter parse failed for {language}: {e}")
# Parse with caching (avoids re-parsing unchanged files)
# Skip cache if content was explicitly provided to avoid stale results
tree = self._parse_with_cache(parser, content, file_path, language, content_provided)
if tree is None:
return self._empty_analysis()
root = tree.root_node

content_bytes = content.encode("utf-8")
symbols: List[CodeSymbol] = []
Expand Down
60 changes: 56 additions & 4 deletions scripts/ingest/qdrant.py
Original file line number Diff line number Diff line change
Expand Up @@ -855,10 +855,19 @@ def delete_points_by_path(client: QdrantClient, collection: str, file_path: str)


def upsert_points(
client: QdrantClient, collection: str, points: List[models.PointStruct]
client: QdrantClient, collection: str, points: List[models.PointStruct],
*, wait: bool = None
):
"""Upsert points with retry and batching.

Args:
client: Qdrant client instance
collection: Collection name
points: List of points to upsert
wait: Whether to wait for upsert to complete. Default is controlled by
INDEX_UPSERT_ASYNC env var (0=sync/wait, 1=async/no-wait).
Async mode is faster but may cause read-after-write issues.

Raises:
ValueError: If collection is None or empty.
"""
Expand All @@ -878,19 +887,24 @@ def upsert_points(
backoff = float(os.environ.get("INDEX_UPSERT_BACKOFF", "0.5") or 0.5)
except Exception:
backoff = 0.5

# Determine wait mode: explicit param > env var > default (sync)
if wait is None:
async_mode = os.environ.get("INDEX_UPSERT_ASYNC", "0").strip().lower() in {"1", "true", "yes", "on"}
wait = not async_mode

failed_count = 0
for i in range(0, len(points), max(1, bsz)):
batch = points[i : i + max(1, bsz)]
attempt = 0
while True:
try:
client.upsert(collection_name=collection, points=batch, wait=True)
client.upsert(collection_name=collection, points=batch, wait=wait)
break
except Exception as e:
attempt += 1
if attempt >= retries:
# Final fallback: try smaller sub-batches
# Final fallback: try smaller sub-batches (always sync for reliability)
sub_size = max(1, bsz // 4)
sub_failed = 0
for j in range(0, len(batch), sub_size):
Expand All @@ -901,7 +915,6 @@ def upsert_points(
)
except Exception as sub_e:
sub_failed += len(sub)
# Log individual sub-batch failures for debugging
print(f"[UPSERT_WARNING] Sub-batch upsert failed ({len(sub)} points): {sub_e}", flush=True)
if sub_failed > 0:
failed_count += sub_failed
Expand All @@ -917,6 +930,45 @@ def upsert_points(
print(f"[UPSERT_SUMMARY] Total {failed_count}/{len(points)} points failed to upsert", flush=True)


def flush_upserts(client: QdrantClient, collection: str) -> None:
    """Best-effort nudge to make pending async upserts visible to readers.

    Intended to run after a batch of wait=False upserts (INDEX_UPSERT_ASYNC=1).
    Qdrant acknowledges such upserts as "received" rather than "applied", so
    this helper only encourages the server to process pending writes — it
    cannot guarantee immediate read-after-write consistency.

    For strict consistency either upsert with wait=True (INDEX_UPSERT_ASYNC=0)
    or add application-level retries around reads. Remote deployments widen
    the window between confirmation and visibility due to network latency.

    Args:
        client: Qdrant client instance.
        collection: Collection name; a falsy value makes this a no-op.
    """
    if not collection:
        return
    try:
        # Lightweight metadata read first...
        client.get_collection(collection)
        # ...then a minimal scroll that touches actual data, which helps
        # flush pending writes through segment processing.
        client.scroll(
            collection_name=collection,
            limit=1,
            with_payload=False,
            with_vectors=False,
        )
    except Exception as e:
        # Best-effort by design: failures here must never break the caller.
        logger.debug(f"flush_upserts: {e}")


def hash_id(text: str, path: str, start: int, end: int) -> int:
"""Generate a stable hash ID for a chunk."""
h = hashlib.sha1(
Expand Down
73 changes: 54 additions & 19 deletions scripts/ingest/vectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,28 @@
_STOP,
)

# Try to use numpy for faster vector operations (10-50x speedup); the pure
# Python fallback in project_mini is used when numpy is unavailable.
try:
    import numpy as np
    _NUMPY_AVAILABLE = True
except ImportError:
    np = None  # type: ignore  # sentinel so `np` is always a defined name
    _NUMPY_AVAILABLE = False

# ---------------------------------------------------------------------------
# Mini vector projection cache
# ---------------------------------------------------------------------------
# Cache stores numpy arrays when numpy is available, else nested lists.
# Key is a 3-int tuple — presumably (in_dim, out_dim, seed) built inside
# _get_mini_proj; confirm against the key construction there.
_MINI_PROJ_CACHE: dict[tuple[int, int, int], Any] = {}


def _get_mini_proj(
in_dim: int, out_dim: int, seed: int | None = None
) -> list[list[float]]:
"""Get or create a random projection matrix for mini vectors."""
) -> Any:
"""Get or create a random projection matrix for mini vectors.

Returns numpy array if numpy is available, else nested list.
"""
import math
import random

Expand All @@ -38,31 +50,54 @@ def _get_mini_proj(
rnd = random.Random(s)
scale = 1.0 / math.sqrt(out_dim)
# Dense Rademacher matrix (+/-1) scaled; good enough for fast gating
M = [
[scale * (1.0 if rnd.random() < 0.5 else -1.0) for _ in range(out_dim)]
for _ in range(in_dim)
]
if _NUMPY_AVAILABLE:
# Use numpy for faster matrix operations
# Generate same values as pure Python for reproducibility
M_list = [
[scale * (1.0 if rnd.random() < 0.5 else -1.0) for _ in range(out_dim)]
for _ in range(in_dim)
]
M = np.array(M_list, dtype=np.float32)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The NumPy path uses float32 for both the projection matrix and input vector, so results will differ from the pure-Python (float64) path despite the “reproducibility” comment; if determinism across environments matters, this could be surprising.

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎

else:
M = [
[scale * (1.0 if rnd.random() < 0.5 else -1.0) for _ in range(out_dim)]
for _ in range(in_dim)
]
_MINI_PROJ_CACHE[key] = M
return M


def project_mini(vec: list[float], out_dim: int | None = None) -> list[float]:
    """Project a dense vector to a compact mini vector via random projection.

    The projection matrix comes from _get_mini_proj and the result is
    L2-normalized to keep scale consistent. Uses the numpy fast path when
    available (10-50x speedup), otherwise a pure-Python fallback.

    Args:
        vec: Input dense vector; an empty input yields an all-zero output.
        out_dim: Target dimensionality; defaults to MINI_VEC_DIM.

    Returns:
        The projected, normalized vector as a plain list of floats.
    """
    dim = int(out_dim or MINI_VEC_DIM)
    if not vec:
        return [0.0] * dim

    proj = _get_mini_proj(len(vec), dim)

    if _NUMPY_AVAILABLE:
        # Fast path: one matrix-vector product, then L2 normalization.
        projected = np.array(vec, dtype=np.float32) @ proj
        length = np.linalg.norm(projected)
        if length > 0:
            projected = projected / length
        return projected.tolist()

    # Fallback: accumulate y = x @ M row by row, skipping zero components.
    acc = [0.0] * dim
    for idx, component in enumerate(vec):
        if component == 0.0:
            continue
        row = proj[idx]
        for j in range(dim):
            acc[j] += component * row[j]
    # `or 1.0` guards the division when the projection is all zeros.
    length = (sum(v * v for v in acc) or 0.0) ** 0.5 or 1.0
    return [v / length for v in acc]


def _split_ident_lex(s: str) -> List[str]:
Expand Down
Loading