diff --git a/emdx/commands/cascade.py b/emdx/commands/cascade.py index 414c660..0965bb8 100644 --- a/emdx/commands/cascade.py +++ b/emdx/commands/cascade.py @@ -22,16 +22,8 @@ from rich.console import Console from rich.table import Table -from ..database.documents import ( - get_document, - get_oldest_at_stage, - get_cascade_stats, - list_documents_at_stage, - save_document, - save_document_to_cascade, - update_document_stage, - update_document_pr_url, -) +from ..database.documents import get_document, save_document +from ..database import cascade as cascade_db from ..services.claude_executor import execute_claude_detached, execute_claude_sync from ..database.connection import db_connection @@ -211,18 +203,18 @@ def _process_stage(doc: dict, stage: str, cascade_run_id: int = None) -> tuple[b project=doc.get("project"), parent_id=doc_id, ) - update_document_stage(new_doc_id, next_stage) + cascade_db.update_cascade_stage(new_doc_id, next_stage) console.print(f"[green]✓[/green] Created document #{new_doc_id}") if pr_url: - update_document_pr_url(new_doc_id, pr_url) - update_document_pr_url(doc_id, pr_url) + cascade_db.update_cascade_pr_url(new_doc_id, pr_url) + cascade_db.update_cascade_pr_url(doc_id, pr_url) # Mark original as done - update_document_stage(doc_id, "done") + cascade_db.update_cascade_stage(doc_id, "done") else: # No output - just advance - update_document_stage(doc_id, next_stage) + cascade_db.update_cascade_stage(doc_id, next_stage) console.print(f"[green]✓[/green] Advanced to {next_stage}") # Mark execution complete @@ -295,7 +287,7 @@ def add( raise typer.Exit(1) doc_title = title or f"Cascade: {content[:50]}..." 
- doc_id = save_document_to_cascade( + doc_id = cascade_db.save_document_to_cascade( title=doc_title, content=content, stage=stage, @@ -368,7 +360,7 @@ def _run_auto(doc_id: int, start_stage: str, stop_stage: str): @app.command() def status(): """Show cascade status - documents at each stage.""" - stats = get_cascade_stats() + stats = cascade_db.get_cascade_stats() table = Table(title="Cascade Status") table.add_column("Stage", style="cyan") @@ -411,7 +403,7 @@ def show( console.print(f"[red]Invalid stage: {stage}. Must be one of: {STAGES}[/red]") raise typer.Exit(1) - docs = list_documents_at_stage(stage, limit=limit) + docs = cascade_db.list_documents_at_stage(stage, limit=limit) if not docs: console.print(f"[dim]No documents at stage '{stage}'[/dim]") @@ -464,7 +456,7 @@ def process( console.print(f"[red]Document #{doc_id} is at stage '{doc.get('stage')}', not '{stage}'[/red]") raise typer.Exit(1) else: - doc = get_oldest_at_stage(stage) + doc = cascade_db.get_oldest_at_stage(stage) if not doc: console.print(f"[dim]No documents waiting at stage '{stage}'[/dim]") return @@ -529,7 +521,7 @@ def run( console.print(f"[dim]Checking every {interval}s. 
Press Ctrl+C to stop.[/dim]") while True: - doc = get_oldest_at_stage("idea") + doc = cascade_db.get_oldest_at_stage("idea") if doc: console.print(f"\n[cyan]Found idea: #{doc['id']}[/cyan]") try: @@ -558,7 +550,7 @@ def run( processed = False for stage in active_stages: - doc = get_oldest_at_stage(stage) + doc = cascade_db.get_oldest_at_stage(stage) if doc: console.print(f"\n[cyan]Found document at '{stage}'[/cyan]") success, _, _ = _process_stage(doc, stage) @@ -610,7 +602,7 @@ def advance( return new_stage = NEXT_STAGE.get(current_stage, "done") - update_document_stage(doc_id, new_stage) + cascade_db.update_cascade_stage(doc_id, new_stage) console.print(f"[green]✓[/green] Moved document #{doc_id}: {current_stage} → {new_stage}") @@ -632,7 +624,7 @@ def remove( console.print(f"[yellow]Document #{doc_id} is not in the cascade[/yellow]") return - update_document_stage(doc_id, None) + cascade_db.remove_from_cascade(doc_id) console.print(f"[green]✓[/green] Removed document #{doc_id} from cascade") @@ -658,7 +650,7 @@ def synthesize( # Keep source docs (don't advance them to done) emdx cascade synthesize analyzed --keep """ - docs = list_documents_at_stage(stage) + docs = cascade_db.list_documents_at_stage(stage) if not docs: console.print(f"[yellow]No documents at stage '{stage}' to synthesize[/yellow]") @@ -683,7 +675,7 @@ def synthesize( # Create the synthesized document doc_title = title or f"Synthesis: {len(docs)} {stage} documents" - new_doc_id = save_document_to_cascade( + new_doc_id = cascade_db.save_document_to_cascade( title=doc_title, content=combined_content, stage=next_stage, @@ -694,7 +686,7 @@ def synthesize( # Optionally advance source docs to done if not keep: for doc in docs: - update_document_stage(doc["id"], "done") + cascade_db.update_cascade_stage(doc["id"], "done") console.print(f"[dim]Moved {len(docs)} source documents to 'done'[/dim]") else: console.print(f"[dim]Kept {len(docs)} source documents at '{stage}'[/dim]") diff --git 
"""Cascade-specific database operations for emdx.

This module manages cascade metadata stored in document_cascade_metadata table.
It provides the primary interface for cascade stage management.

The cascade system transforms documents through stages:
idea -> prompt -> analyzed -> planned -> done

Each document in the cascade has metadata tracked here, separate from the
main documents table for efficiency (only ~1% of docs use cascade).

During the transition away from ``documents.stage`` every stage write made
here is also mirrored into that legacy column, so code that still reads the
old column sees the same stage as code that reads the metadata table.
"""

import logging
from typing import Any, Optional

from ..utils.datetime_utils import parse_datetime
from .connection import db_connection

logger = logging.getLogger(__name__)

# Valid cascade stages
STAGES = ["idea", "prompt", "analyzed", "planned", "done"]


def _parse_cascade_datetimes(record: dict[str, Any]) -> dict[str, Any]:
    """Parse datetime fields in a cascade metadata record.

    Args:
        record: Row dict; modified in place.

    Returns:
        The same dict, with string ``created_at`` / ``updated_at`` values
        replaced by whatever ``parse_datetime`` returns for them.
    """
    for field in ("created_at", "updated_at"):
        if field in record and isinstance(record[field], str):
            record[field] = parse_datetime(record[field])
    return record


def _mirror_stage_to_documents(conn: Any, doc_id: int, stage: str | None) -> None:
    """Copy *stage* into the legacy ``documents.stage`` column.

    The caller owns the transaction: this helper neither opens a connection
    nor commits, so the mirror write lands atomically with the metadata write.

    Args:
        conn: Open connection obtained from ``db_connection.get_connection()``.
        doc_id: Document ID.
        stage: Stage to store, or None to clear the legacy column.
    """
    conn.execute(
        "UPDATE documents SET stage = ? WHERE id = ?",
        (stage, doc_id),
    )


def get_cascade_metadata(doc_id: int) -> dict[str, Any] | None:
    """Get cascade metadata for a document.

    Args:
        doc_id: Document ID

    Returns:
        Dict with id, document_id, stage, pr_url and timestamps, or None if
        the document has no cascade metadata row.
    """
    with db_connection.get_connection() as conn:
        row = conn.execute(
            """
            SELECT id, document_id, stage, pr_url, created_at, updated_at
            FROM document_cascade_metadata
            WHERE document_id = ?
            """,
            (doc_id,),
        ).fetchone()
    if row is None:
        return None
    return _parse_cascade_datetimes(dict(row))


def update_cascade_stage(doc_id: int, stage: str | None) -> bool:
    """Update a document's cascade stage (upsert).

    Args:
        doc_id: Document ID
        stage: New stage (or None to remove from cascade)

    Returns:
        For a stage: always True (the upsert either inserts or updates).
        For None: True only if a metadata row was actually removed.
    """
    if stage is None:
        # Removal has a single implementation; keep the two entry points in sync.
        return remove_from_cascade(doc_id)

    with db_connection.get_connection() as conn:
        conn.execute(
            """
            INSERT INTO document_cascade_metadata (document_id, stage, updated_at)
            VALUES (?, ?, CURRENT_TIMESTAMP)
            ON CONFLICT(document_id) DO UPDATE SET
                stage = excluded.stage,
                updated_at = CURRENT_TIMESTAMP
            """,
            (doc_id, stage),
        )
        # Same back-compat write save_document_to_cascade performs; without it
        # the legacy column keeps the stage the document was created with.
        _mirror_stage_to_documents(conn, doc_id, stage)
        conn.commit()
    return True


def update_cascade_pr_url(doc_id: int, pr_url: str) -> bool:
    """Update a document's PR URL (upsert).

    Args:
        doc_id: Document ID
        pr_url: The PR URL (e.g., https://github.com/user/repo/pull/123)

    Returns:
        True if a row was inserted or updated
    """
    with db_connection.get_connection() as conn:
        cursor = conn.execute(
            """
            INSERT INTO document_cascade_metadata (document_id, pr_url, updated_at)
            VALUES (?, ?, CURRENT_TIMESTAMP)
            ON CONFLICT(document_id) DO UPDATE SET
                pr_url = excluded.pr_url,
                updated_at = CURRENT_TIMESTAMP
            """,
            (doc_id, pr_url),
        )
        conn.commit()
        return cursor.rowcount > 0


def get_cascade_pr_url(doc_id: int) -> str | None:
    """Get a document's PR URL.

    Args:
        doc_id: Document ID

    Returns:
        PR URL, or None if there is no metadata row or the URL is NULL/empty
    """
    with db_connection.get_connection() as conn:
        row = conn.execute(
            "SELECT pr_url FROM document_cascade_metadata WHERE document_id = ?",
            (doc_id,),
        ).fetchone()
    return row[0] if row and row[0] else None


def get_oldest_at_stage(stage: str) -> dict[str, Any] | None:
    """Get the oldest document at a given cascade stage.

    This is the core primitive for the patrol system - each patrol watches
    a stage and picks up the oldest unprocessed document.

    Args:
        stage: The stage to query (e.g., 'idea', 'prompt', 'analyzed', 'planned')

    Returns:
        The oldest non-deleted document at that stage, with ``stage`` and
        ``pr_url`` taken from the cascade metadata table, or None
    """
    with db_connection.get_connection() as conn:
        row = conn.execute(
            """
            SELECT d.*, cm.stage as cascade_stage, cm.pr_url as cascade_pr_url
            FROM documents d
            JOIN document_cascade_metadata cm ON d.id = cm.document_id
            WHERE cm.stage = ? AND d.is_deleted = FALSE
            ORDER BY d.created_at ASC, d.id ASC
            LIMIT 1
            """,
            (stage,),
        ).fetchone()
    if row is None:
        return None

    doc = dict(row)
    # d.* may already carry legacy stage/pr_url keys; the metadata values win.
    doc["stage"] = doc.pop("cascade_stage", None)
    doc["pr_url"] = doc.pop("cascade_pr_url", None)
    return doc


def list_documents_at_stage(stage: str, limit: int = 50) -> list[dict[str, Any]]:
    """List all documents at a given cascade stage.

    Args:
        stage: The stage to query
        limit: Maximum documents to return

    Returns:
        List of non-deleted documents at that stage, oldest first
        (ties on created_at are broken by document id)
    """
    with db_connection.get_connection() as conn:
        rows = conn.execute(
            """
            SELECT d.id, d.title, d.project, d.created_at, d.updated_at,
                   d.parent_id, cm.stage, cm.pr_url
            FROM documents d
            JOIN document_cascade_metadata cm ON d.id = cm.document_id
            WHERE cm.stage = ? AND d.is_deleted = FALSE
            ORDER BY d.created_at ASC, d.id ASC
            LIMIT ?
            """,
            (stage, limit),
        ).fetchall()
    return [dict(row) for row in rows]


def count_documents_at_stage(stage: str) -> int:
    """Count documents at a given cascade stage.

    Args:
        stage: The stage to query

    Returns:
        Number of non-deleted documents at that stage
    """
    with db_connection.get_connection() as conn:
        cursor = conn.execute(
            """
            SELECT COUNT(*) FROM documents d
            JOIN document_cascade_metadata cm ON d.id = cm.document_id
            WHERE cm.stage = ? AND d.is_deleted = FALSE
            """,
            (stage,),
        )
        return cursor.fetchone()[0]


def get_cascade_stats() -> dict[str, int]:
    """Get counts of documents at each cascade stage.

    Returns:
        Dictionary mapping stage name to document count. Every entry of
        STAGES is present (0 when empty); a stage value found in the table
        that is not in STAGES is reported under its own key as well.
    """
    with db_connection.get_connection() as conn:
        rows = conn.execute(
            """
            SELECT cm.stage, COUNT(*) as count
            FROM documents d
            JOIN document_cascade_metadata cm ON d.id = cm.document_id
            WHERE cm.stage IS NOT NULL AND d.is_deleted = FALSE
            GROUP BY cm.stage
            """
        ).fetchall()

    results = {stage: 0 for stage in STAGES}
    for row in rows:
        results[row["stage"]] = row["count"]
    return results


def remove_from_cascade(doc_id: int) -> bool:
    """Remove cascade metadata for a document.

    This removes the document from cascade processing but does not
    delete the document itself. The legacy ``documents.stage`` column is
    cleared in the same transaction so old readers agree it left the cascade.

    Args:
        doc_id: Document ID

    Returns:
        True if a metadata row was removed
    """
    with db_connection.get_connection() as conn:
        cursor = conn.execute(
            "DELETE FROM document_cascade_metadata WHERE document_id = ?",
            (doc_id,),
        )
        removed = cursor.rowcount > 0
        _mirror_stage_to_documents(conn, doc_id, None)
        conn.commit()
    return removed


def save_document_to_cascade(
    title: str,
    content: str,
    stage: str = "idea",
    project: str | None = None,
    tags: list[str] | None = None,
    parent_id: int | None = None,
) -> int:
    """Save a document directly into the cascade at a given stage.

    Args:
        title: Document title
        content: Document content
        stage: Initial cascade stage (default: 'idea')
        project: Optional project name
        tags: Optional list of tags
        parent_id: Optional parent document ID

    Returns:
        The new document's ID
    """
    # Imported at call time, as in the original; documents.py also imports this
    # module lazily - presumably to avoid an import cycle (verify before moving).
    from .documents import save_document

    # Create the document first
    doc_id = save_document(
        title=title,
        content=content,
        project=project,
        tags=tags,
        parent_id=parent_id,
    )

    # Register in the cascade and mirror to the legacy column in ONE
    # transaction, so a failure cannot leave the two tables disagreeing.
    with db_connection.get_connection() as conn:
        conn.execute(
            """
            INSERT INTO document_cascade_metadata (document_id, stage)
            VALUES (?, ?)
            """,
            (doc_id, stage),
        )
        _mirror_stage_to_documents(conn, doc_id, stage)
        conn.commit()

    return doc_id


def list_cascade_runs(limit: int = 20) -> list[dict[str, Any]]:
    """List recent cascade runs.

    Args:
        limit: Maximum runs to return

    Returns:
        List of cascade run records, newest first, each with the title of
        its start document under ``start_doc_title`` (None if no match)
    """
    with db_connection.get_connection() as conn:
        rows = conn.execute(
            """
            SELECT cr.*, d.title as start_doc_title
            FROM cascade_runs cr
            LEFT JOIN documents d ON cr.start_doc_id = d.id
            ORDER BY cr.started_at DESC
            LIMIT ?
            """,
            (limit,),
        ).fetchall()
    return [dict(row) for row in rows]


def get_cascade_run_executions(run_id: int) -> list[dict[str, Any]]:
    """Get all executions for a cascade run.

    Args:
        run_id: Cascade run ID

    Returns:
        List of execution records linked to this run, oldest first. Each
        carries ``doc_stage``: the stage from the cascade metadata table,
        falling back to the legacy documents.stage column when the document
        has no metadata row.
    """
    with db_connection.get_connection() as conn:
        rows = conn.execute(
            """
            SELECT e.*, COALESCE(cm.stage, d.stage) as doc_stage
            FROM executions e
            LEFT JOIN documents d ON e.doc_id = d.id
            LEFT JOIN document_cascade_metadata cm ON cm.document_id = d.id
            WHERE e.cascade_run_id = ?
            ORDER BY e.started_at ASC
            """,
            (run_id,),
        ).fetchall()
    return [dict(row) for row in rows]
def _sync_cascade_metadata(doc_id: int, stage: str | None = None, pr_url: str | None = None) -> None:
    """Sync cascade metadata to the new table (dual-write for backward compat).

    Called by the legacy functions after their write to the documents table
    so that document_cascade_metadata stays consistent with it.

    Args:
        doc_id: Document ID.
        stage: Stage to store; None leaves an existing row's stage untouched.
        pr_url: PR URL to store; None leaves an existing row's URL untouched.

    When both ``stage`` and ``pr_url`` are None the document's metadata row
    is deleted (i.e. the document is removed from the cascade).

    Best-effort: any failure is logged as a warning and swallowed, because
    the caller's write to the documents table has already succeeded.
    """
    try:
        with db_connection.get_connection() as conn:
            if stage is None and pr_url is None:
                # Remove from cascade; a no-op when no row exists.
                conn.execute(
                    "DELETE FROM document_cascade_metadata WHERE document_id = ?",
                    (doc_id,),
                )
            else:
                # One atomic upsert instead of SELECT-then-INSERT/UPDATE, which
                # could race with a concurrent writer between check and write.
                # COALESCE keeps the stored value for whichever field is None;
                # unqualified column names in DO UPDATE refer to the existing row.
                conn.execute(
                    """
                    INSERT INTO document_cascade_metadata (document_id, stage, pr_url)
                    VALUES (?, ?, ?)
                    ON CONFLICT(document_id) DO UPDATE SET
                        stage = COALESCE(excluded.stage, stage),
                        pr_url = COALESCE(excluded.pr_url, pr_url),
                        updated_at = CURRENT_TIMESTAMP
                    """,
                    (doc_id, stage, pr_url),
                )
            conn.commit()
    except Exception as e:
        # Log but don't fail - the documents table write succeeded
        logger.warning("Failed to sync cascade metadata for doc %s: %s", doc_id, e)
+ This is the core primitive for the patrol system - each patrol watches a stage and picks up the oldest unprocessed document. @@ -901,6 +965,14 @@ def get_oldest_at_stage(stage: str) -> dict[str, Any] | None: Returns: The oldest document at that stage, or None if no documents are waiting """ + # Read from new table with fallback to old + try: + from . import cascade as cascade_db + return cascade_db.get_oldest_at_stage(stage) + except Exception: + pass + + # Fallback to old table with db_connection.get_connection() as conn: cursor = conn.execute( """ @@ -918,6 +990,8 @@ def get_oldest_at_stage(stage: str) -> dict[str, Any] | None: def update_document_stage(doc_id: int, stage: str | None) -> bool: """Update a document's cascade stage. + DEPRECATED: Use emdx.database.cascade.update_cascade_stage() instead. + Args: doc_id: Document ID stage: New stage (or None to remove from cascade) @@ -925,6 +999,7 @@ def update_document_stage(doc_id: int, stage: str | None) -> bool: Returns: True if update was successful """ + # Write to old table with db_connection.get_connection() as conn: cursor = conn.execute( """ @@ -935,12 +1010,20 @@ def update_document_stage(doc_id: int, stage: str | None) -> bool: (stage, doc_id), ) conn.commit() - return cursor.rowcount > 0 + success = cursor.rowcount > 0 + + # Dual-write to new table + if success: + _sync_cascade_metadata(doc_id, stage=stage) + + return success def update_document_pr_url(doc_id: int, pr_url: str) -> bool: """Update a document's PR URL (for cascade done stage). + DEPRECATED: Use emdx.database.cascade.update_cascade_pr_url() instead. 
+ Args: doc_id: Document ID pr_url: The PR URL (e.g., https://github.com/user/repo/pull/123) @@ -948,6 +1031,7 @@ def update_document_pr_url(doc_id: int, pr_url: str) -> bool: Returns: True if update was successful """ + # Write to old table with db_connection.get_connection() as conn: cursor = conn.execute( """ @@ -958,18 +1042,36 @@ def update_document_pr_url(doc_id: int, pr_url: str) -> bool: (pr_url, doc_id), ) conn.commit() - return cursor.rowcount > 0 + success = cursor.rowcount > 0 + + # Dual-write to new table + if success: + _sync_cascade_metadata(doc_id, pr_url=pr_url) + + return success def get_document_pr_url(doc_id: int) -> str | None: """Get a document's PR URL. + DEPRECATED: Use emdx.database.cascade.get_cascade_pr_url() instead. + Args: doc_id: Document ID Returns: PR URL or None if not set """ + # Read from new table with fallback + try: + from . import cascade as cascade_db + result = cascade_db.get_cascade_pr_url(doc_id) + if result is not None: + return result + except Exception: + pass + + # Fallback to old table with db_connection.get_connection() as conn: cursor = conn.execute( "SELECT pr_url FROM documents WHERE id = ? AND is_deleted = FALSE", @@ -982,6 +1084,8 @@ def get_document_pr_url(doc_id: int) -> str | None: def list_documents_at_stage(stage: str, limit: int = 50) -> list[dict[str, Any]]: """List all documents at a given cascade stage. + DEPRECATED: Use emdx.database.cascade.list_documents_at_stage() instead. + Args: stage: The stage to query limit: Maximum documents to return @@ -989,6 +1093,14 @@ def list_documents_at_stage(stage: str, limit: int = 50) -> list[dict[str, Any]] Returns: List of documents at that stage, oldest first """ + # Read from new table with fallback + try: + from . 
import cascade as cascade_db + return cascade_db.list_documents_at_stage(stage, limit) + except Exception: + pass + + # Fallback to old table with db_connection.get_connection() as conn: cursor = conn.execute( """ @@ -1006,12 +1118,22 @@ def list_documents_at_stage(stage: str, limit: int = 50) -> list[dict[str, Any]] def count_documents_at_stage(stage: str) -> int: """Count documents at a given cascade stage. + DEPRECATED: Use emdx.database.cascade.count_documents_at_stage() instead. + Args: stage: The stage to query Returns: Number of documents at that stage """ + # Read from new table with fallback + try: + from . import cascade as cascade_db + return cascade_db.count_documents_at_stage(stage) + except Exception: + pass + + # Fallback to old table with db_connection.get_connection() as conn: cursor = conn.execute( """ @@ -1026,9 +1148,19 @@ def count_documents_at_stage(stage: str) -> int: def get_cascade_stats() -> dict[str, int]: """Get counts of documents at each cascade stage. + DEPRECATED: Use emdx.database.cascade.get_cascade_stats() instead. + Returns: Dictionary mapping stage name to document count """ + # Read from new table with fallback + try: + from . import cascade as cascade_db + return cascade_db.get_cascade_stats() + except Exception: + pass + + # Fallback to old table stages = ["idea", "prompt", "analyzed", "planned", "done"] with db_connection.get_connection() as conn: cursor = conn.execute( @@ -1055,6 +1187,8 @@ def save_document_to_cascade( ) -> int: """Save a document directly into the cascade at a given stage. + DEPRECATED: Use emdx.database.cascade.save_document_to_cascade() instead. 
def migration_032_extract_cascade_metadata(conn: sqlite3.Connection):
    """Move cascade metadata (stage, pr_url) into a dedicated table.

    Runs these statements in order, then commits:
      1. Create ``document_cascade_metadata`` (one row per document, enforced
         by UNIQUE on document_id, with a foreign key to documents.id).
      2. Create a partial index on stage, a partial index on pr_url, and a
         plain index on document_id.
      3. Backfill one row for every documents row whose stage or pr_url is
         not NULL; INSERT OR IGNORE skips documents that already have a row.

    Every statement uses IF NOT EXISTS / OR IGNORE. The documents.stage and
    documents.pr_url columns are not modified by this migration.
    """
    create_table = """
        CREATE TABLE IF NOT EXISTS document_cascade_metadata (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            document_id INTEGER NOT NULL UNIQUE,
            stage TEXT,
            pr_url TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (document_id) REFERENCES documents(id) ON DELETE CASCADE
        )
    """

    # Partial index: only rows currently holding a stage.
    index_stage = """
        CREATE INDEX IF NOT EXISTS idx_cascade_meta_stage
        ON document_cascade_metadata(stage) WHERE stage IS NOT NULL
    """

    # Partial index: only rows that have a PR attached.
    index_pr_url = """
        CREATE INDEX IF NOT EXISTS idx_cascade_meta_pr_url
        ON document_cascade_metadata(pr_url) WHERE pr_url IS NOT NULL
    """

    # Explicit lookup index on document_id (the column is also UNIQUE).
    index_document_id = """
        CREATE INDEX IF NOT EXISTS idx_cascade_meta_document_id
        ON document_cascade_metadata(document_id)
    """

    # Copy over whatever cascade data already lives on the documents table.
    backfill = """
        INSERT OR IGNORE INTO document_cascade_metadata (document_id, stage, pr_url)
        SELECT id, stage, pr_url
        FROM documents
        WHERE stage IS NOT NULL OR pr_url IS NOT NULL
    """

    runner = conn.cursor()
    for statement in (create_table, index_stage, index_pr_url, index_document_id, backfill):
        runner.execute(statement)

    conn.commit()
d34b392..e198075 100644 --- a/emdx/main.py +++ b/emdx/main.py @@ -35,8 +35,6 @@ "similar": "emdx.commands.similarity:app", # External services (imports google API libs) "gdoc": "emdx.commands.gdoc:app", - # TUI (imports textual, can be slow) - "gui": "emdx.ui.gui:gui", } # Pre-computed help strings so --help doesn't trigger imports @@ -50,7 +48,6 @@ "ai": "AI-powered Q&A and semantic search", "similar": "Find similar documents using TF-IDF", "gdoc": "Google Docs integration", - "gui": "Launch interactive TUI browser", } @@ -116,6 +113,7 @@ def disabled_command(): from emdx.commands.keybindings import app as keybindings_app from emdx.commands.prime import prime as prime_command from emdx.commands.status import status as status_command +from emdx.ui.gui import gui as gui_command from emdx.commands.analyze import app as analyze_app from emdx.commands.maintain import app as maintain_app from emdx.commands.gist import app as gist_app @@ -191,6 +189,9 @@ def disabled_command(): # Add the status command for consolidated project overview app.command(name="status")(status_command) +# Add the gui command for interactive TUI browser +app.command(name="gui")(gui_command) + # ============================================================================= # Handle safe mode for unsafe commands diff --git a/emdx/ui/cascade_browser.py b/emdx/ui/cascade_browser.py index 8ecf591..566e989 100644 --- a/emdx/ui/cascade_browser.py +++ b/emdx/ui/cascade_browser.py @@ -13,12 +13,8 @@ from textual.widget import Widget from textual.widgets import Button, DataTable, Label, MarkdownViewer, Static, TextArea -from emdx.database.documents import ( - get_document, - get_cascade_stats, - list_documents_at_stage, - update_document_stage, -) +from emdx.database.documents import get_document +from emdx.database import cascade as cascade_db from emdx.database.connection import db_connection logger = logging.getLogger(__name__) @@ -309,7 +305,7 @@ def compose(self) -> ComposeResult: def 
refresh_stats(self) -> None: """Refresh stage statistics.""" - self.stats = get_cascade_stats() + self.stats = cascade_db.get_cascade_stats() self._update_display() def _update_display(self) -> None: @@ -398,7 +394,7 @@ def compose(self) -> ComposeResult: def load_stage(self, stage: str) -> None: """Load documents for a stage.""" self.current_stage = stage - self.docs = list_documents_at_stage(stage, limit=50) + self.docs = cascade_db.list_documents_at_stage(stage, limit=50) self.selected_ids.clear() # Clear selection when changing stages self._refresh_table() self._update_selection_status() @@ -1485,18 +1481,16 @@ def action_advance_doc(self) -> None: next_stage = NEXT_STAGE.get(stage) if next_stage: - update_document_stage(doc_id, next_stage) + cascade_db.update_cascade_stage(doc_id, next_stage) self._update_status(f"[green]Moved #{doc_id}: {stage} → {next_stage}[/green]") self.refresh_all() def action_new_idea(self) -> None: """Open modal to create a new cascade idea.""" - from emdx.database.documents import save_document_to_cascade - def handle_idea_result(idea_text: str | None) -> None: if idea_text: # Save the idea to cascade at 'idea' stage - doc_id = save_document_to_cascade( + doc_id = cascade_db.save_document_to_cascade( title=f"Cascade: {idea_text[:50]}{'...' 
if len(idea_text) > 50 else ''}", content=idea_text, stage="idea", @@ -1550,7 +1544,7 @@ def action_synthesize(self) -> None: if not selected_ids: # No selection - use all docs at stage - docs = list_documents_at_stage(stage) + docs = cascade_db.list_documents_at_stage(stage) doc_ids = [d["id"] for d in docs] else: doc_ids = selected_ids @@ -1560,8 +1554,6 @@ def action_synthesize(self) -> None: return # Build combined content for Claude to synthesize - from ..database.documents import get_document, save_document_to_cascade - combined_parts = [] for doc_id in doc_ids: doc = get_document(str(doc_id)) @@ -1572,7 +1564,7 @@ def action_synthesize(self) -> None: # Create a synthesis input document (keeps sources intact for now) title = f"Synthesis: {len(doc_ids)} {stage} documents" - synthesis_doc_id = save_document_to_cascade( + synthesis_doc_id = cascade_db.save_document_to_cascade( title=title, content=combined_content, stage=stage, # Same stage - will be processed by Claude @@ -1665,7 +1657,7 @@ def on_cascade_view_process_stage(self, event: CascadeView.ProcessStage) -> None return else: # Get oldest at stage - docs = list_documents_at_stage(stage) + docs = cascade_db.list_documents_at_stage(stage) if not docs: self._update_status(f"[yellow]No documents at stage '{stage}'[/yellow]") return @@ -1799,7 +1791,7 @@ def update_failed(): # Create child document with output if output: - from emdx.database.documents import save_document, update_document_stage + from emdx.database.documents import save_document next_stage = NEXT_STAGE.get(stage, "done") child_title = f"{doc.get('title', '')} [{stage}→{next_stage}]" new_doc_id = save_document( @@ -1808,8 +1800,8 @@ def update_failed(): project=doc.get("project"), parent_id=doc_id, ) - update_document_stage(new_doc_id, next_stage) - update_document_stage(doc_id, "done") + cascade_db.update_cascade_stage(new_doc_id, next_stage) + cascade_db.update_cascade_stage(doc_id, "done") def update_success(): 
self._update_status(f"[green]✓ Done![/green] Created #{new_doc_id} at {next_stage}") diff --git a/emdx/ui/gui.py b/emdx/ui/gui.py index 1a49201..0c76157 100644 --- a/emdx/ui/gui.py +++ b/emdx/ui/gui.py @@ -8,10 +8,7 @@ from emdx.utils.output import console -app = typer.Typer() - -@app.command() def gui( theme: Optional[str] = typer.Option( None, diff --git a/pyproject.toml b/pyproject.toml index 7a5816f..b7b7e12 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "emdx" -version = "0.9.1" +version = "0.9.5" description = "Documentation Index Management System - A powerful knowledge base for developers" authors = ["Alex Rockwell "] readme = "README.md" diff --git a/tests/test_cascade_metadata.py b/tests/test_cascade_metadata.py new file mode 100644 index 0000000..72a0811 --- /dev/null +++ b/tests/test_cascade_metadata.py @@ -0,0 +1,453 @@ +"""Tests for cascade metadata extraction and the new cascade database module.""" + +import os +import sqlite3 +import tempfile +from pathlib import Path + +import pytest + + +@pytest.fixture(scope="function") +def test_db_path(): + """Create a temporary database path for testing. + + Using function scope to ensure each test gets a fresh database. + """ + with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f: + db_path = Path(f.name) + yield db_path + # Cleanup + if db_path.exists(): + db_path.unlink() + + +@pytest.fixture(scope="function") +def setup_test_db(test_db_path, monkeypatch): + """Set up a test database with migrations run. + + Using function scope to ensure each test gets a fresh database. 
+ """ + # Set the test database environment variable + monkeypatch.setenv("EMDX_TEST_DB", str(test_db_path)) + + # Import and run migrations + from emdx.database.migrations import run_migrations + from emdx.database.connection import DatabaseConnection + + run_migrations(test_db_path) + + # Create a new connection instance for the test db + conn_instance = DatabaseConnection(test_db_path) + + # Patch the global db_connection in all relevant modules + import emdx.database.connection as conn_module + import emdx.database.documents as docs_module + import emdx.database.cascade as cascade_module + + original_conn = conn_module.db_connection + + # Patch in all modules that use db_connection + conn_module.db_connection = conn_instance + docs_module.db_connection = conn_instance + cascade_module.db_connection = conn_instance + + yield conn_instance + + # Restore original + conn_module.db_connection = original_conn + docs_module.db_connection = original_conn + cascade_module.db_connection = original_conn + + +class TestMigrationCreatesTable: + """Test that the migration creates the cascade metadata table correctly.""" + + def test_table_exists(self, setup_test_db): + """Test that document_cascade_metadata table is created.""" + with setup_test_db.get_connection() as conn: + cursor = conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='document_cascade_metadata'" + ) + result = cursor.fetchone() + assert result is not None + assert result[0] == "document_cascade_metadata" + + def test_table_schema(self, setup_test_db): + """Test the table has correct columns.""" + with setup_test_db.get_connection() as conn: + cursor = conn.execute("PRAGMA table_info(document_cascade_metadata)") + columns = {row[1]: row[2] for row in cursor.fetchall()} + + assert "id" in columns + assert "document_id" in columns + assert "stage" in columns + assert "pr_url" in columns + assert "created_at" in columns + assert "updated_at" in columns + + def test_indexes_exist(self, 
setup_test_db): + """Test that partial indexes are created.""" + with setup_test_db.get_connection() as conn: + cursor = conn.execute( + "SELECT name FROM sqlite_master WHERE type='index' AND tbl_name='document_cascade_metadata'" + ) + indexes = {row[0] for row in cursor.fetchall()} + + assert "idx_cascade_meta_stage" in indexes + assert "idx_cascade_meta_pr_url" in indexes + assert "idx_cascade_meta_document_id" in indexes + + +class TestCascadeMetadataBackfill: + """Test that existing cascade data is backfilled correctly.""" + + def test_backfill_stage_data(self, setup_test_db): + """Test that documents with stage are backfilled.""" + # First create a document with stage directly in documents table + with setup_test_db.get_connection() as conn: + cursor = conn.execute( + "INSERT INTO documents (title, content, stage) VALUES (?, ?, ?)", + ("Test Doc", "Content", "idea"), + ) + doc_id = cursor.lastrowid + conn.commit() + + # Manually insert into cascade metadata (simulating what migration does) + conn.execute( + "INSERT OR IGNORE INTO document_cascade_metadata (document_id, stage) VALUES (?, ?)", + (doc_id, "idea"), + ) + conn.commit() + + # Verify it's in the new table + cursor = conn.execute( + "SELECT stage FROM document_cascade_metadata WHERE document_id = ?", + (doc_id,), + ) + result = cursor.fetchone() + assert result is not None + assert result[0] == "idea" + + def test_backfill_pr_url_data(self, setup_test_db): + """Test that documents with pr_url are backfilled.""" + with setup_test_db.get_connection() as conn: + cursor = conn.execute( + "INSERT INTO documents (title, content, pr_url) VALUES (?, ?, ?)", + ("Test Doc", "Content", "https://github.com/test/repo/pull/1"), + ) + doc_id = cursor.lastrowid + conn.commit() + + # Manually insert + conn.execute( + "INSERT OR IGNORE INTO document_cascade_metadata (document_id, pr_url) VALUES (?, ?)", + (doc_id, "https://github.com/test/repo/pull/1"), + ) + conn.commit() + + cursor = conn.execute( + "SELECT pr_url 
FROM document_cascade_metadata WHERE document_id = ?", + (doc_id,), + ) + result = cursor.fetchone() + assert result is not None + assert result[0] == "https://github.com/test/repo/pull/1" + + +class TestCascadeModuleCRUD: + """Test CRUD operations in the cascade module.""" + + def test_get_cascade_metadata_empty(self, setup_test_db): + """Test getting metadata for document not in cascade.""" + from emdx.database import cascade as cascade_db + + result = cascade_db.get_cascade_metadata(99999) + assert result is None + + def test_update_cascade_stage_insert(self, setup_test_db): + """Test adding a document to cascade via stage update.""" + from emdx.database import cascade as cascade_db + from emdx.database.documents import save_document + + doc_id = save_document("Test", "Content") + + result = cascade_db.update_cascade_stage(doc_id, "idea") + assert result is True + + metadata = cascade_db.get_cascade_metadata(doc_id) + assert metadata is not None + assert metadata["stage"] == "idea" + + def test_update_cascade_stage_update(self, setup_test_db): + """Test updating an existing cascade stage.""" + from emdx.database import cascade as cascade_db + from emdx.database.documents import save_document + + doc_id = save_document("Test", "Content") + cascade_db.update_cascade_stage(doc_id, "idea") + + result = cascade_db.update_cascade_stage(doc_id, "prompt") + assert result is True + + metadata = cascade_db.get_cascade_metadata(doc_id) + assert metadata["stage"] == "prompt" + + def test_update_cascade_stage_remove(self, setup_test_db): + """Test removing a document from cascade.""" + from emdx.database import cascade as cascade_db + from emdx.database.documents import save_document + + doc_id = save_document("Test", "Content") + cascade_db.update_cascade_stage(doc_id, "idea") + + result = cascade_db.update_cascade_stage(doc_id, None) + assert result is True + + metadata = cascade_db.get_cascade_metadata(doc_id) + assert metadata is None + + def 
test_update_cascade_pr_url(self, setup_test_db): + """Test updating PR URL.""" + from emdx.database import cascade as cascade_db + from emdx.database.documents import save_document + + doc_id = save_document("Test", "Content") + cascade_db.update_cascade_stage(doc_id, "done") + + pr_url = "https://github.com/test/repo/pull/123" + result = cascade_db.update_cascade_pr_url(doc_id, pr_url) + assert result is True + + retrieved = cascade_db.get_cascade_pr_url(doc_id) + assert retrieved == pr_url + + def test_remove_from_cascade(self, setup_test_db): + """Test removing cascade metadata.""" + from emdx.database import cascade as cascade_db + from emdx.database.documents import save_document + + doc_id = save_document("Test", "Content") + cascade_db.update_cascade_stage(doc_id, "idea") + + result = cascade_db.remove_from_cascade(doc_id) + assert result is True + + metadata = cascade_db.get_cascade_metadata(doc_id) + assert metadata is None + + +class TestCascadeStageQueries: + """Test stage-based query operations.""" + + def test_get_oldest_at_stage(self, setup_test_db): + """Test getting oldest document at stage.""" + from emdx.database import cascade as cascade_db + from emdx.database.documents import save_document + import time + + # Create documents with slight time difference + doc1 = save_document("First", "Content 1") + cascade_db.update_cascade_stage(doc1, "idea") + + time.sleep(0.01) # Small delay to ensure different timestamps + + doc2 = save_document("Second", "Content 2") + cascade_db.update_cascade_stage(doc2, "idea") + + oldest = cascade_db.get_oldest_at_stage("idea") + assert oldest is not None + assert oldest["id"] == doc1 + + def test_get_oldest_at_stage_empty(self, setup_test_db): + """Test getting oldest when stage is empty.""" + from emdx.database import cascade as cascade_db + + oldest = cascade_db.get_oldest_at_stage("idea") + assert oldest is None + + def test_list_documents_at_stage(self, setup_test_db): + """Test listing documents at a stage.""" + 
from emdx.database import cascade as cascade_db + from emdx.database.documents import save_document + + doc1 = save_document("Doc 1", "Content 1") + cascade_db.update_cascade_stage(doc1, "idea") + + doc2 = save_document("Doc 2", "Content 2") + cascade_db.update_cascade_stage(doc2, "idea") + + doc3 = save_document("Doc 3", "Content 3") + cascade_db.update_cascade_stage(doc3, "prompt") # Different stage + + docs = cascade_db.list_documents_at_stage("idea") + assert len(docs) == 2 + assert all(d["stage"] == "idea" for d in docs) + + def test_count_documents_at_stage(self, setup_test_db): + """Test counting documents at a stage.""" + from emdx.database import cascade as cascade_db + from emdx.database.documents import save_document + + doc1 = save_document("Doc 1", "Content") + cascade_db.update_cascade_stage(doc1, "analyzed") + + doc2 = save_document("Doc 2", "Content") + cascade_db.update_cascade_stage(doc2, "analyzed") + + count = cascade_db.count_documents_at_stage("analyzed") + assert count == 2 + + count_empty = cascade_db.count_documents_at_stage("planned") + assert count_empty == 0 + + def test_get_cascade_stats(self, setup_test_db): + """Test getting cascade statistics.""" + from emdx.database import cascade as cascade_db + from emdx.database.documents import save_document + + # Create documents at various stages + for stage, count in [("idea", 3), ("prompt", 2), ("analyzed", 1)]: + for i in range(count): + doc_id = save_document(f"{stage} doc {i}", "Content") + cascade_db.update_cascade_stage(doc_id, stage) + + stats = cascade_db.get_cascade_stats() + + assert stats["idea"] == 3 + assert stats["prompt"] == 2 + assert stats["analyzed"] == 1 + assert stats["planned"] == 0 + assert stats["done"] == 0 + + +class TestForeignKeyCascadeDelete: + """Test that foreign key ON DELETE CASCADE works.""" + + def test_cascade_delete_removes_metadata(self, setup_test_db): + """Test that deleting a document removes its cascade metadata.""" + from emdx.database import cascade 
as cascade_db + from emdx.database.documents import save_document, delete_document + + doc_id = save_document("Test", "Content") + cascade_db.update_cascade_stage(doc_id, "idea") + + # Verify metadata exists + metadata = cascade_db.get_cascade_metadata(doc_id) + assert metadata is not None + + # Hard delete the document + with setup_test_db.get_connection() as conn: + conn.execute("DELETE FROM documents WHERE id = ?", (doc_id,)) + conn.commit() + + # Metadata should be gone due to ON DELETE CASCADE + metadata = cascade_db.get_cascade_metadata(doc_id) + assert metadata is None + + +class TestSaveDocumentToCascade: + """Test the save_document_to_cascade function.""" + + def test_save_to_cascade_creates_both_records(self, setup_test_db): + """Test that save_document_to_cascade creates document and metadata.""" + from emdx.database import cascade as cascade_db + from emdx.database.documents import get_document + + doc_id = cascade_db.save_document_to_cascade( + title="Cascade Test", + content="Test content", + stage="idea", + project="test-project", + ) + + # Check document exists + doc = get_document(doc_id) + assert doc is not None + assert doc["title"] == "Cascade Test" + assert doc["project"] == "test-project" + + # Check cascade metadata exists + metadata = cascade_db.get_cascade_metadata(doc_id) + assert metadata is not None + assert metadata["stage"] == "idea" + + def test_save_to_cascade_with_parent(self, setup_test_db): + """Test saving cascade document with parent.""" + from emdx.database import cascade as cascade_db + from emdx.database.documents import save_document, get_document + + parent_id = save_document("Parent", "Parent content") + + child_id = cascade_db.save_document_to_cascade( + title="Child", + content="Child content", + stage="prompt", + parent_id=parent_id, + ) + + child = get_document(child_id) + assert child["parent_id"] == parent_id + + +class TestBackwardCompatibility: + """Test that documents.py functions still work and dual-write.""" + + 
def test_documents_update_stage_dual_writes(self, setup_test_db): + """Test that update_document_stage writes to both tables.""" + from emdx.database.documents import save_document, update_document_stage + from emdx.database import cascade as cascade_db + + doc_id = save_document("Test", "Content") + + # Use the old function + update_document_stage(doc_id, "idea") + + # Check old table + with setup_test_db.get_connection() as conn: + cursor = conn.execute( + "SELECT stage FROM documents WHERE id = ?", (doc_id,) + ) + result = cursor.fetchone() + assert result[0] == "idea" + + # Check new table + metadata = cascade_db.get_cascade_metadata(doc_id) + assert metadata is not None + assert metadata["stage"] == "idea" + + def test_documents_update_pr_url_dual_writes(self, setup_test_db): + """Test that update_document_pr_url writes to both tables.""" + from emdx.database.documents import save_document, update_document_stage, update_document_pr_url + from emdx.database import cascade as cascade_db + + doc_id = save_document("Test", "Content") + update_document_stage(doc_id, "done") + + pr_url = "https://github.com/test/repo/pull/456" + update_document_pr_url(doc_id, pr_url) + + # Check old table + with setup_test_db.get_connection() as conn: + cursor = conn.execute( + "SELECT pr_url FROM documents WHERE id = ?", (doc_id,) + ) + result = cursor.fetchone() + assert result[0] == pr_url + + # Check new table + retrieved = cascade_db.get_cascade_pr_url(doc_id) + assert retrieved == pr_url + + def test_documents_get_cascade_stats_reads_new_table(self, setup_test_db): + """Test that get_cascade_stats reads from new table.""" + from emdx.database.documents import get_cascade_stats, save_document + from emdx.database import cascade as cascade_db + + # Add documents via cascade module + doc_id = save_document("Test", "Content") + cascade_db.update_cascade_stage(doc_id, "idea") + + # Use old function - should read from new table + stats = get_cascade_stats() + assert stats["idea"] >= 
1