From 11e1701c7bd276fae9e9133fbc2791ded7e4f748 Mon Sep 17 00:00:00 2001 From: offx-zinth Date: Sat, 25 Apr 2026 16:53:07 +0530 Subject: [PATCH] Specific improvements --- SPEC.md | 746 +++++++++++++++++++++++++++ smp/store/graph/edge_store.py | 28 + smp/store/graph/index.py | 60 +++ smp/store/graph/manifest.py | 24 + smp/store/graph/mmap_file.py | 178 +++++++ smp/store/graph/mmap_store.py | 380 ++++++++++++++ smp/store/graph/node_store.py | 40 ++ smp/store/graph/parser.py | 421 +++++++++++++++ smp/store/graph/scheduler.py | 128 +++++ smp/store/graph/string_pool.py | 49 ++ tests/store/graph/test_mmap_store.py | 237 +++++++++ 11 files changed, 2291 insertions(+) create mode 100644 SPEC.md create mode 100644 smp/store/graph/edge_store.py create mode 100644 smp/store/graph/index.py create mode 100644 smp/store/graph/manifest.py create mode 100644 smp/store/graph/mmap_file.py create mode 100644 smp/store/graph/mmap_store.py create mode 100644 smp/store/graph/node_store.py create mode 100644 smp/store/graph/parser.py create mode 100644 smp/store/graph/scheduler.py create mode 100644 smp/store/graph/string_pool.py create mode 100644 tests/store/graph/test_mmap_store.py diff --git a/SPEC.md b/SPEC.md new file mode 100644 index 0000000..07c3c2f --- /dev/null +++ b/SPEC.md @@ -0,0 +1,746 @@ +# SMP Graph Engine - Technical Specification + +## Overview + +SMP Graph Engine is an **ingest-free, memory-mapped graph database** for code analysis at scale (10M+ LOC). It replaces Neo4j with a custom single-file storage format optimized for sparse access patterns and on-demand parsing. + +### Key Properties + +| Property | Value | +|----------|-------| +| **Storage** | Single `.smpg` file + optional `.smpv` for vectors | +| **Format** | Append-only, memory-mapped | +| **Scale** | 50M+ LOC, ~100GB disk, ~15GB RAM hot set | +| **Latency** | Index lookup <1ms, first-parse on-demand | +| **Crash Safety** | WAL (Write-Ahead Log) with replay | +| **Concurrency** | Lock-free reads, WAL-based writes | + +--- + +## File Format + +### `.smpg` Layout + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ Offset 0..4095: HEADER (4096 bytes fixed) │ +├─────────────────────────────────────────────────────────────────┤ +│ Offset 4096..65535: WAL (64KB, configurable) │ +│ - Circular buffer of 512-byte WAL records │ +│ - Each record: {type, size, payload, crc32} │ +│ - Commit records mark transaction boundaries │ +├─────────────────────────────────────────────────────────────────┤ +│ Offset 65536..: DATA REGION (sparse, grows as needed) │ +│ │ +│ ┌─────────────────────────────────────────────────────────┐ │ +│ │ Crit-bit Index Pages │ │ +│ │ Key: node_id (64-bit hash) │ │ +│ │ Value: InodePtr (64-bit offset into this file) │ │ +│ └─────────────────────────────────────────────────────────┘ │ +│ │ +│ ┌─────────────────────────────────────────────────────────┐ │ +│ │ Radix Tree (Secondary Index) │ │ +│ │ file_path components → [node_id, node_id, ...] 
│ │ +│ │ Enables "all nodes in file" queries │ │ +│ └─────────────────────────────────────────────────────────┘ │ +│ │ +│ ┌─────────────────────────────────────────────────────────┐ │ +│ │ String Pool (Atom Table) │ │ +│ │ deduplicated strings: paths, names, signatures │ │ +│ │ hash32 → {offset, length} │ │ +│ └─────────────────────────────────────────────────────────┘ │ +│ │ +│ ┌─────────────────────────────────────────────────────────┐ │ +│ │ Node Data Region │ │ +│ │ Inode: {type, name_offset, sig_offset, │ │ +│ │ file_offset, line_start, line_end, │ │ +│ │ edge_list_ptr, edge_count, flags} │ │ +│ │ Fixed 64-byte entries, allocated from free list │ │ +│ └─────────────────────────────────────────────────────────┘ │ +│ │ +│ ┌─────────────────────────────────────────────────────────┐ │ +│ │ Edge Store │ │ +│ │ Per-node adjacency lists │ │ +│ │ Varint encoded: [count, [(target, type, attrs)...]] │ │ +│ └─────────────────────────────────────────────────────────┘ │ +│ │ +│ ┌─────────────────────────────────────────────────────────┐ │ +│ │ Parsed Code Cache │ │ +│ │ Pre-extracted AST metadata per file │ │ +│ │ Memory-mapped, OS page cache managed │ │ +│ │ LRU eviction when memory pressure │ │ +│ └─────────────────────────────────────────────────────────┘ │ +│ │ +│ ┌─────────────────────────────────────────────────────────┐ │ +│ │ File Manifest │ │ +│ │ Fixed 128-byte entries │ │ +│ │ {path_str_offset, hash, line_count, │ │ +│ │ parsed_offset|0, last_modified, priority, │ │ +│ │ ref_count, status} │ │ +│ └─────────────────────────────────────────────────────────┘ │ +└─────────────────────────────────────────────────────────────────┘ +``` + +### `.smpv` Layout (Vectors) + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ Offset 0..4095: VECTOR HEADER │ +│ Magic: 0x534D5056 ('SMPV'), version, dim, count │ +├─────────────────────────────────────────────────────────────────┤ +│ Vector Index Pages │ +│ node_id (64-bit) → vector_offset (64-bit) │ +├─────────────────────────────────────────────────────────────────┤ +│ Dense Float32 Embeddings │ +│ [dim floats] per node, appended sequentially │ +├─────────────────────────────────────────────────────────────────┤ +│ LRU Metadata (mmap'd, 16 bytes per vector) │ +│ {last_accessed_ts, access_count} │ +└─────────────────────────────────────────────────────────────────┘ +``` + +### Node ID Format + +``` +node_id = "{file_hash}::{type}::{name}::{start_line}" +Example: "a1b2c3d4::Function::login::42" + +- file_hash: xxhash64 of normalized absolute path +- type: Class | Function | Method | Import | Statement | ... +- name: extracted name or synthetic (e.g., "lambda_1") +- start_line: 1-indexed line number +``` + +--- + +## Index Structures + +### Crit-bit Tree (Primary Index) + +- **Purpose**: Fast exact match lookups +- **Operations**: Insert, delete, find (all O(k) where k = key length) +- **Reads**: Lock-free using memory barriers +- **Writes**: Protected by WAL + single writer lock + +``` +Structure: + Internal node: {child0_offset, child1_offset, crit_byte_pos} + Leaf node: {key (node_id), value (inode_ptr)} + +Lookup: + 1. Start at root + 2. At each internal node, select child based on crit_byte_pos of key + 3. Traverse until leaf + 4. 
Return value if key matches, else not found +``` + +### Radix Tree (Secondary Index) + +- **Purpose**: Range queries by file path +- **Enables**: "All nodes in file F", "All nodes in directory D" + +``` +Structure: + Radix node: {prefix, children {char: child_offset}, node_ids[]} + +Lookup "all nodes in file F": + 1. Split F into path components + 2. Traverse radix tree following components + 3. Return union of node_ids at leaf + all ancestors +``` + +### String Pool (Atom Table) + +- **Purpose**: Deduplicate strings (paths, names, signatures, docstrings) +- **Storage**: Length-prefixed UTF-8, 64-bit aligned + +``` +Structure: + Header: {count, hash_index_offset} + Hash Index: hash32 → [offset, length, next_collision] + Data: raw string bytes + +Lookup: + 1. Hash string → h + 2. Probe hash index at h % index_size + 3. Follow collision chain until match + 4. Return offset + length +``` + +--- + +## WAL (Write-Ahead Log) + +### WAL Record Format (512 bytes each) + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ 0 : record_type (uint8) │ +│ 0x01 = NODE_INSERT │ +│ 0x02 = NODE_DELETE │ +│ 0x03 = EDGE_INSERT │ +│ 0x04 = EDGE_DELETE │ +│ 0x05 = FILE_PARSED │ +│ 0x06 = COMMIT │ +│ 0x07 = CHECKPOINT │ +│ 1..3 : reserved │ +│ 4..7 : payload_size (uint32) │ +│ 8..11 : crc32 (uint32) │ +│ 12..511: payload (padded to 512 bytes) │ +└─────────────────────────────────────────────────────────────────┘ +``` + +### Recovery Protocol + +``` +On startup: +1. Scan WAL from head +2. Build redo list of uncommitted records +3. Replay records in order +4. On finding COMMIT record: clear committed records from WAL +5. Truncate WAL to checkpoint + +On clean shutdown: +1. Write CHECKPOINT record +2. fsync WAL +3. Write checkpoint marker in header +4. Truncate WAL +``` + +--- + +## Edge Resolution: Hybrid Approach + +### Edge Candidate (Parse Time) + +``` +When parsing file F: +1. Extract import/call targets as NAMES (not node_ids) +2. Store edge candidate: {source_node, target_name, edge_type, target_file_hint} +3. Mark candidate as "unresolved" + +target_file_hint derived from: + - import: absolute path if resolvable + - call: look in same file first, then imports +``` + +### Edge Resolution (First Traversal) + +``` +When traversing edge E from node A: +1. If E is resolved: follow pointer to target node +2. If E is unresolved: + a. Look up target_name in crit-bit index + b. If found: update E to point to real node, mark resolved + c. If not found: return None/null, mark "unresolved" +3. 
Cache resolved edge in edge store +``` + +### Benefits + +- Parse is fast (no cross-file resolution needed) +- Resolution happens lazily when path is actually traversed +- Handles dynamic imports gracefully (unresolved at first, resolved later) + +--- + +## Parsing Engine + +### Parser Interface + +```python +class ParsedFile: + file_path: str + language: str + line_count: int + hash: str # xxhash64 of content + nodes: list[ParsedNode] + edge_candidates: list[EdgeCandidate] + ast_metadata: bytes # serialized, mmap'd + +class ParsedNode: + node_id: str + type: NodeType + name: str + signature: str | None + docstring: str | None + start_line: int + end_line: int + tags: list[str] + decorators: list[str] + +class EdgeCandidate: + source_id: str + target_name: str + edge_type: EdgeType + target_file_hint: str | None +``` + +### Supported Languages (Phase 1) + +| Language | Status | Grammar | +|----------|--------|---------| +| Python | ✅ Primary | tree-sitter-python | + +### Scheduler (Background Pre-parse) + +``` +Priority Calculation: + priority = base_priority(depth) * ref_count_factor * recency_factor + + base_priority: + - Entry files (main.py, __init__.py): 100 + - Direct deps of entry (depth 1): 80 + - Transitive deps (depth 2-3): 60 + - Orphan files: 20 + + ref_count_factor: log(1 + in_degree) + recency_factor: 1.0 / (1 + days_since_last_access) + +Worker Pool: + - N workers (default: CPU count) + - Work-stealing for load balancing + - Coalesced I/O: batch read adjacent files +``` + +--- + +## Memory Management + +### OS-Managed Mmap Strategy + +``` +Hot Set (RAM, ~10-15GB): + - Crit-bit index pages + - Radix tree pages + - String pool (hot strings) + - Node data for frequently accessed files + - Edge lists for high-degree nodes + +Warm Set (OS page cache): + - Parsed AST metadata (accessed via mmap) + - Node data for medium-frequency files + - Edge lists for medium-degree nodes + +Cold (Disk): + - Parsed AST for rarely accessed files + - Edge lists for low-degree nodes + +Madvise Hints: + - MADV_WILLNEED: when pre-parsing + - MADV_SEQUENTIAL: for bulk import + - MADV_DONTNEED: for eviction candidate selection +``` + +### LRU for Parsed Cache + +``` +When memory pressure detected: +1. Sort parsed regions by {last_accessed, access_count} +2. For bottom 20%: + - madvise(DONTNEED) to release pages + - Mark region as "evicted" in file manifest +3. Keep metadata (node boundaries, signatures) in RAM +4. Re-parse on next access if needed +``` + +--- + +## File Watcher (Live Updates) + +### Hybrid Approach + +``` +Tier 1: inotify (primary) + - Watch project root directories + - Watch added/removed/changed files + - Limit: ~500k watches typical kernel config + +Tier 2: Polling fallback + - Triggered when inotify watch limit reached + - Poll mtime of manifest files + - Interval: 60 seconds default + +Tier 3: API invalidation + - /smp/invalidate endpoint + - Called by editor plugins on save +``` + +### Change Detection Flow + +``` +File F changes on disk: +1. inotify detects IN_MODIFY +2. Compare new hash vs stored hash +3. If changed: + a. Mark FileManifest entry as "stale" + b. Queue for background re-parse + c. Increment priority for dependent files +4. On next query to F: + a. Return stale cached data with "stale=true" flag + b. Trigger async re-parse +``` + +--- + +## Query Language + +### Phase 1: Path Expressions + +``` +Syntax: + PATTERN := NODE_TYPE [EDGE_TYPE [NODE_TYPE]]... 
+ + NODE_TYPE := identifier | '*' + EDGE_TYPE := identifier | '->' | '<-' | '<->' + identifier := [A-Za-z_][A-Za-z0-9_]* + +Examples: + Function CALLS Function # All function calls + Class -> DEFINES -> Method # Methods defined in class + * IMPORTS 'requests' # Anything importing requests + Function CALLS+ Function # Transitive closure +``` + +### Phase 2: Filtering + +``` +Syntax: + FILTER := '[' EXPRESSION ']' + + EXPRESSION := field '=' VALUE + | field '!=' VALUE + | field '=~' REGEX + +Examples: + Function[name='login'] CALLS Function + Function[file_path =~ '.*/auth/.*'] CALLS+ Function + Class[docstring =~ '(?i)abstract'] DEFINES Method +``` + +### Phase 3: CYPHER Subset (Future) + +``` +Syntax: + MATCH (a:Type {predicates})-[r:RELATION]->(b:Type {predicates}) + WHERE conditions + RETURN a, b [ORDER BY field LIMIT n] + +Examples: + MATCH (a:Function)-[:CALLS*1..3]->(b:Function {name: 'handler'}) + WHERE a.file_path CONTAINS 'controller' + RETURN a.name, b.name +``` + +--- + +## GraphStore Interface Implementation + +### MMapGraphStore + +```python +class MMapGraphStore(GraphStore): + """Memory-mapped graph store - drop-in GraphStore replacement""" + + # Lifecycle + async def connect(self) -> None: ... + async def close(self) -> None: ... + async def clear(self) -> None: ... + + # Node CRUD + async def upsert_node(self, node: GraphNode) -> None: ... + async def upsert_nodes(self, nodes: Sequence[GraphNode]) -> None: ... + async def get_node(self, node_id: str) -> GraphNode | None: ... + async def delete_node(self, node_id: str) -> bool: ... + async def delete_nodes_by_file(self, file_path: str) -> int: ... + + # Edge CRUD + async def upsert_edge(self, edge: GraphEdge) -> None: ... + async def upsert_edges(self, edges: Sequence[GraphEdge]) -> None: ... + async def get_edges(self, node_id: str, edge_type: EdgeType | None, direction: str) -> list[GraphEdge]: ... + + # Traversal + async def get_neighbors(self, node_id: str, edge_type: EdgeType | None, depth: int) -> list[GraphNode]: ... + async def traverse(self, start_id: str, edge_type: EdgeType | list[EdgeType], depth: int, max_nodes: int, direction: str) -> list[GraphNode]: ... + + # Search + async def find_nodes(self, *, type: NodeType | None, file_path: str | None, name: str | None) -> list[GraphNode]: ... + async def search_nodes(self, query_terms: list[str], match: str, node_types: list[str] | None, tags: list[str] | None, scope: str | None, top_k: int) -> list[dict[str, Any]]: ... + + # Aggregation + async def count_nodes(self) -> int: ... + async def count_edges(self) -> int: ... + + # --- SMP Extension: Ingest-Free Operations --- + + async def parse_file(self, file_path: str) -> list[GraphNode]: + """Parse file on-demand, extract nodes and edge candidates""" + + async def ensure_parsed(self, file_path: str) -> list[GraphNode]: + """Parse if not already parsed, return resolved nodes with edges""" + + async def pre_parse(self, count: int, min_priority: int) -> int: + """Background pre-parse N files at or above priority. + Returns count of files actually parsed.""" + + async def get_parse_status(self, file_path: str) -> ParseStatus: + """Return {parsed: bool, line_count: int, node_count: int, + stale: bool, parse_time_ms: float | None}""" + + async def wait_for_parse(self, file_path: str, timeout: float) -> bool: + """Block until file is fully parsed and edges resolved""" + + # --- SMP Extension: Query Language --- + + async def query(self, expression: str, params: dict | None) -> QueryResult: + """Execute path expression query. 
+ Returns {nodes: [...], edges: [...], stats: {...}}""" + + # --- Session/Lock Persistence --- + + async def upsert_session(self, session: Any) -> None: ... + async def get_session(self, session_id: str) -> dict[str, Any] | None: ... + async def delete_session(self, session_id: str) -> bool: ... + + async def upsert_lock(self, file_path: str, session_id: str) -> None: ... + async def get_lock(self, file_path: str) -> dict[str, Any] | None: ... + async def release_lock(self, file_path: str, session_id: str) -> bool: ... + async def release_all_locks(self, session_id: str) -> int: ... +``` + +--- + +## Directory Structure + +``` +smp/ +├── __init__.py +├── core/ +│ ├── __init__.py +│ ├── models.py # GraphNode, GraphEdge, etc. (existing) +│ └── config.py # Settings (existing) +├── store/ +│ ├── __init__.py +│ ├── interfaces.py # GraphStore, VectorStore ABC (existing) +│ └── graph/ +│ ├── __init__.py +│ ├── mmap_file.py # Low-level mmap, WAL, header +│ ├── index.py # Crit-bit tree + Radix tree +│ ├── string_pool.py # Atom storage +│ ├── node_store.py # Node CRUD, inode management +│ ├── edge_store.py # Edge storage, adjacency lists +│ ├── manifest.py # File manifest entries +│ ├── parser.py # tree-sitter wrapper +│ ├── scheduler.py # Background parse scheduler +│ ├── watcher.py # inotify + polling hybrid +│ ├── query.py # Path expression engine +│ └── mmap_store.py # MMapGraphStore implementation +├── engine/ +│ ├── __init__.py +│ ├── graph_builder.py # Modify for mmap_store +│ └── query.py # Modify for ingest-free +├── protocol/ +│ ├── __init__.py +│ ├── server.py # Update to use mmap_store +│ └── handlers/ +│ ├── __init__.py +│ ├── memory.py # Update for source content +│ └── query.py # Update query endpoint +├── vector/ +│ ├── __init__.py +│ └── mmap_vector.py # .smpv implementation +└── cli.py # Update commands +``` + +--- + +## Implementation Phases + +### Phase 1: Core Infrastructure (2-3 weeks) + +| Task | Description | +|------|-------------| +| 1.1 | Define binary format, implement `mmap_file.py` header read/write | +| 1.2 | Implement WAL with crash recovery | +| 1.3 | Implement Crit-bit tree (lock-free reads) | +| 1.4 | Implement Radix tree (secondary index) | +| 1.5 | Implement String pool with deduplication | +| 1.6 | Basic MMapGraphStore skeleton with empty implementations | + +**Milestone**: Can create/open empty graph file, basic sanity tests pass. + +### Phase 2: Parsing Engine (2 weeks) + +| Task | Description | +|------|-------------| +| 2.1 | tree-sitter Python parser wrapper | +| 2.2 | Parse file → extract nodes + edge candidates | +| 2.3 | Implement file manifest | +| 2.4 | Implement LRU eviction for parsed cache | +| 2.5 | Implement scheduler with priority queue | +| 2.6 | `parse_file()` and `ensure_parsed()` | + +**Milestone**: Can parse Python file on-demand, see nodes in graph. + +### Phase 3: Graph Operations (2 weeks) + +| Task | Description | +|------|-------------| +| 3.1 | Node store with inode allocation | +| 3.2 | Edge store with varint encoding | +| 3.3 | Implement edge resolution (hybrid) | +| 3.4 | Implement `upsert_node`, `get_node`, `delete_node` | +| 3.5 | Implement `get_edges`, `get_neighbors`, `traverse` | +| 3.6 | Implement `find_nodes`, `search_nodes` | + +**Milestone**: Can insert nodes, traverse edges, CRUD operations work. 
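+
+A minimal usage sketch of the Phase 3 surface (constructors follow the models
+used in the tests; the IDs and paths are illustrative):
+
+```python
+store = MMapGraphStore("project.smpg")
+await store.connect()
+
+await store.upsert_node(
+    GraphNode(
+        id="a::Function::main::1",
+        type=NodeType.FUNCTION,
+        file_path="a.py",
+        structural=StructuralProperties(
+            name="main", file="a.py", signature="def main():", start_line=1, end_line=2
+        ),
+    )
+)
+await store.upsert_edge(
+    GraphEdge(
+        source_id="a::Function::main::1",
+        target_id="a::Function::helper::10",
+        type=EdgeType.CALLS,
+    )
+)
+
+callees = await store.traverse("a::Function::main::1", EdgeType.CALLS, depth=2)
+```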
+ +### Phase 4: Query Language (1-2 weeks) + +| Task | Description | +|------|-------------| +| 4.1 | Path expression parser | +| 4.2 | Pattern matching engine | +| 4.3 | Implement `query()` method | +| 4.4 | Phase 2 filtering syntax | + +**Milestone**: Can query with `Function CALLS Function` style expressions. + +### Phase 5: Vector Store (1-2 weeks) + +| Task | Description | +|------|-------------| +| 5.1 | Define `.smpv` format | +| 5.2 | Implement vector index + storage | +| 5.3 | Embedding generation service integration | +| 5.4 | Similarity search | + +**Milestone**: Can store embeddings, query by similarity. + +### Phase 6: Live Updates (1 week) + +| Task | Description | +|------|-------------| +| 6.1 | inotify integration | +| 6.2 | Polling fallback | +| 6.3 | Stale detection + re-parse | + +**Milestone**: Graph stays in sync with file system changes. + +### Phase 7: Integration & Polish (2 weeks) + +| Task | Description | +|------|-------------| +| 7.1 | Replace Neo4j in server.py | +| 7.2 | Update handlers for direct source content | +| 7.3 | Remove Neo4j dependencies | +| 7.4 | Performance testing at scale | +| 7.5 | Documentation | + +**Milestone**: Full system working without Neo4j. + +--- + +## Performance Targets + +| Operation | Target | Notes | +|-----------|--------|-------| +| Index lookup | <1ms | Point query in crit-bit | +| File parse (1K LOC) | <100ms | tree-sitter parse | +| Edge resolution | <10ms | Target in RAM | +| Traverse depth 3 | <50ms | BFS with early exit | +| First query (unparsed file) | <200ms | Parse + index update | +| Subsequent queries | <10ms | Already in RAM | +| WAL replay (10K ops) | <1s | After crash | +| Bulk import (10K files) | <60s | With parallel workers | + +--- + +## Testing Strategy + +### Unit Tests + +``` +tests/ +├── store/graph/ +│ ├── test_mmap_file.py # Header, WAL, crash recovery +│ ├── test_index.py # Crit-bit, radix tree +│ ├── test_string_pool.py # Deduplication, collisions +│ ├── test_node_store.py # CRUD, allocation +│ ├── test_edge_store.py # Varint encoding, adjacency +│ ├── test_manifest.py # File entries +│ └── test_mmap_store.py # Full integration +├── parser/ +│ ├── test_parser.py # tree-sitter output +│ └── test_scheduler.py # Priority queue +├── query/ +│ ├── test_path_parser.py # Expression parsing +│ └── test_query_engine.py # Pattern matching +└── vector/ + └── test_mmap_vector.py # Vector CRUD, similarity +``` + +### Integration Tests + +``` +tests/integration/ +├── test_ingest_free.py # Query unparsed file → parse → query +├── test_multi_file.py # Cross-file edges +├── test_concurrent.py # Read + write simultaneously +├── test_crash_recovery.py # Simulated crash + replay +└── test_large_scale.py # 10K+ nodes stress test +``` + +### Benchmarks + +``` +benchmarks/ +├── benchmark_index.py # Crit-bit performance +├── benchmark_parse.py # Parse throughput +├── benchmark_traverse.py # Graph traversal +└── benchmark_scale.py # Memory usage at scale +``` + +--- + +## Dependencies + +```python +# Core +tree-sitter>=0.21.0 +tree-sitter-python>=0.21.0 + +# Optional (for vector embeddings) +numpy>=1.24.0 # Vector storage +sentence-transformers>=2.2.0 # Embedding generation + +# File watching +pyinotify>=0.9.6 # Linux inotify + +# Testing +pytest>=7.4.0 +pytest-asyncio>=0.21.0 +pytest-benchmark>=4.0.0 +``` + +--- + +## Future Extensions + +### Language Support + +| Language | tree-sitter grammar | Priority | +|----------|---------------------|----------| +| Python | tree-sitter-python | Primary | +| JavaScript/TypeScript | 
tree-sitter-javascript | Secondary |
+| Go | tree-sitter-go | Secondary |
+| Rust | tree-sitter-rust | Tertiary |
+| C/C++ | tree-sitter-c | Tertiary |
+
+### Features
+
+- **Incremental parsing**: Re-use AST for small changes
+- **Remote graphs**: Graph file served over network
+- **Replication**: Primary + read replicas
+- **Full-text search**: Enhanced FTS in string pool
+- **Graph algorithms**: PageRank, connected components, cycle detection
diff --git a/smp/store/graph/edge_store.py b/smp/store/graph/edge_store.py
new file mode 100644
index 0000000..f67e277
--- /dev/null
+++ b/smp/store/graph/edge_store.py
@@ -0,0 +1,28 @@
+from __future__ import annotations
+
+import struct
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from smp.store.graph.mmap_file import MMapFile
+
+
+class EdgeStore:
+    """Manages variable-length adjacency lists."""
+
+    def __init__(self, mmap_file: MMapFile) -> None:
+        self.mmap = mmap_file
+
+    def write_edges(self, source_offset: int, targets: list[tuple[int, int]]) -> int:
+        """Write edge list for a node and return its pointer."""
+        count = len(targets)
+        # Adjacency layout per SPEC.md: count, then (target, type) pairs,
+        # little-endian.
+        payload = struct.pack("<I", count)
+        for target, edge_type in targets:
+            payload += struct.pack("<QI", target, edge_type)
+        # TODO: append payload to the data region and return its offset.
+        return 0
+
+    def read_edges(self, ptr: int) -> list[tuple[int, int]]:
+        """Read edges from a pointer."""
+        return []
diff --git a/smp/store/graph/index.py b/smp/store/graph/index.py
new file mode 100644
index 0000000..e8f9094
--- /dev/null
+++ b/smp/store/graph/index.py
@@ -0,0 +1,60 @@
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from smp.store.graph.mmap_file import MMapFile
+
+# Node types for Crit-bit tree
+NODE_INTERNAL: int = 0
+NODE_LEAF: int = 1
+
+
+class CritBitIndex:
+    """Crit-bit tree for fast node_id lookups in mmap file."""
+
+    def __init__(self, mmap_file: MMapFile, root_ptr_offset: int) -> None:
+        self.mmap = mmap_file
+        self.root_ptr_offset = root_ptr_offset
+        # In-memory shim until the on-disk crit-bit pages land (Phase 1.3).
+        self._index: dict[str, int] = {}
+
+    def _get_root_offset(self) -> int:
+        return 0
+
+    def _set_root_offset(self, offset: int) -> None:
+        pass
+
+    def find(self, key: str) -> int | None:
+        """Find value (inode pointer) for a given key string."""
+        return self._index.get(key)
+
+    def insert(self, key: str, value: int) -> None:
+        """Insert a key-value pair into the index."""
+        self._index[key] = value
+
+    @property
+    def keys(self) -> list[str]:
+        return list(self._index.keys())
+
+
+class RadixIndex:
+    """Radix tree for file-path based range queries."""
+
+    def __init__(self, mmap_file: MMapFile, root_ptr_offset: int) -> None:
+        self.mmap = mmap_file
+        self.root_ptr_offset = root_ptr_offset
+        self._paths: dict[str, list[int]] = {}
+
+    def find_by_prefix(self, prefix: str) -> list[int]:
+        """Return all node IDs (pointers) under a given path prefix."""
+        results: list[int] = []
+        for path, node_ids in self._paths.items():
+            if path.startswith(prefix):
+                results.extend(node_ids)
+        return results
+
+    def insert(self, path: str, node_id_ptr: int) -> None:
+        """Insert a file path mapping."""
+        if path not in self._paths:
+            self._paths[path] = []
+        self._paths[path].append(node_id_ptr)
diff --git a/smp/store/graph/manifest.py b/smp/store/graph/manifest.py
new file mode 100644
index 0000000..c1639bb
--- /dev/null
+++ b/smp/store/graph/manifest.py
@@ -0,0 +1,24 @@
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Final
+
+if TYPE_CHECKING:
+    from smp.store.graph.mmap_file import MMapFile
+
+MANIFEST_ENTRY_SIZE: Final[int] = 128
+
+
+class FileManifest:
+    """Tracks source files and their parse status."""
+
+    def __init__(self, mmap_file: MMapFile, manifest_ptr_offset: int) -> None:
+        self.mmap = mmap_file
+        self.manifest_ptr_offset = manifest_ptr_offset
+
+    def get_entry(self, path_off: int) -> dict[str, int] | None:
+        """Get manifest entry for a file path."""
+        return None
+
+    def upsert_entry(self, path_off: int, hash_val: int, status: int) -> None:
+        """Update or insert file status."""
+        pass
diff --git a/smp/store/graph/mmap_file.py b/smp/store/graph/mmap_file.py
new file mode 100644
index 0000000..1c6c507
--- /dev/null
+++ b/smp/store/graph/mmap_file.py
@@ -0,0 +1,178 @@
+from __future__ import annotations
+
+import mmap
+import os
+import struct
+import zlib
+from pathlib import Path
+from typing import Any, Final
+
+# -- Constants -----------------------------------------------------------------
+
+MAGIC: Final[bytes] = b"SMPG"
+VERSION: Final[int] = 1
+HEADER_SIZE: Final[int] = 4096
+WAL_SIZE: Final[int] = 64 * 1024  # 64KB for initial WAL
+PAGE_SIZE: Final[int] = 4096
+
+# Header Offsets
+OFF_MAGIC: Final[int] = 0
+OFF_VERSION: Final[int] = 4
+OFF_FLAGS: Final[int] = 6
+OFF_CRC: Final[int] = 8
+OFF_ROOTS: Final[int] = 12  # Pointers to index, string pool, etc.
+OFF_WAL_HEAD: Final[int] = 64
+OFF_WAL_TAIL: Final[int] = 68
+
+# WAL Record Types
+WAL_TYPE_INSERT: Final[int] = 0x01
+WAL_TYPE_DELETE: Final[int] = 0x02
+WAL_TYPE_COMMIT: Final[int] = 0x06
+
+
+class WALRecord:
+    """A single record in the Write-Ahead Log."""
+
+    def __init__(self, rtype: int, payload: bytes) -> None:
+        self.rtype = rtype
+        self.payload = payload
+
+    def serialize(self) -> bytes:
+        # Record layout per SPEC.md: type (1B) + reserved (3B) + size (4B),
+        # then crc32 (4B), then the payload padded to the fixed 512 bytes.
+        size = len(self.payload)
+        header = struct.pack("<BxxxI", self.rtype, size)
+        crc = zlib.crc32(self.payload) & 0xFFFFFFFF
+        record = header + struct.pack("<I", crc) + self.payload
+        return record.ljust(512, b"\x00")
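+
+# A worked example of the 512-byte record layout (matches serialize() above):
+# a COMMIT record (0x06) with an 8-byte payload serializes as
+#   byte  0      -> 0x06 record_type
+#   bytes 1..3   -> reserved (the "xxx" padding in "<BxxxI")
+#   bytes 4..7   -> payload_size = 8, little-endian
+#   bytes 8..11  -> crc32 of the payload
+#   bytes 12..19 -> payload, then zero padding through byte 511.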
+
+
+class MMapFile:
+    """Low-level memory-mapped file: header, WAL, and data region access."""
+
+    def __init__(self, path: Path) -> None:
+        self.path = path
+        self.fd: int = -1
+        self.mmap: mmap.mmap | None = None
+        self._size: int = 0
+        self._wal_start: int = HEADER_SIZE
+        self._wal_end: int = HEADER_SIZE + WAL_SIZE
+
+    def open(self, create: bool = True) -> None:
+        """Open the file and map it into memory."""
+        exists = self.path.exists()
+        if not exists and not create:
+            raise FileNotFoundError(f"File not found: {self.path}")
+
+        mode = os.O_RDWR
+        if not exists:
+            mode |= os.O_CREAT
+
+        self.fd = os.open(self.path, mode)
+
+        if not exists:
+            # Initialize with header + empty WAL
+            self._size = HEADER_SIZE + WAL_SIZE
+            os.ftruncate(self.fd, self._size)
+            self.mmap = mmap.mmap(self.fd, self._size)
+            self._init_header()
+        else:
+            self._size = os.path.getsize(self.path)
+            self.mmap = mmap.mmap(self.fd, self._size)
+            self._validate_header()
+            self.replay_wal()
+
+    def write_wal_record(self, rtype: int, payload: bytes) -> None:
+        """Write a record to the circular WAL."""
+        assert self.mmap is not None
+        record = WALRecord(rtype, payload).serialize()
+        rec_size = len(record)
+
+        head = struct.unpack("<I", self.mmap[OFF_WAL_HEAD : OFF_WAL_HEAD + 4])[0]
+        # `head` is relative to the WAL region; checkpoint and wrap on overflow.
+        if self._wal_start + head + rec_size > self._wal_end:
+            self.checkpoint()
+            head = 0
+
+        pos = self._wal_start + head
+        self.mmap[pos : pos + rec_size] = record
+
+        new_head = head + rec_size
+        self.mmap[OFF_WAL_HEAD : OFF_WAL_HEAD + 4] = struct.pack("<I", new_head)
+
+    def checkpoint(self) -> None:
+        """Flush changes to data region and clear WAL."""
+        assert self.mmap is not None
+        self.mmap.flush()
+        self.mmap[OFF_WAL_HEAD : OFF_WAL_HEAD + 4] = struct.pack("<I", 0)
+
+    def replay_wal(self) -> None:
+        """Replay uncommitted WAL records (stub for now)."""
+        pass
+
+    def close(self) -> None:
+        """Sync and close the file."""
+        if self.mmap:
+            self.mmap.flush()
+            self.mmap.close()
+            self.mmap = None
+        if self.fd != -1:
+            os.close(self.fd)
+            self.fd = -1
+
+    def _init_header(self) -> None:
+        """Write initial header metadata."""
+        assert self.mmap is not None
+        self.mmap[OFF_MAGIC : OFF_MAGIC + 4] = MAGIC
+        self.mmap[OFF_VERSION : OFF_VERSION + 2] = struct.pack("<H", VERSION)
+        self.mmap[OFF_WAL_HEAD : OFF_WAL_HEAD + 4] = struct.pack("<I", 0)
+        self._update_crc()
+
+    def _validate_header(self) -> None:
+        """Check magic bytes and CRC."""
+        assert self.mmap is not None
+        if self.mmap[OFF_MAGIC : OFF_MAGIC + 4] != MAGIC:
+            raise ValueError("Invalid magic bytes: not an SMPG file")
+
+        version = struct.unpack("<H", self.mmap[OFF_VERSION : OFF_VERSION + 2])[0]
+        if version > VERSION:
+            raise ValueError(f"Unsupported version: {version}")
+
+        stored_crc = struct.unpack("<I", self.mmap[OFF_CRC : OFF_CRC + 4])[0]
+        computed = zlib.crc32(self.mmap[OFF_ROOTS:HEADER_SIZE]) & 0xFFFFFFFF
+        if stored_crc != computed:
+            raise ValueError("Header CRC mismatch: file may be corrupt")
+
+    def _update_crc(self) -> None:
+        """Recalculate and write header CRC."""
+        assert self.mmap is not None
+        header_data = self.mmap[OFF_ROOTS:HEADER_SIZE]
+        crc = zlib.crc32(header_data) & 0xFFFFFFFF
+        self.mmap[OFF_CRC : OFF_CRC + 4] = struct.pack("<I", crc)
+
+    def grow(self, new_size: int) -> None:
+        """Resize the file and remap."""
+        assert self.mmap is not None
+        if new_size <= self._size:
+            return
+
+        # Ensure aligned to PAGE_SIZE
+        new_size = (new_size + PAGE_SIZE - 1) // PAGE_SIZE * PAGE_SIZE
+
+        self.mmap.flush()
+        self.mmap.close()
+        os.ftruncate(self.fd, new_size)
+        self.mmap = mmap.mmap(self.fd, new_size)
+        self._size = new_size
+
+    def __enter__(self) -> MMapFile:
+        self.open()
+        return self
+
+    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
+        self.close()
diff --git a/smp/store/graph/mmap_store.py b/smp/store/graph/mmap_store.py
new file mode 100644
index 0000000..d255fe5
--- /dev/null
+++ b/smp/store/graph/mmap_store.py
@@ -0,0 +1,380 @@
+from __future__ import annotations
+
+from collections import deque
+from collections.abc import Sequence
+from dataclasses import dataclass
+from pathlib import Path
+from typing import TYPE_CHECKING, Any
+
+if TYPE_CHECKING:
+    from smp.store.graph.parser import CodeParser, ParsedFile
+    from smp.store.graph.scheduler import BackgroundScheduler
+
+from smp.core.models import (
+    EdgeType,
+    GraphEdge,
+    GraphNode,
+    NodeType,
+    SemanticProperties,
+    StructuralProperties,
+)
+from smp.store.graph.edge_store import EdgeStore
+from smp.store.graph.index import CritBitIndex, RadixIndex
+from smp.store.graph.manifest import FileManifest
+from smp.store.graph.mmap_file import OFF_ROOTS, MMapFile
+from smp.store.graph.node_store import NodeStore
+from smp.store.graph.string_pool import StringPool
+from smp.store.interfaces import GraphStore
+
+
+@dataclass
+class ParseStatus:
+    """Status of a parsed file."""
+
+    parsed: bool
+    line_count: int
+    node_count: int
+    stale: bool = False
+    parse_time_ms: float | None = None
+
+
+class MMapGraphStore(GraphStore):
+    """Memory-mapped graph store implementation."""
+
+    def __init__(self, path: str | Path) -> None:
+        self.path = Path(path)
+        self.file = MMapFile(self.path)
+
+        self._off_index = OFF_ROOTS
+        self._off_radix = OFF_ROOTS + 4
+        self._off_strings = OFF_ROOTS + 8
+        self._off_nodes = OFF_ROOTS + 12
+        self._off_manifest = OFF_ROOTS + 16
+
+        self.index: CritBitIndex | None = None
+        self.radix: RadixIndex | None = None
+        self.strings: StringPool | None = None
+        self.nodes_store: NodeStore | None = None
+        self.edges_store: EdgeStore | None = None
+        self.manifest: FileManifest | None = None
+
+        self._nodes: dict[str, GraphNode] = {}
+        self._edges: dict[str, list[GraphEdge]] = {}
+        self._edge_index: dict[str, list[GraphEdge]] = {}
+
+        self._parser: CodeParser | None = None
+        self._scheduler: BackgroundScheduler | None = None
+        self._parse_status: dict[str, ParseStatus] = {}
+        self._parsed_files: dict[str, ParsedFile] = {}
+
+    async def connect(self) -> None:
+        self.file.open()
+        self.index = CritBitIndex(self.file, self._off_index)
+        self.radix = RadixIndex(self.file, self._off_radix)
+        self.strings = StringPool(self.file, self._off_strings)
+        self.nodes_store = NodeStore(self.file, self._off_nodes)
+        self.edges_store = EdgeStore(self.file)
+        self.manifest = FileManifest(self.file, 
self._off_manifest)
+
+    async def close(self) -> None:
+        if self._scheduler:
+            self._scheduler.stop()
+        self.file.close()
+
+    async def clear(self) -> None:
+        # Release the mapping before unlinking so the old fd is not leaked
+        # when connect() re-opens a fresh file.
+        self.file.close()
+        if self.path.exists():
+            self.path.unlink()
+        self._nodes.clear()
+        self._edges.clear()
+        self._edge_index.clear()
+        self._parse_status.clear()
+        self._parsed_files.clear()
+        await self.connect()
+
+    async def upsert_node(self, node: GraphNode) -> None:
+        self._nodes[node.id] = node
+        if self.index:
+            self.index.insert(node.id, 0)
+        if self.radix:
+            self.radix.insert(node.file_path, 0)
+
+    async def upsert_nodes(self, nodes: Sequence[GraphNode]) -> None:
+        for node in nodes:
+            await self.upsert_node(node)
+
+    async def get_node(self, node_id: str) -> GraphNode | None:
+        return self._nodes.get(node_id)
+
+    async def delete_node(self, node_id: str) -> bool:
+        if node_id in self._nodes:
+            del self._nodes[node_id]
+            # Drop the node's own adjacency lists, then scrub edges that
+            # reference it from both directions of the index.
+            if node_id in self._edges:
+                del self._edges[node_id]
+            self._edge_index.pop(node_id, None)
+            for edges in self._edges.values():
+                edges[:] = [e for e in edges if e.target_id != node_id]
+            for edges in self._edge_index.values():
+                edges[:] = [e for e in edges if e.source_id != node_id]
+            return True
+        return False
+
+    async def delete_nodes_by_file(self, file_path: str) -> int:
+        to_delete = [nid for nid, n in self._nodes.items() if n.file_path == file_path]
+        for nid in to_delete:
+            await self.delete_node(nid)
+        return len(to_delete)
+
+    async def upsert_edge(self, edge: GraphEdge) -> None:
+        if edge.source_id not in self._edges:
+            self._edges[edge.source_id] = []
+        self._edges[edge.source_id].append(edge)
+
+        if edge.target_id not in self._edge_index:
+            self._edge_index[edge.target_id] = []
+        self._edge_index[edge.target_id].append(edge)
+
+    async def upsert_edges(self, edges: Sequence[GraphEdge]) -> None:
+        for edge in edges:
+            await self.upsert_edge(edge)
+
+    async def get_edges(
+        self,
+        node_id: str,
+        edge_type: EdgeType | None = None,
+        direction: str = "both",
+    ) -> list[GraphEdge]:
+        results: list[GraphEdge] = []
+        if direction in ("outgoing", "both"):
+            results.extend(self._edges.get(node_id, []))
+        if direction in ("incoming", "both"):
+            results.extend(self._edge_index.get(node_id, []))
+        if edge_type:
+            results = [e for e in results if e.type == edge_type]
+        return results
+
+    async def get_neighbors(
+        self,
+        node_id: str,
+        edge_type: EdgeType | None = None,
+        depth: int = 1,
+    ) -> list[GraphNode]:
+        edge_types: EdgeType | list[EdgeType] = edge_type if edge_type else []
+        return await self.traverse(node_id, edge_types, depth, max_nodes=1000, direction="outgoing")
+
+    async def traverse(
+        self,
+        start_id: str,
+        edge_type: EdgeType | list[EdgeType],
+        depth: int,
+        max_nodes: int = 100,
+        direction: str = "outgoing",
+    ) -> list[GraphNode]:
+        if start_id not in self._nodes:
+            return []
+
+        visited: set[str] = {start_id}
+        queue: deque[tuple[str, int]] = deque([(start_id, 0)])
+        results: list[GraphNode] = [self._nodes[start_id]]
+
+        edge_types: set[EdgeType] = set()
+        if edge_type:
+            if isinstance(edge_type, list):
+                edge_types.update(edge_type)
+            else:
+                edge_types.add(edge_type)
+
+        while queue and len(results) < max_nodes:
+            current_id, current_depth = queue.popleft()
+            if current_depth >= depth:
+                continue
+
+            edges = await self.get_edges(current_id, None, direction)
+            for edge in edges:
+                if edge_types and edge.type not in edge_types:
+                    continue
+
+                target_id = edge.target_id if direction != "incoming" else edge.source_id
+                if target_id in visited:
+                    continue
+                if target_id in self._nodes:
+                    visited.add(target_id)
+                    results.append(self._nodes[target_id])
+                    queue.append((target_id, current_depth + 1))
+
+        return results
+
+    async def 
find_nodes( + self, + *, + type: NodeType | None = None, + file_path: str | None = None, + name: str | None = None, + ) -> list[GraphNode]: + results = list(self._nodes.values()) + if type: + results = [n for n in results if n.type == type] + if file_path: + results = [n for n in results if n.file_path == file_path] + if name: + results = [n for n in results if n.structural.name == name] + return results + + async def search_nodes( + self, + query_terms: list[str], + match: str = "any", + node_types: list[str] | None = None, + tags: list[str] | None = None, + scope: str | None = None, + top_k: int = 5, + ) -> list[dict[str, Any]]: + results: list[tuple[int, GraphNode]] = [] + + for node in self._nodes.values(): + if node_types and node.type.value not in node_types: + continue + + score = 0 + for term in query_terms: + term_lower = term.lower() + if term_lower in node.structural.name.lower(): + score += 3 + if node.semantic.docstring and term_lower in node.semantic.docstring.lower(): + score += 2 + if node.semantic.description and term_lower in node.semantic.description.lower(): + score += 1 + + if score > 0: + results.append((score, node)) + + results.sort(key=lambda x: x[0], reverse=True) + return [ + { + "id": node.id, + "type": node.type.value, + "name": node.structural.name, + "file_path": node.file_path, + "score": score, + } + for score, node in results[:top_k] + ] + + async def count_nodes(self) -> int: + return len(self._nodes) + + async def count_edges(self) -> int: + return sum(len(e) for e in self._edges.values()) + + def _get_parser(self) -> CodeParser: + if self._parser is None: + from smp.store.graph.parser import CodeParser + + self._parser = CodeParser() + return self._parser + + def _get_scheduler(self) -> BackgroundScheduler: + if self._scheduler is None: + from smp.store.graph.scheduler import BackgroundScheduler + + self._scheduler = BackgroundScheduler(self._get_parser()) + self._scheduler.callback = self._on_file_parsed + self._scheduler.start() + return self._scheduler + + def _on_file_parsed(self, file_path: str, parsed: ParsedFile) -> None: + self._parsed_files[file_path] = parsed + self._parse_status[file_path] = ParseStatus( + parsed=True, + line_count=parsed.line_count, + node_count=len(parsed.nodes), + ) + + async def parse_file(self, file_path: str) -> list[GraphNode]: + parsed = self._get_parser().parse_file(file_path) + + self._parsed_files[file_path] = parsed + self._parse_status[file_path] = ParseStatus( + parsed=True, + line_count=parsed.line_count, + node_count=len(parsed.nodes), + ) + + graph_nodes: list[GraphNode] = [] + for pnode in parsed.nodes: + graph_nodes.append( + GraphNode( + id=pnode.node_id, + type=NodeType(pnode.type.upper()), + file_path=file_path, + structural=StructuralProperties( + name=pnode.name, + file=file_path, + signature=pnode.signature, + start_line=pnode.start_line, + end_line=pnode.end_line, + ), + semantic=SemanticProperties( + docstring=pnode.docstring, + ), + ) + ) + + await self.upsert_nodes(graph_nodes) + + for ec in parsed.edge_candidates: + await self.upsert_edge( + GraphEdge( + source_id=ec.source_id, + target_id=f"::{ec.target_name}::", + type=EdgeType(ec.edge_type.upper()), + ) + ) + + return graph_nodes + + async def ensure_parsed(self, file_path: str) -> list[GraphNode]: + status = self._parse_status.get(file_path) + if status and status.parsed and not status.stale: + return [n for n in self._nodes.values() if n.file_path == file_path] + return await self.parse_file(file_path) + + async def pre_parse(self, count: 
int, min_priority: int = 50) -> int:
+        # Stub: reports how many queued files could be parsed; actually
+        # draining the queue is left to the background workers.
+        scheduler = self._get_scheduler()
+        if scheduler.pending_count == 0:
+            return 0
+        return min(count, scheduler.pending_count)
+
+    async def get_parse_status(self, file_path: str) -> ParseStatus:
+        return self._parse_status.get(
+            file_path,
+            ParseStatus(parsed=False, line_count=0, node_count=0),
+        )
+
+    async def wait_for_parse(self, file_path: str, timeout: float) -> bool:
+        import asyncio
+
+        start = asyncio.get_event_loop().time()
+        while asyncio.get_event_loop().time() - start < timeout:
+            status = self._parse_status.get(file_path)
+            if status and status.parsed:
+                return True
+            await asyncio.sleep(0.1)
+        return False
+
+    async def upsert_session(self, session: Any) -> None:
+        pass
+
+    async def get_session(self, session_id: str) -> dict[str, Any] | None:
+        return None
+
+    async def delete_session(self, session_id: str) -> bool:
+        return False
+
+    async def upsert_lock(self, file_path: str, session_id: str) -> None:
+        pass
+
+    async def get_lock(self, file_path: str) -> dict[str, Any] | None:
+        return None
+
+    async def release_lock(self, file_path: str, session_id: str) -> bool:
+        return False
+
+    async def release_all_locks(self, session_id: str) -> int:
+        return 0
diff --git a/smp/store/graph/node_store.py b/smp/store/graph/node_store.py
new file mode 100644
index 0000000..cc375a3
--- /dev/null
+++ b/smp/store/graph/node_store.py
@@ -0,0 +1,40 @@
+from __future__ import annotations
+
+import struct
+from typing import TYPE_CHECKING, Final
+
+from smp.core.models import GraphNode
+
+if TYPE_CHECKING:
+    from smp.store.graph.mmap_file import MMapFile
+
+# NOTE: SPEC.md describes 64-byte inodes; this skeleton packs into 32 bytes.
+INODE_SIZE: Final[int] = 32
+
+
+class NodeStore:
+    """Manages Inode storage and retrieval."""
+
+    def __init__(self, mmap_file: MMapFile, store_ptr_offset: int) -> None:
+        self.mmap = mmap_file
+        self.store_ptr_offset = store_ptr_offset
+
+    def write_node(self, node: GraphNode, name_off: int, sig_off: int, file_off: int) -> int:
+        """Serialize GraphNode to an Inode and return its offset."""
+        # Pack the fixed-size inode fields little-endian; the remaining
+        # bytes of the slot are reserved for edge_list_ptr/flags.
+        inode = struct.pack(
+            "<IIIII",
+            name_off,
+            sig_off,
+            file_off,
+            node.structural.start_line,
+            node.structural.end_line,
+        ).ljust(INODE_SIZE, b"\x00")
+        # TODO: allocate a slot from the free list and write `inode` there.
+        assert len(inode) == INODE_SIZE
+        return 0
+
+    def read_node(self, offset: int) -> dict[str, int]:
+        """Read Inode data from offset."""
+        assert self.mmap.mmap is not None
+        _raw = self.mmap.mmap[offset : offset + INODE_SIZE]
+        return {}
diff --git a/smp/store/graph/parser.py b/smp/store/graph/parser.py
new file mode 100644
index 0000000..9046d27
--- /dev/null
+++ b/smp/store/graph/parser.py
@@ -0,0 +1,421 @@
+from __future__ import annotations
+
+import hashlib
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import Any, Final
+
+try:
+    import tree_sitter_languages
+    from tree_sitter import Language, Parser
+
+    HAS_TREE_SITTER = True
+except ImportError:
+    HAS_TREE_SITTER = False
+
+# Supported languages
+LANG_PYTHON: Final[str] = "python"
+
+# Node types we care about for the code graph (tree-sitter-python grammar
+# names; async defs also surface as function_definition)
+INTERESTING_TYPES: Final[set[str]] = {
+    "module",
+    "class_definition",
+    "function_definition",
+    "import_statement",
+    "import_from_statement",
+    "call",
+    "identifier",
+    "decorated_definition",
+}
+
+
+@dataclass
+class ParsedNode:
+    """A node extracted from source code."""
+
+    node_id: str
+    type: str  # NodeType.value
+    name: str
+    signature: str
+    docstring: str
+    start_line: int
+    end_line: int
+    tags: list[str] = field(default_factory=list)
+    decorators: list[str] = field(default_factory=list)
+    parent_id: str | None = None
+
+
+@dataclass
+class EdgeCandidate:
+    """An unresolved edge from one node to another."""
+
+    source_id: str
+    target_name: str
+    edge_type: str  # EdgeType.value
+    target_file_hint: str | None = None
+
+
+@dataclass
+class ParsedFile:
+    """Result of parsing a source file."""
+
+    file_path: str
+    language: str
+    line_count: int
+    content_hash: str
+    nodes: list[ParsedNode] = field(default_factory=list)
+    edge_candidates: list[EdgeCandidate] = field(default_factory=list)
+
+
+class CodeParser:
+    """Wrapper around tree-sitter for parsing source code."""
+
+    def __init__(self) -> None:
+        if not HAS_TREE_SITTER:
+            raise ImportError("tree-sitter not installed. Run: pip install tree-sitter-python")
+        self._parsers: dict[str, Parser] = {}
+        self._languages: dict[str, Language] = {}
+        self._load_language(LANG_PYTHON)
+
+    def _load_language(self, lang: str) -> None:
+        """Load a tree-sitter language."""
+        if lang in self._languages:
+            return
+
+        ts_lang = tree_sitter_languages.get_language(lang)
+        self._languages[lang] = ts_lang
+
+        parser = Parser()
+        parser.set_language(ts_lang)  # pre-0.22 API, as used by tree_sitter_languages
+        self._parsers[lang] = parser
+
+    def _detect_language(self, file_path: str | Path) -> str:
+        """Detect language from file extension."""
+        path = Path(file_path)
+        ext = path.suffix.lower()
+        lang_map = {
+            ".py": LANG_PYTHON,
+            ".pyw": LANG_PYTHON,
+            ".pyi": LANG_PYTHON,
+        }
+        return lang_map.get(ext, LANG_PYTHON)
+
+    def _compute_hash(self, content: bytes) -> str:
+        """Compute hash of file content (blake2b stand-in for SPEC.md's xxhash64)."""
+        return hashlib.blake2b(content, digest_size=8).hexdigest()
+
+    def _make_node_id(self, file_path: str, node_type: str, name: str, start_line: int) -> str:
+        """Create a deterministic node ID."""
+        path_hash = hashlib.blake2b(Path(file_path).resolve().as_posix().encode(), digest_size=4).hexdigest()
+        return f"{path_hash}::{node_type}::{name}::{start_line}"
+
+    def parse_file(self, file_path: str | Path) -> ParsedFile:
+        """Parse a source file and extract nodes."""
+        path = Path(file_path)
+        if not path.exists():
+            raise FileNotFoundError(f"File not found: {path}")
+
+        content = path.read_bytes()
+        return self.parse_content(str(path), content)
+
+    def parse_content(self, file_path: str, content: bytes) -> ParsedFile:
+        """Parse source content and extract nodes."""
+        lang = self._detect_language(file_path)
+        self._load_language(lang)
+
+        parser = self._parsers[lang]
+        tree = parser.parse(content)
+        text = content.decode("utf-8", errors="replace")
+        line_count = len(text.splitlines())
+
+        file_hash = self._compute_hash(content)
+
+        # Extract nodes
+        nodes: list[ParsedNode] = []
+        edge_candidates: list[EdgeCandidate] = []
+        scope_stack: list[tuple[str, int]] = []  # (node_id, end_line)
+
+        self._walk_tree(
+            tree.root_node,
+            content,
+            file_path,
+            nodes,
+            edge_candidates,
+            scope_stack,
+            lang,
+        )
+
+        return ParsedFile(
+            file_path=file_path,
+            language=lang,
+            line_count=line_count,
+            content_hash=file_hash,
+            nodes=nodes,
+            edge_candidates=edge_candidates,
+        )
+
+    def _walk_tree(
+        self,
+        node: Any,
+        content: bytes,
+        file_path: str,
+        nodes: list[ParsedNode],
+        edge_candidates: list[EdgeCandidate],
+        scope_stack: list[tuple[str, int]],
+        lang: str,
+    ) -> None:
+        """Walk the tree and extract nodes."""
+        node_type = node.type
+
+        # Handle function/method definitions (async defs are also
+        # function_definition nodes in tree-sitter-python)
+        if node_type == "function_definition":
+            self._handle_function(node, content, file_path, nodes, edge_candidates, scope_stack, lang)
+        # Handle class definitions
+        elif node_type == "class_definition":
+            self._handle_class(node, content, file_path, nodes, scope_stack, lang)
+        # Handle imports
+        elif node_type in ("import_statement", "import_from_statement"):
+            self._handle_import(node, content, file_path, edge_candidates, scope_stack)
+        # Handle calls
+        elif node_type == "call":
+            self._handle_call(node, content, file_path, edge_candidates, scope_stack)
+        # Handle decorators
+        elif node_type == "decorator":
+            pass  # Handled by parent
+
+        # Recurse into children
+        for child in node.children:
+            self._walk_tree(child, content, file_path, nodes, edge_candidates, scope_stack, lang)
+
+        # Pop the scope once we leave the node that opened it (the stored
+        # end_line is 1-based, end_point[0] is 0-based, hence the +1)
+        if scope_stack and scope_stack[-1][1] == node.end_point[0] + 1:
+            scope_stack.pop()
("import_statement", "import_from_statement"): + self._handle_import(node, content, file_path, edge_candidates, scope_stack) + # Handle calls + elif node_type == "call": + self._handle_call(node, content, file_path, edge_candidates, scope_stack) + # Handle decorators + elif node_type == "decorator": + pass # Handled by parent + + # Recurse into children + for child in node.children: + self._walk_tree(child, content, file_path, nodes, edge_candidates, scope_stack, lang) + + # Pop scope if needed + if scope_stack and scope_stack[-1][1] == node.end_point[0]: + scope_stack.pop() + + def _handle_function( + self, + node: Any, + content: bytes, + file_path: str, + nodes: list[ParsedNode], + edge_candidates: list[EdgeCandidate], + scope_stack: list[tuple[str, int]], + lang: str, + ) -> None: + """Extract a function/method definition.""" + # Get function name + name_node = node.child_by_field_name("name") + if name_node is None: + return + name = content[name_node.start_byte : name_node.end_byte].decode("utf-8", errors="replace") + + start_line = node.start_point[0] + 1 + end_line = node.end_point[0] + 1 + + # Get signature + param_node = node.child_by_field_name("parameters") + signature = "" + if param_node: + signature = content[param_node.start_byte : param_node.end_byte].decode("utf-8", errors="replace") + signature = f"def {name}({signature})" + else: + signature = f"def {name}()" + + # Get docstring + docstring = self._extract_docstring(node, content) + + # Get decorators + decorators = self._extract_decorators(node, content) + + node_id = self._make_node_id(file_path, "Function", name, start_line) + + parsed_node = ParsedNode( + node_id=node_id, + type="Function", + name=name, + signature=signature, + docstring=docstring, + start_line=start_line, + end_line=end_line, + decorators=decorators, + ) + + # Set parent from scope stack + if scope_stack: + parsed_node.parent_id = scope_stack[-1][0] + + nodes.append(parsed_node) + scope_stack.append((node_id, end_line)) + + # Add IMPORTS edge for known library functions + if lang == LANG_PYTHON: + self._add_python_imports(node, content, file_path, node_id, edge_candidates) + + def _handle_class( + self, + node: Any, + content: bytes, + file_path: str, + nodes: list[ParsedNode], + scope_stack: list[tuple[str, int]], + lang: str, + ) -> None: + """Extract a class definition.""" + name_node = node.child_by_field_name("name") + if name_node is None: + return + name = content[name_node.start_byte : name_node.end_byte].decode("utf-8", errors="replace") + + start_line = node.start_point[0] + 1 + end_line = node.end_point[0] + 1 + docstring = self._extract_docstring(node, content) + decorators = self._extract_decorators(node, content) + + node_id = self._make_node_id(file_path, "Class", name, start_line) + class_signature = f"class {name}" + + parsed_node = ParsedNode( + node_id=node_id, + type="Class", + name=name, + signature=class_signature, + docstring=docstring, + start_line=start_line, + end_line=end_line, + decorators=decorators, + ) + + if scope_stack: + parsed_node.parent_id = scope_stack[-1][0] + + nodes.append(parsed_node) + scope_stack.append((node_id, end_line)) + + def _handle_import( + self, + node: Any, + content: bytes, + file_path: str, + edge_candidates: list[EdgeCandidate], + scope_stack: list[tuple[str, int]], + ) -> None: + """Extract import statements.""" + source_id = "" + if scope_stack: + source_id = scope_stack[-1][0] + else: + # Module-level import + path_hash = hashlib.blake2b(Path(file_path).resolve().as_posix().encode(), 
+
+    def _handle_import(
+        self,
+        node: Any,
+        content: bytes,
+        file_path: str,
+        edge_candidates: list[EdgeCandidate],
+        scope_stack: list[tuple[str, int]],
+    ) -> None:
+        """Extract import statements."""
+        source_id = ""
+        if scope_stack:
+            source_id = scope_stack[-1][0]
+        else:
+            # Module-level import
+            path_hash = hashlib.blake2b(Path(file_path).resolve().as_posix().encode(), digest_size=4).hexdigest()
+            source_id = f"{path_hash}::Module::module::1"
+
+        # Imported names are wrapped in dotted_name / aliased_import nodes,
+        # so walk that small subtree instead of only the direct children.
+        stack = list(node.children)
+        while stack:
+            child = stack.pop()
+            if child.type in ("dotted_name", "aliased_import"):
+                stack.extend(child.children)
+                continue
+            if child.type != "identifier":
+                continue
+            name = content[child.start_byte : child.end_byte].decode("utf-8", errors="replace")
+            edge_candidates.append(
+                EdgeCandidate(
+                    source_id=source_id,
+                    target_name=name,
+                    edge_type="IMPORTS",
+                    target_file_hint=None,
+                )
+            )
+
+    def _handle_call(
+        self,
+        node: Any,
+        content: bytes,
+        file_path: str,
+        edge_candidates: list[EdgeCandidate],
+        scope_stack: list[tuple[str, int]],
+    ) -> None:
+        """Extract function calls."""
+        if not scope_stack:
+            return
+
+        source_id = scope_stack[-1][0]
+
+        # Get function being called
+        func_node = node.child_by_field_name("function")
+        if func_node is None:
+            return
+
+        # Follow attribute accesses (e.g., os.path.join)
+        parts: list[str] = []
+        current = func_node
+        while current is not None:
+            if current.type == "identifier":
+                name = content[current.start_byte : current.end_byte].decode("utf-8", errors="replace")
+                parts.insert(0, name)
+            elif current.type == "attribute":
+                attr = current.child_by_field_name("attribute")
+                if attr:
+                    name = content[attr.start_byte : attr.end_byte].decode("utf-8", errors="replace")
+                    parts.insert(0, name)
+                # Move to parent (object)
+                obj = current.child_by_field_name("object")
+                current = obj
+                continue
+            else:
+                break
+            current = None  # Exit loop
+
+        if parts:
+            target_name = ".".join(parts)
+            edge_candidates.append(
+                EdgeCandidate(
+                    source_id=source_id,
+                    target_name=target_name,
+                    edge_type="CALLS",
+                    target_file_hint=None,
+                )
+            )
+
+    def _extract_docstring(self, node: Any, content: bytes) -> str:
+        """Extract docstring from function/class body."""
+        # The docstring is the first statement of the body block, when that
+        # statement is a bare string expression.
+        body = node.child_by_field_name("body")
+        if body is None or body.named_child_count == 0:
+            return ""
+
+        first_stmt = body.named_children[0]
+        if first_stmt.type == "expression_statement" and first_stmt.named_child_count > 0:
+            expr = first_stmt.named_children[0]
+            if expr.type == "string":
+                doc = content[expr.start_byte : expr.end_byte].decode("utf-8", errors="replace")
+                # Remove the surrounding quotes
+                for q in ('"""', "'''", '"', "'"):
+                    if doc.startswith(q) and doc.endswith(q) and len(doc) >= 2 * len(q):
+                        return doc[len(q) : -len(q)]
+                return doc
+        return ""
+
+    def _extract_decorators(self, node: Any, content: bytes) -> list[str]:
+        """Extract decorators from a wrapping decorated_definition node."""
+        # In tree-sitter-python, decorators are siblings inside a
+        # decorated_definition parent, not prev_sibling of the def itself.
+        decorators: list[str] = []
+        parent = node.parent
+        if parent is None or parent.type != "decorated_definition":
+            return decorators
+        for child in parent.children:
+            if child.type == "decorator":
+                text = content[child.start_byte : child.end_byte].decode("utf-8", errors="replace")
+                decorators.append(text.strip())
+        return decorators
+
+    def _add_python_imports(
+        self,
+        node: Any,
+        content: bytes,
+        file_path: str,
+        node_id: str,
+        edge_candidates: list[EdgeCandidate],
+    ) -> None:
+        """Add standard library references."""
+        # This is a simplified version - full impl would use AST analysis
+        pass
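+
+
+if __name__ == "__main__":
+    # Usage sketch (not part of the public API): parse a small snippet and
+    # print the extracted nodes and unresolved edge candidates.
+    demo = b"import os\n\ndef main():\n    os.path.join('a', 'b')\n"
+    parsed = CodeParser().parse_content("demo.py", demo)
+    for n in parsed.nodes:
+        print(n.type, n.name, n.start_line, n.end_line)
+    for ec in parsed.edge_candidates:
+        print(ec.edge_type, ec.source_id, "->", ec.target_name)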
diff --git a/smp/store/graph/scheduler.py b/smp/store/graph/scheduler.py
new file mode 100644
index 0000000..23e40f9
--- /dev/null
+++ b/smp/store/graph/scheduler.py
@@ -0,0 +1,128 @@
+from __future__ import annotations
+
+import heapq
+import logging
+import os
+import threading
+import time
+from collections.abc import Callable
+from dataclasses import dataclass, field
+from typing import Any
+
+log = logging.getLogger(__name__)
+
+
+@dataclass(order=True)
+class ParseTask:
+    """A file waiting to be parsed."""
+
+    # heapq is a min-heap, so order tasks by the negated priority: the
+    # highest-priority file is popped first.
+    sort_key: float = field(init=False, repr=False)
+    priority: float = field(compare=False)
+    file_path: str = field(compare=False)
+    enqueued_at: float = field(compare=False, default_factory=time.time)
+    in_degree: int = field(compare=False, default=0)
+
+    def __post_init__(self) -> None:
+        self.sort_key = -self.priority
+
+
+class BackgroundScheduler:
+    """Background pre-parse scheduler with priority queue."""
+
+    def __init__(
+        self,
+        parser: Any,
+        max_workers: int | None = None,
+    ) -> None:
+        self.parser = parser
+        self._max_workers = max_workers or min(32, (os.cpu_count() or 1) + 4)
+        self._queue: list[ParseTask] = []
+        self._enqueued: set[str] = set()
+        self._lock = threading.Lock()
+        self._workers: list[threading.Thread] = []
+        self._stopping = threading.Event()
+        self._parse_callback: Callable[[str, Any], None] | None = None
+
+    @property
+    def callback(self) -> Callable[[str, Any], None] | None:
+        return self._parse_callback
+
+    @callback.setter
+    def callback(self, cb: Callable[[str, Any], None]) -> None:
+        self._parse_callback = cb
+
+    def start(self) -> None:
+        """Start worker threads."""
+        if self._workers:
+            return
+        self._stopping.clear()
+        for i in range(self._max_workers):
+            t = threading.Thread(target=self._worker, daemon=True, name=f"smp-parse-{i}")
+            t.start()
+            self._workers.append(t)
+        log.info("scheduler_started", extra={"worker_count": self._max_workers})
+
+    def stop(self, timeout: float = 5.0) -> None:
+        """Stop worker threads gracefully."""
+        self._stopping.set()
+        for t in self._workers:
+            t.join(timeout=timeout)
+        self._workers.clear()
+        log.info("scheduler_stopped")
+
+    def enqueue(self, file_path: str, priority: float = 50.0, in_degree: int = 0) -> bool:
+        """Add a file to the parse queue. Returns True if newly enqueued."""
+        with self._lock:
+            if file_path in self._enqueued:
+                return False
+            self._enqueued.add(file_path)
+            task = ParseTask(priority=priority, file_path=file_path, in_degree=in_degree)
+            heapq.heappush(self._queue, task)
+            log.debug("task_enqueued", extra={"file_path": file_path, "priority": priority})
+            return True
+
+    def enqueue_batch(self, files: list[tuple[str, float, int]]) -> int:
+        """Batch enqueue files. Returns count newly enqueued."""
+        count = 0
+        with self._lock:
+            for file_path, priority, in_degree in files:
+                if file_path not in self._enqueued:
+                    self._enqueued.add(file_path)
+                    heapq.heappush(
+                        self._queue,
+                        ParseTask(
+                            priority=priority,
+                            file_path=file_path,
+                            in_degree=in_degree,
+                        ),
+                    )
+                    count += 1
+        return count
+
+    def dequeue(self) -> ParseTask | None:
+        """Get next task from queue."""
+        with self._lock:
+            if not self._queue:
+                return None
+            return heapq.heappop(self._queue)
+
+    @property
+    def pending_count(self) -> int:
+        """Number of tasks waiting to be processed."""
+        with self._lock:
+            return len(self._queue)
+
+    def _worker(self) -> None:
+        """Worker thread that processes parse tasks."""
+        while not self._stopping.is_set():
+            task = self.dequeue()
+            if task is None:
+                time.sleep(0.1)
+                continue
+
+            try:
+                parsed = self.parser.parse_file(task.file_path)
+                if self._parse_callback:
+                    self._parse_callback(task.file_path, parsed)
+                log.debug("task_completed", extra={"file_path": task.file_path})
+            except Exception:
+                log.exception("task_failed", extra={"file_path": task.file_path})
+            finally:
+                with self._lock:
+                    self._enqueued.discard(task.file_path)
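+
+
+if __name__ == "__main__":
+    # Usage sketch (assumes any object exposing parse_file): enqueue three
+    # files and confirm the highest-priority one pops first.
+    class _EchoParser:
+        def parse_file(self, path: str) -> str:
+            return path
+
+    sched = BackgroundScheduler(_EchoParser(), max_workers=1)
+    sched.enqueue("main.py", priority=100)
+    sched.enqueue("util.py", priority=60)
+    sched.enqueue("orphan.py", priority=20)
+    task = sched.dequeue()
+    assert task is not None and task.file_path == "main.py"
+    print("next:", task.file_path)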
diff --git a/tests/store/graph/test_mmap_store.py b/tests/store/graph/test_mmap_store.py
new file mode 100644
index 0000000..1844f5a
--- /dev/null
+++ b/tests/store/graph/test_mmap_store.py
@@ -0,0 +1,237 @@
+import pytest
+from smp.core.models import EdgeType, GraphEdge, GraphNode, NodeType, SemanticProperties, StructuralProperties
+from smp.store.graph.mmap_store import MMapGraphStore
+
+
+@pytest.mark.asyncio
+async def test_mmap_store_lifecycle(tmp_path):
+    db_path = tmp_path / "test.smpg"
+    store = MMapGraphStore(db_path)
+
+    await store.connect()
+    assert db_path.exists()
+
+    await store.close()
+
+
+@pytest.mark.asyncio
+async def test_mmap_store_upsert_node(tmp_path):
+    db_path = tmp_path / "test.smpg"
+    store = MMapGraphStore(db_path)
+    await store.connect()
+
+    node = GraphNode(
+        id="test.py::Function::test_func::10",
+        type=NodeType.FUNCTION,
+        file_path="test.py",
+        structural=StructuralProperties(
+            name="test_func",
+            file="test.py",
+            signature="def test_func():",
+            start_line=10,
+            end_line=12,
+        ),
+    )
+
+    await store.upsert_node(node)
+
+    await store.close()
+
+
+@pytest.mark.asyncio
+async def test_mmap_store_crud(tmp_path):
+    db_path = tmp_path / "test.smpg"
+    store = MMapGraphStore(db_path)
+    await store.connect()
+
+    node1 = GraphNode(
+        id="a::Function::f1::1",
+        type=NodeType.FUNCTION,
+        file_path="a.py",
+        structural=StructuralProperties(name="f1", file="a.py", signature="def f1():", start_line=1, end_line=2),
+    )
+    node2 = GraphNode(
+        id="a::Function::f2::10",
+        type=NodeType.FUNCTION,
+        file_path="a.py",
+        structural=StructuralProperties(name="f2", file="a.py", signature="def f2():", start_line=10, end_line=12),
+    )
+    node3 = GraphNode(
+        id="b::Class::C::1",
+        type=NodeType.CLASS,
+        file_path="b.py",
+        structural=StructuralProperties(name="C", file="b.py", signature="class C", start_line=1, end_line=5),
+    )
+
+    await store.upsert_nodes([node1, node2, node3])
+
+    assert await store.count_nodes() == 3
+    assert await store.get_node("a::Function::f1::1") == node1
+    assert await store.get_node("nonexistent") is None
+
+    edge = GraphEdge(
+        source_id="a::Function::f1::1",
+        target_id="a::Function::f2::10",
+        type=EdgeType.CALLS,
+    )
+    await store.upsert_edge(edge)
+    assert await store.count_edges() == 1
+
+    edges = await store.get_edges("a::Function::f1::1")
+    assert len(edges) == 1
+    assert edges[0].target_id == "a::Function::f2::10"
+
+    await store.close()
+
+
+@pytest.mark.asyncio
+async def test_mmap_store_traverse(tmp_path):
+    db_path = tmp_path / "test.smpg"
+    store = MMapGraphStore(db_path)
+    await store.connect()
+
+    nodes = [
+        GraphNode(
+            id="a::Function::main::1",
+            type=NodeType.FUNCTION,
+            file_path="a.py",
+            structural=StructuralProperties(
+                name="main", file="a.py", signature="def main():", start_line=1, end_line=2
+            ),
+        ),
+        GraphNode(
+            id="a::Function::helper::10",
+            type=NodeType.FUNCTION,
+            file_path="a.py",
+            structural=StructuralProperties(
+                name="helper", file="a.py", signature="def helper():", start_line=10, end_line=12
+            ),
+        ),
+        GraphNode(
+            id="a::Function::deep::20",
+            type=NodeType.FUNCTION,
+            file_path="a.py",
+            structural=StructuralProperties(
+                name="deep", file="a.py", signature="def deep():", start_line=20, end_line=22
+            ),
+        ),
+    ]
+    await store.upsert_nodes(nodes)
+
+    await store.upsert_edge(
+        GraphEdge(
+            source_id="a::Function::main::1",
+            target_id="a::Function::helper::10",
+            type=EdgeType.CALLS,
+        )
+    )
+    await store.upsert_edge(
+        GraphEdge(
+            source_id="a::Function::helper::10",
+            target_id="a::Function::deep::20",
+            type=EdgeType.CALLS,
+        )
+    )
+
+    results = await store.traverse(
+        "a::Function::main::1",
+        EdgeType.CALLS,
+        depth=2,
+    )
+    assert len(results) >= 2
+
+    neighbors = await store.get_neighbors("a::Function::main::1", depth=1)
+    assert len(neighbors) >= 1
+
+    await store.close()
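The traverse test above exercises `traverse` and `get_neighbors` only through their observable contract: depth-bounded reachability along `CALLS` edges. A hypothetical in-memory reference of that contract, assuming BFS semantics (the store's actual implementation may differ):

```python
from collections import deque


def traverse(adj: dict[str, list[str]], start: str, depth: int) -> list[str]:
    """Collect nodes reachable from start within `depth` hops (BFS)."""
    seen: set[str] = {start}
    out: list[str] = []
    frontier = deque([(start, 0)])
    while frontier:
        node, d = frontier.popleft()
        if d == depth:  # depth budget spent; do not expand further
            continue
        for nxt in adj.get(node, []):
            if nxt not in seen:
                seen.add(nxt)
                out.append(nxt)
                frontier.append((nxt, d + 1))
    return out


adj = {
    "a::Function::main::1": ["a::Function::helper::10"],
    "a::Function::helper::10": ["a::Function::deep::20"],
}
# Depth 2 from main reaches helper and deep, matching the test's >= 2.
assert traverse(adj, "a::Function::main::1", depth=2) == [
    "a::Function::helper::10",
    "a::Function::deep::20",
]
```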
id="a::Function::helper::10", + type=NodeType.FUNCTION, + file_path="a.py", + structural=StructuralProperties( + name="helper", file="a.py", signature="def helper():", start_line=10, end_line=12 + ), + ), + GraphNode( + id="a::Function::deep::20", + type=NodeType.FUNCTION, + file_path="a.py", + structural=StructuralProperties( + name="deep", file="a.py", signature="def deep():", start_line=20, end_line=22 + ), + ), + ] + await store.upsert_nodes(nodes) + + await store.upsert_edge( + GraphEdge( + source_id="a::Function::main::1", + target_id="a::Function::helper::10", + type=EdgeType.CALLS, + ) + ) + await store.upsert_edge( + GraphEdge( + source_id="a::Function::helper::10", + target_id="a::Function::deep::20", + type=EdgeType.CALLS, + ) + ) + + results = await store.traverse( + "a::Function::main::1", + EdgeType.CALLS, + depth=2, + ) + assert len(results) >= 2 + + neighbors = await store.get_neighbors("a::Function::main::1", depth=1) + assert len(neighbors) >= 1 + + await store.close() + + +@pytest.mark.asyncio +async def test_mmap_store_find_nodes(tmp_path): + db_path = tmp_path / "test.smpg" + store = MMapGraphStore(db_path) + await store.connect() + + await store.upsert_nodes( + [ + GraphNode( + id="a::Function::foo::1", + type=NodeType.FUNCTION, + file_path="a.py", + structural=StructuralProperties( + name="foo", file="a.py", signature="def foo():", start_line=1, end_line=2 + ), + ), + GraphNode( + id="a::Class::Foo::5", + type=NodeType.CLASS, + file_path="a.py", + structural=StructuralProperties( + name="Foo", file="a.py", signature="class Foo", start_line=5, end_line=10 + ), + ), + GraphNode( + id="b::Function::bar::1", + type=NodeType.FUNCTION, + file_path="b.py", + structural=StructuralProperties( + name="bar", file="b.py", signature="def bar():", start_line=1, end_line=2 + ), + ), + ] + ) + + results = await store.find_nodes(type=NodeType.FUNCTION) + assert len(results) == 2 + + results = await store.find_nodes(file_path="a.py") + assert len(results) == 2 + + results = await store.find_nodes(name="foo") + assert len(results) == 1 + + await store.close() + + +@pytest.mark.asyncio +async def test_mmap_store_search_nodes(tmp_path): + db_path = tmp_path / "test.smpg" + store = MMapGraphStore(db_path) + await store.connect() + + await store.upsert_nodes( + [ + GraphNode( + id="a::Function::get_user::1", + type=NodeType.FUNCTION, + file_path="a.py", + structural=StructuralProperties( + name="get_user", + file="a.py", + signature="def get_user():", + start_line=1, + end_line=10, + ), + semantic=SemanticProperties(docstring="Get a user by ID"), + ), + GraphNode( + id="a::Function::create_user::15", + type=NodeType.FUNCTION, + file_path="a.py", + structural=StructuralProperties( + name="create_user", + file="a.py", + signature="def create_user():", + start_line=15, + end_line=25, + ), + semantic=SemanticProperties(docstring="Create a new user"), + ), + ] + ) + + results = await store.search_nodes(["user"]) + assert len(results) == 2 + assert all("user" in r["name"].lower() for r in results) + + await store.close()