From 82ec521f165ec0c230b38fe535021950511996e0 Mon Sep 17 00:00:00 2001 From: Rob Elkin Date: Tue, 10 Feb 2026 00:46:46 +0000 Subject: [PATCH 1/5] Add --clean flag to sync-full for fresh re-sync from Gmail Adds a --clean option that deletes all local data for an account (messages, conversations, labels, sync history) and re-syncs from scratch. Useful for recovering from corrupted state or resetting staged/deleted items. Features: - ResetSourceData() in store with transaction and CASCADE deletes - Confirmation prompt showing what will be deleted (use --yes to skip) - Double Ctrl+C pattern: first saves checkpoint, second force quits - Account isolation tests to verify other accounts are untouched Co-Authored-By: Claude Opus 4.5 --- cmd/msgvault/cmd/syncfull.go | 78 +++++++++++++- internal/store/sync.go | 56 ++++++++++ internal/store/sync_test.go | 200 +++++++++++++++++++++++++++++++++++ 3 files changed, 331 insertions(+), 3 deletions(-) diff --git a/cmd/msgvault/cmd/syncfull.go b/cmd/msgvault/cmd/syncfull.go index 0faca7b8..f8409047 100644 --- a/cmd/msgvault/cmd/syncfull.go +++ b/cmd/msgvault/cmd/syncfull.go @@ -21,6 +21,8 @@ var ( syncBefore string syncAfter string syncLimit int + syncClean bool + syncCleanYes bool ) var syncFullCmd = &cobra.Command{ @@ -37,18 +39,30 @@ Date filters: --after 2024-01-01 Only messages on or after this date --before 2024-12-31 Only messages before this date +Clean sync: + --clean Delete all local data for the account and re-sync + from scratch. Use this to reset staging/deletion + state or recover from a corrupted local database. + Requires confirmation (use --yes to skip). + Examples: msgvault sync-full # Sync all accounts msgvault sync-full you@gmail.com msgvault sync-full you@gmail.com --after 2024-01-01 msgvault sync-full you@gmail.com --query "from:someone@example.com" - msgvault sync-full you@gmail.com --noresume # Force fresh sync`, + msgvault sync-full you@gmail.com --noresume # Force fresh sync + msgvault sync-full you@gmail.com --clean # Delete local data and re-sync + msgvault sync-full you@gmail.com --clean --yes # Skip confirmation`, Args: cobra.MaximumNArgs(1), RunE: func(cmd *cobra.Command, args []string) error { if syncLimit < 0 { return fmt.Errorf("--limit must be a non-negative number") } + if syncClean && len(args) == 0 { + return fmt.Errorf("--clean requires specifying an account email") + } + // Validate config if cfg.OAuth.ClientSecrets == "" { return errOAuthNotConfigured() @@ -100,15 +114,71 @@ Examples: ctx, cancel := context.WithCancel(cmd.Context()) defer cancel() - // Handle Ctrl+C gracefully + // Handle Ctrl+C gracefully (first = graceful, second = force exit) sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) go func() { <-sigChan - fmt.Println("\nInterrupted. Saving checkpoint...") + fmt.Println("\nInterrupted. Saving checkpoint... (press Ctrl+C again to force quit)") cancel() + // Wait for second signal to force exit + <-sigChan + fmt.Println("\nForce quit.") + os.Exit(1) }() + // Handle --clean: delete all local data for the account before syncing + if syncClean { + email := emails[0] // Already validated that exactly one email is specified + source, err := s.GetSourceByIdentifier(email) + if err != nil { + return fmt.Errorf("lookup account: %w", err) + } + if source == nil { + return fmt.Errorf("account %s not found in database", email) + } + + // Count what will be deleted + fmt.Printf("Preparing to clean local data for %s...\n", email) + var msgCount, convCount, labelCount int64 + _ = s.DB().QueryRow("SELECT COUNT(*) FROM messages WHERE source_id = ?", source.ID).Scan(&msgCount) + _ = s.DB().QueryRow("SELECT COUNT(*) FROM conversations WHERE source_id = ?", source.ID).Scan(&convCount) + _ = s.DB().QueryRow("SELECT COUNT(*) FROM labels WHERE source_id = ?", source.ID).Scan(&labelCount) + + fmt.Println() + fmt.Println("This will permanently delete from the LOCAL database:") + fmt.Printf(" • %d messages\n", msgCount) + fmt.Printf(" • %d conversations\n", convCount) + fmt.Printf(" • %d labels\n", labelCount) + fmt.Printf(" • All sync history and checkpoints\n") + fmt.Println() + fmt.Println("Note: This does NOT delete anything from Gmail.") + fmt.Println(" After cleaning, a full re-sync will download all messages again.") + fmt.Println() + + // Require confirmation unless --yes is provided + if !syncCleanYes { + fmt.Print("Proceed with clean? [y/N]: ") + var response string + _, _ = fmt.Scanln(&response) + if response != "y" && response != "Y" { + fmt.Println("Cancelled.") + return nil + } + fmt.Println() + } + + // Perform the clean with progress + fmt.Print("Deleting messages...") + deleted, err := s.ResetSourceData(source.ID) + if err != nil { + fmt.Println(" failed") + return fmt.Errorf("reset account data: %w", err) + } + fmt.Println(" done") + fmt.Printf("Deleted %d messages from local database.\n\n", deleted) + } + var syncErrors []string for _, email := range emails { if ctx.Err() != nil { @@ -332,5 +402,7 @@ func init() { syncFullCmd.Flags().StringVar(&syncBefore, "before", "", "Only messages before this date (YYYY-MM-DD)") syncFullCmd.Flags().StringVar(&syncAfter, "after", "", "Only messages after this date (YYYY-MM-DD)") syncFullCmd.Flags().IntVar(&syncLimit, "limit", 0, "Limit number of messages (for testing)") + syncFullCmd.Flags().BoolVar(&syncClean, "clean", false, "Delete all local data for the account and re-sync from scratch") + syncFullCmd.Flags().BoolVarP(&syncCleanYes, "yes", "y", false, "Skip confirmation prompt for --clean") rootCmd.AddCommand(syncFullCmd) } diff --git a/internal/store/sync.go b/internal/store/sync.go index 3a6701d9..3455dd49 100644 --- a/internal/store/sync.go +++ b/internal/store/sync.go @@ -384,3 +384,59 @@ func (s *Store) GetSourceByIdentifier(identifier string) (*Source, error) { return source, nil } + +// ResetSourceData deletes all synced data for a source while keeping the source +// entry itself. This allows a clean re-sync from Gmail without losing the account +// configuration. Returns the number of messages deleted. +func (s *Store) ResetSourceData(sourceID int64) (int64, error) { + var messagesDeleted int64 + + err := s.withTx(func(tx *sql.Tx) error { + // Count messages before deletion + row := tx.QueryRow("SELECT COUNT(*) FROM messages WHERE source_id = ?", sourceID) + if err := row.Scan(&messagesDeleted); err != nil { + return fmt.Errorf("count messages: %w", err) + } + + // Delete messages (cascades to: message_bodies, message_raw, message_recipients, + // message_labels, attachments, reactions via ON DELETE CASCADE) + if _, err := tx.Exec("DELETE FROM messages WHERE source_id = ?", sourceID); err != nil { + return fmt.Errorf("delete messages: %w", err) + } + + // Delete conversations (ON DELETE CASCADE handles conversation_participants) + if _, err := tx.Exec("DELETE FROM conversations WHERE source_id = ?", sourceID); err != nil { + return fmt.Errorf("delete conversations: %w", err) + } + + // Delete source-specific labels + if _, err := tx.Exec("DELETE FROM labels WHERE source_id = ?", sourceID); err != nil { + return fmt.Errorf("delete labels: %w", err) + } + + // Delete sync history + if _, err := tx.Exec("DELETE FROM sync_runs WHERE source_id = ?", sourceID); err != nil { + return fmt.Errorf("delete sync_runs: %w", err) + } + if _, err := tx.Exec("DELETE FROM sync_checkpoints WHERE source_id = ?", sourceID); err != nil { + return fmt.Errorf("delete sync_checkpoints: %w", err) + } + + // Reset the source's sync cursor so next sync starts fresh + if _, err := tx.Exec(` + UPDATE sources + SET sync_cursor = NULL, last_sync_at = NULL, updated_at = datetime('now') + WHERE id = ? + `, sourceID); err != nil { + return fmt.Errorf("reset source: %w", err) + } + + return nil + }) + + if err != nil { + return 0, err + } + + return messagesDeleted, nil +} diff --git a/internal/store/sync_test.go b/internal/store/sync_test.go index b5cb7c26..803d7eea 100644 --- a/internal/store/sync_test.go +++ b/internal/store/sync_test.go @@ -1,6 +1,7 @@ package store_test import ( + "fmt" "strings" "testing" "time" @@ -191,3 +192,202 @@ func TestScanSource_NullRequiredTimestamp(t *testing.T) { t.Errorf("error should mention field and NULL status, got: %s", errStr) } } + +// TestResetSourceData verifies that ResetSourceData clears all data for a source +// while preserving the source entry itself. +func TestResetSourceData(t *testing.T) { + f := storetest.New(t) + + // Add some messages to the source + f.CreateMessage("msg1") + f.CreateMessage("msg2") + f.CreateMessage("msg3") + + // Verify messages exist + var count int + err := f.Store.DB().QueryRow("SELECT COUNT(*) FROM messages WHERE source_id = ?", f.Source.ID).Scan(&count) + testutil.MustNoErr(t, err, "count messages before reset") + if count != 3 { + t.Fatalf("expected 3 messages before reset, got %d", count) + } + + // Set a sync cursor to verify it gets cleared + err = f.Store.UpdateSourceSyncCursor(f.Source.ID, "test-cursor-12345") + testutil.MustNoErr(t, err, "set sync cursor") + + // Reset the source data + deleted, err := f.Store.ResetSourceData(f.Source.ID) + testutil.MustNoErr(t, err, "ResetSourceData") + + if deleted != 3 { + t.Errorf("expected 3 messages deleted, got %d", deleted) + } + + // Verify messages are gone + err = f.Store.DB().QueryRow("SELECT COUNT(*) FROM messages WHERE source_id = ?", f.Source.ID).Scan(&count) + testutil.MustNoErr(t, err, "count messages after reset") + if count != 0 { + t.Errorf("expected 0 messages after reset, got %d", count) + } + + // Verify source still exists but sync cursor is cleared + source, err := f.Store.GetSourceByIdentifier(f.Source.Identifier) + testutil.MustNoErr(t, err, "GetSourceByIdentifier after reset") + if source == nil { + t.Fatal("source should still exist after reset") + } + if source.SyncCursor.Valid { + t.Errorf("sync cursor should be NULL after reset, got %q", source.SyncCursor.String) + } + if source.LastSyncAt.Valid { + t.Error("last_sync_at should be NULL after reset") + } +} + +// TestResetSourceData_IsolatesAccounts verifies that resetting one account +// does not affect data belonging to other accounts. +func TestResetSourceData_IsolatesAccounts(t *testing.T) { + st := testutil.NewTestStore(t) + + // Create two accounts + account1, err := st.GetOrCreateSource("gmail", "account1@example.com") + testutil.MustNoErr(t, err, "create account1") + account2, err := st.GetOrCreateSource("gmail", "account2@example.com") + testutil.MustNoErr(t, err, "create account2") + + // Create conversations for each account + conv1, err := st.EnsureConversation(account1.ID, "thread-1", "Account 1 Thread") + testutil.MustNoErr(t, err, "create conversation for account1") + conv2, err := st.EnsureConversation(account2.ID, "thread-2", "Account 2 Thread") + testutil.MustNoErr(t, err, "create conversation for account2") + + // Add messages to account 1 + for i := 0; i < 5; i++ { + _, err := st.UpsertMessage(&store.Message{ + ConversationID: conv1, + SourceID: account1.ID, + SourceMessageID: fmt.Sprintf("acct1-msg-%d", i), + MessageType: "email", + SizeEstimate: 1000, + }) + testutil.MustNoErr(t, err, "insert message for account1") + } + + // Add messages to account 2 + for i := 0; i < 3; i++ { + _, err := st.UpsertMessage(&store.Message{ + ConversationID: conv2, + SourceID: account2.ID, + SourceMessageID: fmt.Sprintf("acct2-msg-%d", i), + MessageType: "email", + SizeEstimate: 2000, + }) + testutil.MustNoErr(t, err, "insert message for account2") + } + + // Create labels for each account + _, err = st.EnsureLabel(account1.ID, "INBOX", "Inbox", "system") + testutil.MustNoErr(t, err, "create label for account1") + _, err = st.EnsureLabel(account2.ID, "INBOX", "Inbox", "system") + testutil.MustNoErr(t, err, "create label for account2") + + // Set sync cursors for both + err = st.UpdateSourceSyncCursor(account1.ID, "cursor-account1") + testutil.MustNoErr(t, err, "set cursor for account1") + err = st.UpdateSourceSyncCursor(account2.ID, "cursor-account2") + testutil.MustNoErr(t, err, "set cursor for account2") + + // Verify initial state + var count1, count2 int + err = st.DB().QueryRow("SELECT COUNT(*) FROM messages WHERE source_id = ?", account1.ID).Scan(&count1) + testutil.MustNoErr(t, err, "count account1 messages before") + err = st.DB().QueryRow("SELECT COUNT(*) FROM messages WHERE source_id = ?", account2.ID).Scan(&count2) + testutil.MustNoErr(t, err, "count account2 messages before") + + if count1 != 5 { + t.Fatalf("expected 5 messages for account1, got %d", count1) + } + if count2 != 3 { + t.Fatalf("expected 3 messages for account2, got %d", count2) + } + + // Reset account 1 ONLY + deleted, err := st.ResetSourceData(account1.ID) + testutil.MustNoErr(t, err, "ResetSourceData for account1") + + if deleted != 5 { + t.Errorf("expected 5 messages deleted from account1, got %d", deleted) + } + + // Verify account 1 is cleared + err = st.DB().QueryRow("SELECT COUNT(*) FROM messages WHERE source_id = ?", account1.ID).Scan(&count1) + testutil.MustNoErr(t, err, "count account1 messages after") + if count1 != 0 { + t.Errorf("expected 0 messages for account1 after reset, got %d", count1) + } + + // Verify account 1 conversations are cleared + var convCount1 int + err = st.DB().QueryRow("SELECT COUNT(*) FROM conversations WHERE source_id = ?", account1.ID).Scan(&convCount1) + testutil.MustNoErr(t, err, "count account1 conversations after") + if convCount1 != 0 { + t.Errorf("expected 0 conversations for account1 after reset, got %d", convCount1) + } + + // Verify account 1 labels are cleared + var labelCount1 int + err = st.DB().QueryRow("SELECT COUNT(*) FROM labels WHERE source_id = ?", account1.ID).Scan(&labelCount1) + testutil.MustNoErr(t, err, "count account1 labels after") + if labelCount1 != 0 { + t.Errorf("expected 0 labels for account1 after reset, got %d", labelCount1) + } + + // Verify account 1 sync cursor is cleared + src1, err := st.GetSourceByIdentifier("account1@example.com") + testutil.MustNoErr(t, err, "get account1 after reset") + if src1.SyncCursor.Valid { + t.Errorf("account1 sync cursor should be NULL, got %q", src1.SyncCursor.String) + } + + // ============================================================ + // CRITICAL: Verify account 2 is COMPLETELY UNTOUCHED + // ============================================================ + + // Account 2 messages should still exist + err = st.DB().QueryRow("SELECT COUNT(*) FROM messages WHERE source_id = ?", account2.ID).Scan(&count2) + testutil.MustNoErr(t, err, "count account2 messages after") + if count2 != 3 { + t.Errorf("ISOLATION FAILURE: expected 3 messages for account2, got %d", count2) + } + + // Account 2 conversations should still exist + var convCount2 int + err = st.DB().QueryRow("SELECT COUNT(*) FROM conversations WHERE source_id = ?", account2.ID).Scan(&convCount2) + testutil.MustNoErr(t, err, "count account2 conversations after") + if convCount2 != 1 { + t.Errorf("ISOLATION FAILURE: expected 1 conversation for account2, got %d", convCount2) + } + + // Account 2 labels should still exist + var labelCount2 int + err = st.DB().QueryRow("SELECT COUNT(*) FROM labels WHERE source_id = ?", account2.ID).Scan(&labelCount2) + testutil.MustNoErr(t, err, "count account2 labels after") + if labelCount2 != 1 { + t.Errorf("ISOLATION FAILURE: expected 1 label for account2, got %d", labelCount2) + } + + // Account 2 sync cursor should still be set + src2, err := st.GetSourceByIdentifier("account2@example.com") + testutil.MustNoErr(t, err, "get account2 after reset") + if !src2.SyncCursor.Valid || src2.SyncCursor.String != "cursor-account2" { + t.Errorf("ISOLATION FAILURE: account2 sync cursor should be 'cursor-account2', got %v", src2.SyncCursor) + } + + // Verify total message count in database + var totalMessages int + err = st.DB().QueryRow("SELECT COUNT(*) FROM messages").Scan(&totalMessages) + testutil.MustNoErr(t, err, "count total messages") + if totalMessages != 3 { + t.Errorf("expected 3 total messages (all from account2), got %d", totalMessages) + } +} From 15af66648f3de971f6848c220b9ca36b375bb445 Mon Sep 17 00:00:00 2001 From: Rob Elkin Date: Tue, 10 Feb 2026 06:53:59 +0000 Subject: [PATCH 2/5] Optimize --clean delete performance with batching and FK bypass Major performance improvements to ResetSourceData: 1. Disable foreign key checks during bulk delete (avoids validation overhead) 2. Delete child tables explicitly (bypasses CASCADE trigger overhead) 3. Batch deletes with LIMIT 5000 (keeps transactions small) 4. Add progress callback for real-time status updates Expected speedup: 2.5 hours -> ~2-5 minutes for 125k messages. The CLI now shows per-table progress and message deletion percentage. Co-Authored-By: Claude Opus 4.5 --- .mcp.json | 18 +++ cmd/msgvault/cmd/syncfull.go | 38 +++++- internal/store/sync.go | 251 +++++++++++++++++++++++++++++------ 3 files changed, 265 insertions(+), 42 deletions(-) create mode 100644 .mcp.json diff --git a/.mcp.json b/.mcp.json new file mode 100644 index 00000000..24042f51 --- /dev/null +++ b/.mcp.json @@ -0,0 +1,18 @@ +{ + "mcpServers": { + "msgvault-dev": { + "command": "/Users/robelkin/Code/msgvault/msgvault", + "args": ["mcp"], + "env": { + "MSGVAULT_HOME": "/Users/robelkin/msgvault-dev" + } + }, + "msgvault-prod": { + "command": "/Users/robelkin/Code/msgvault/msgvault", + "args": ["mcp"], + "env": { + "MSGVAULT_HOME": "/Users/robelkin/.msgvault" + } + } + } +} diff --git a/cmd/msgvault/cmd/syncfull.go b/cmd/msgvault/cmd/syncfull.go index f8409047..bf998b8e 100644 --- a/cmd/msgvault/cmd/syncfull.go +++ b/cmd/msgvault/cmd/syncfull.go @@ -168,14 +168,42 @@ Examples: fmt.Println() } - // Perform the clean with progress - fmt.Print("Deleting messages...") - deleted, err := s.ResetSourceData(source.ID) + // Perform the clean with progress reporting + fmt.Println("Deleting local data...") + lastTable := "" + lastPrint := time.Now() + deleted, err := s.ResetSourceDataWithProgress(source.ID, func(p store.ResetProgress) { + // Throttle output to avoid spamming + if time.Since(lastPrint) < 500*time.Millisecond && p.Phase != "complete" { + return + } + lastPrint = time.Now() + + switch p.Phase { + case "counting": + fmt.Printf(" Found %d messages to delete\n", p.TotalMessages) + case "deleting": + if p.CurrentTable != lastTable { + if lastTable != "" { + fmt.Println(" done") + } + fmt.Printf(" Cleaning %s...", p.CurrentTable) + lastTable = p.CurrentTable + } + if p.CurrentTable == "messages" { + pct := float64(p.DeletedMessages) / float64(p.TotalMessages) * 100 + fmt.Printf("\r Cleaning messages... %d/%d (%.1f%%) ", p.DeletedMessages, p.TotalMessages, pct) + } + case "complete": + if lastTable != "" { + fmt.Println(" done") + } + } + }) if err != nil { - fmt.Println(" failed") + fmt.Println("\nClean failed:", err) return fmt.Errorf("reset account data: %w", err) } - fmt.Println(" done") fmt.Printf("Deleted %d messages from local database.\n\n", deleted) } diff --git a/internal/store/sync.go b/internal/store/sync.go index 3455dd49..12adb9ef 100644 --- a/internal/store/sync.go +++ b/internal/store/sync.go @@ -385,58 +385,235 @@ func (s *Store) GetSourceByIdentifier(identifier string) (*Source, error) { return source, nil } +// ResetProgress reports progress during a reset operation. +type ResetProgress struct { + Phase string // "counting", "message_bodies", "message_raw", etc. + TotalMessages int64 // Total messages to delete + DeletedMessages int64 // Messages deleted so far + CurrentTable string // Table currently being processed + RowsInBatch int64 // Rows deleted in this batch +} + +// ResetProgressFunc is called periodically during reset to report progress. +type ResetProgressFunc func(p ResetProgress) + // ResetSourceData deletes all synced data for a source while keeping the source // entry itself. This allows a clean re-sync from Gmail without losing the account // configuration. Returns the number of messages deleted. func (s *Store) ResetSourceData(sourceID int64) (int64, error) { - var messagesDeleted int64 + return s.ResetSourceDataWithProgress(sourceID, nil) +} - err := s.withTx(func(tx *sql.Tx) error { - // Count messages before deletion - row := tx.QueryRow("SELECT COUNT(*) FROM messages WHERE source_id = ?", sourceID) - if err := row.Scan(&messagesDeleted); err != nil { - return fmt.Errorf("count messages: %w", err) - } +// ResetSourceDataWithProgress is like ResetSourceData but reports progress via callback. +// Uses batched deletes with FK checks disabled for much better performance. +func (s *Store) ResetSourceDataWithProgress(sourceID int64, progress ResetProgressFunc) (int64, error) { + if progress == nil { + progress = func(ResetProgress) {} // no-op + } - // Delete messages (cascades to: message_bodies, message_raw, message_recipients, - // message_labels, attachments, reactions via ON DELETE CASCADE) - if _, err := tx.Exec("DELETE FROM messages WHERE source_id = ?", sourceID); err != nil { - return fmt.Errorf("delete messages: %w", err) - } + const batchSize = 5000 - // Delete conversations (ON DELETE CASCADE handles conversation_participants) - if _, err := tx.Exec("DELETE FROM conversations WHERE source_id = ?", sourceID); err != nil { - return fmt.Errorf("delete conversations: %w", err) - } + // Count messages first + var totalMessages int64 + if err := s.db.QueryRow("SELECT COUNT(*) FROM messages WHERE source_id = ?", sourceID).Scan(&totalMessages); err != nil { + return 0, fmt.Errorf("count messages: %w", err) + } - // Delete source-specific labels - if _, err := tx.Exec("DELETE FROM labels WHERE source_id = ?", sourceID); err != nil { - return fmt.Errorf("delete labels: %w", err) - } + progress(ResetProgress{Phase: "counting", TotalMessages: totalMessages}) - // Delete sync history - if _, err := tx.Exec("DELETE FROM sync_runs WHERE source_id = ?", sourceID); err != nil { - return fmt.Errorf("delete sync_runs: %w", err) - } - if _, err := tx.Exec("DELETE FROM sync_checkpoints WHERE source_id = ?", sourceID); err != nil { - return fmt.Errorf("delete sync_checkpoints: %w", err) + // Disable foreign keys for bulk delete performance + if _, err := s.db.Exec("PRAGMA foreign_keys = OFF"); err != nil { + return 0, fmt.Errorf("disable foreign keys: %w", err) + } + defer s.db.Exec("PRAGMA foreign_keys = ON") // Re-enable on exit + + var deletedMessages int64 + + // Delete child tables explicitly (avoiding CASCADE overhead) + // Order: children before parents + + // 1. Delete from message_bodies in batches + if err := s.deleteChildTableBatched("message_bodies", "message_id", sourceID, batchSize, func(rows int64) { + progress(ResetProgress{ + Phase: "deleting", + TotalMessages: totalMessages, + DeletedMessages: deletedMessages, + CurrentTable: "message_bodies", + RowsInBatch: rows, + }) + }); err != nil { + return 0, err + } + + // 2. Delete from message_raw in batches + if err := s.deleteChildTableBatched("message_raw", "message_id", sourceID, batchSize, func(rows int64) { + progress(ResetProgress{ + Phase: "deleting", + TotalMessages: totalMessages, + DeletedMessages: deletedMessages, + CurrentTable: "message_raw", + RowsInBatch: rows, + }) + }); err != nil { + return 0, err + } + + // 3. Delete from message_recipients in batches + if err := s.deleteChildTableBatched("message_recipients", "message_id", sourceID, batchSize, func(rows int64) { + progress(ResetProgress{ + Phase: "deleting", + TotalMessages: totalMessages, + DeletedMessages: deletedMessages, + CurrentTable: "message_recipients", + RowsInBatch: rows, + }) + }); err != nil { + return 0, err + } + + // 4. Delete from message_labels in batches + if err := s.deleteChildTableBatched("message_labels", "message_id", sourceID, batchSize, func(rows int64) { + progress(ResetProgress{ + Phase: "deleting", + TotalMessages: totalMessages, + DeletedMessages: deletedMessages, + CurrentTable: "message_labels", + RowsInBatch: rows, + }) + }); err != nil { + return 0, err + } + + // 5. Delete from attachments in batches + if err := s.deleteChildTableBatched("attachments", "message_id", sourceID, batchSize, func(rows int64) { + progress(ResetProgress{ + Phase: "deleting", + TotalMessages: totalMessages, + DeletedMessages: deletedMessages, + CurrentTable: "attachments", + RowsInBatch: rows, + }) + }); err != nil { + return 0, err + } + + // 6. Delete from reactions in batches + if err := s.deleteChildTableBatched("reactions", "message_id", sourceID, batchSize, func(rows int64) { + progress(ResetProgress{ + Phase: "deleting", + TotalMessages: totalMessages, + DeletedMessages: deletedMessages, + CurrentTable: "reactions", + RowsInBatch: rows, + }) + }); err != nil { + return 0, err + } + + // 7. Delete messages in batches (parent table) + for { + result, err := s.db.Exec(` + DELETE FROM messages WHERE id IN ( + SELECT id FROM messages WHERE source_id = ? LIMIT ? + ) + `, sourceID, batchSize) + if err != nil { + return deletedMessages, fmt.Errorf("delete messages batch: %w", err) } - // Reset the source's sync cursor so next sync starts fresh - if _, err := tx.Exec(` - UPDATE sources - SET sync_cursor = NULL, last_sync_at = NULL, updated_at = datetime('now') - WHERE id = ? - `, sourceID); err != nil { - return fmt.Errorf("reset source: %w", err) + rows, _ := result.RowsAffected() + if rows == 0 { + break } + deletedMessages += rows + + progress(ResetProgress{ + Phase: "deleting", + TotalMessages: totalMessages, + DeletedMessages: deletedMessages, + CurrentTable: "messages", + RowsInBatch: rows, + }) + } + + // 8. Delete conversation_participants (child of conversations) + if _, err := s.db.Exec(` + DELETE FROM conversation_participants WHERE conversation_id IN ( + SELECT id FROM conversations WHERE source_id = ? + ) + `, sourceID); err != nil { + return deletedMessages, fmt.Errorf("delete conversation_participants: %w", err) + } + + // 9. Delete conversations + if _, err := s.db.Exec("DELETE FROM conversations WHERE source_id = ?", sourceID); err != nil { + return deletedMessages, fmt.Errorf("delete conversations: %w", err) + } - return nil + progress(ResetProgress{ + Phase: "deleting", + TotalMessages: totalMessages, + DeletedMessages: deletedMessages, + CurrentTable: "conversations", }) - if err != nil { - return 0, err + // 10. Delete labels + if _, err := s.db.Exec("DELETE FROM labels WHERE source_id = ?", sourceID); err != nil { + return deletedMessages, fmt.Errorf("delete labels: %w", err) + } + + // 11. Delete sync history + if _, err := s.db.Exec("DELETE FROM sync_runs WHERE source_id = ?", sourceID); err != nil { + return deletedMessages, fmt.Errorf("delete sync_runs: %w", err) + } + if _, err := s.db.Exec("DELETE FROM sync_checkpoints WHERE source_id = ?", sourceID); err != nil { + return deletedMessages, fmt.Errorf("delete sync_checkpoints: %w", err) + } + + // 12. Reset the source's sync cursor + if _, err := s.db.Exec(` + UPDATE sources + SET sync_cursor = NULL, last_sync_at = NULL, updated_at = datetime('now') + WHERE id = ? + `, sourceID); err != nil { + return deletedMessages, fmt.Errorf("reset source: %w", err) } - return messagesDeleted, nil + // Re-enable foreign keys + if _, err := s.db.Exec("PRAGMA foreign_keys = ON"); err != nil { + return deletedMessages, fmt.Errorf("re-enable foreign keys: %w", err) + } + + progress(ResetProgress{ + Phase: "complete", + TotalMessages: totalMessages, + DeletedMessages: deletedMessages, + }) + + return deletedMessages, nil +} + +// deleteChildTableBatched deletes rows from a child table in batches. +// The child table must have a column that references messages.id. +func (s *Store) deleteChildTableBatched(table, fkColumn string, sourceID int64, batchSize int, onBatch func(rows int64)) error { + // Use a subquery to find message IDs for this source + query := fmt.Sprintf(` + DELETE FROM %s WHERE %s IN ( + SELECT id FROM messages WHERE source_id = ? LIMIT ? + ) + `, table, fkColumn) + + for { + result, err := s.db.Exec(query, sourceID, batchSize) + if err != nil { + return fmt.Errorf("delete from %s: %w", table, err) + } + + rows, _ := result.RowsAffected() + if rows == 0 { + break + } + onBatch(rows) + } + return nil } From 9e018f99d5f38648089d516312d6d0772c54437e Mon Sep 17 00:00:00 2001 From: Rob Elkin Date: Tue, 10 Feb 2026 06:58:58 +0000 Subject: [PATCH 3/5] Add .mcp.json to gitignore Local MCP configuration should not be tracked. Co-Authored-By: Claude Opus 4.5 --- .gitignore | 1 + .mcp.json | 18 ------------------ 2 files changed, 1 insertion(+), 18 deletions(-) delete mode 100644 .mcp.json diff --git a/.gitignore b/.gitignore index 6f78e249..70cc33c7 100644 --- a/.gitignore +++ b/.gitignore @@ -39,3 +39,4 @@ __pycache__/ # Claude Code .claude/ +.mcp.json diff --git a/.mcp.json b/.mcp.json deleted file mode 100644 index 24042f51..00000000 --- a/.mcp.json +++ /dev/null @@ -1,18 +0,0 @@ -{ - "mcpServers": { - "msgvault-dev": { - "command": "/Users/robelkin/Code/msgvault/msgvault", - "args": ["mcp"], - "env": { - "MSGVAULT_HOME": "/Users/robelkin/msgvault-dev" - } - }, - "msgvault-prod": { - "command": "/Users/robelkin/Code/msgvault/msgvault", - "args": ["mcp"], - "env": { - "MSGVAULT_HOME": "/Users/robelkin/.msgvault" - } - } - } -} From 2d822b4f1f2fc0fff44f37f10d1a54935a6e63de Mon Sep 17 00:00:00 2001 From: Rob Elkin Date: Tue, 10 Feb 2026 07:06:34 +0000 Subject: [PATCH 4/5] Fix errcheck lint: handle deferred PRAGMA error return Wrap deferred s.db.Exec in anonymous function with explicit blank identifier assignment to satisfy errcheck linter. Co-Authored-By: Claude Opus 4.5 --- internal/store/sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/store/sync.go b/internal/store/sync.go index 12adb9ef..8e76c4fe 100644 --- a/internal/store/sync.go +++ b/internal/store/sync.go @@ -425,7 +425,7 @@ func (s *Store) ResetSourceDataWithProgress(sourceID int64, progress ResetProgre if _, err := s.db.Exec("PRAGMA foreign_keys = OFF"); err != nil { return 0, fmt.Errorf("disable foreign keys: %w", err) } - defer s.db.Exec("PRAGMA foreign_keys = ON") // Re-enable on exit + defer func() { _, _ = s.db.Exec("PRAGMA foreign_keys = ON") }() // Re-enable on exit var deletedMessages int64 From 5f29b431a90ee03b50fa37f39038adadc51778c5 Mon Sep 17 00:00:00 2001 From: Rob Elkin Date: Tue, 10 Feb 2026 07:57:16 +0000 Subject: [PATCH 5/5] Fix two data integrity issues in ResetSourceData 1. PRAGMA on pooled connection: PRAGMA foreign_keys = OFF was executed on *sql.DB which is a connection pool. The PRAGMA might apply to a different connection than the subsequent deletes, and a connection could return to the pool with FKs disabled. Fix: Use sql.Conn to get a dedicated connection for all operations. 2. Batching logic bug: deleteChildTableBatched repeatedly selected the same first LIMIT N message IDs because messages weren't deleted yet. After the first batch deleted those children, subsequent batches found 0 rows and exited early, leaving orphaned child rows. Fix: Delete by child table rowid with a JOIN to messages, ensuring each batch finds actual existing child rows to delete. Co-Authored-By: Claude Opus 4.5 --- internal/store/sync.go | 200 +++++++++++++++++------------------------ 1 file changed, 81 insertions(+), 119 deletions(-) diff --git a/internal/store/sync.go b/internal/store/sync.go index 8e76c4fe..902f6441 100644 --- a/internal/store/sync.go +++ b/internal/store/sync.go @@ -1,6 +1,7 @@ package store import ( + "context" "database/sql" "fmt" "time" @@ -412,6 +413,7 @@ func (s *Store) ResetSourceDataWithProgress(sourceID int64, progress ResetProgre } const batchSize = 5000 + ctx := context.Background() // Count messages first var totalMessages int64 @@ -421,98 +423,83 @@ func (s *Store) ResetSourceDataWithProgress(sourceID int64, progress ResetProgre progress(ResetProgress{Phase: "counting", TotalMessages: totalMessages}) - // Disable foreign keys for bulk delete performance - if _, err := s.db.Exec("PRAGMA foreign_keys = OFF"); err != nil { - return 0, fmt.Errorf("disable foreign keys: %w", err) - } - defer func() { _, _ = s.db.Exec("PRAGMA foreign_keys = ON") }() // Re-enable on exit - - var deletedMessages int64 - - // Delete child tables explicitly (avoiding CASCADE overhead) - // Order: children before parents - - // 1. Delete from message_bodies in batches - if err := s.deleteChildTableBatched("message_bodies", "message_id", sourceID, batchSize, func(rows int64) { - progress(ResetProgress{ - Phase: "deleting", - TotalMessages: totalMessages, - DeletedMessages: deletedMessages, - CurrentTable: "message_bodies", - RowsInBatch: rows, - }) - }); err != nil { - return 0, err + // Use a dedicated connection to ensure PRAGMA applies to all our operations. + // This is critical because *sql.DB is a connection pool - without this, + // PRAGMA foreign_keys = OFF might run on a different connection than deletes. + conn, err := s.db.Conn(ctx) + if err != nil { + return 0, fmt.Errorf("acquire connection: %w", err) } + defer conn.Close() - // 2. Delete from message_raw in batches - if err := s.deleteChildTableBatched("message_raw", "message_id", sourceID, batchSize, func(rows int64) { - progress(ResetProgress{ - Phase: "deleting", - TotalMessages: totalMessages, - DeletedMessages: deletedMessages, - CurrentTable: "message_raw", - RowsInBatch: rows, - }) - }); err != nil { - return 0, err + // Disable foreign keys for bulk delete performance on this connection + if _, err := conn.ExecContext(ctx, "PRAGMA foreign_keys = OFF"); err != nil { + return 0, fmt.Errorf("disable foreign keys: %w", err) } - // 3. Delete from message_recipients in batches - if err := s.deleteChildTableBatched("message_recipients", "message_id", sourceID, batchSize, func(rows int64) { - progress(ResetProgress{ - Phase: "deleting", - TotalMessages: totalMessages, - DeletedMessages: deletedMessages, - CurrentTable: "message_recipients", - RowsInBatch: rows, - }) - }); err != nil { - return 0, err + // Helper to delete from a child table in batches. + // Uses rowid-based deletion to ensure each batch finds actual rows to delete. + deleteChildBatched := func(table, fkColumn string, onProgress func()) error { + // Query selects child table rowids by joining to messages filtered by source. + // This ensures each iteration finds actual existing child rows. + query := fmt.Sprintf(` + DELETE FROM %s WHERE rowid IN ( + SELECT %s.rowid FROM %s + JOIN messages ON messages.id = %s.%s + WHERE messages.source_id = ? + LIMIT ? + ) + `, table, table, table, table, fkColumn) + + for { + result, err := conn.ExecContext(ctx, query, sourceID, batchSize) + if err != nil { + return fmt.Errorf("delete from %s: %w", table, err) + } + rows, _ := result.RowsAffected() + if rows == 0 { + break + } + onProgress() + } + return nil } - // 4. Delete from message_labels in batches - if err := s.deleteChildTableBatched("message_labels", "message_id", sourceID, batchSize, func(rows int64) { - progress(ResetProgress{ - Phase: "deleting", - TotalMessages: totalMessages, - DeletedMessages: deletedMessages, - CurrentTable: "message_labels", - RowsInBatch: rows, - }) - }); err != nil { - return 0, err - } + var deletedMessages int64 - // 5. Delete from attachments in batches - if err := s.deleteChildTableBatched("attachments", "message_id", sourceID, batchSize, func(rows int64) { - progress(ResetProgress{ - Phase: "deleting", - TotalMessages: totalMessages, - DeletedMessages: deletedMessages, - CurrentTable: "attachments", - RowsInBatch: rows, - }) - }); err != nil { - return 0, err - } + // Delete child tables explicitly (avoiding CASCADE overhead) + // Order: children before parents - // 6. Delete from reactions in batches - if err := s.deleteChildTableBatched("reactions", "message_id", sourceID, batchSize, func(rows int64) { - progress(ResetProgress{ - Phase: "deleting", - TotalMessages: totalMessages, - DeletedMessages: deletedMessages, - CurrentTable: "reactions", - RowsInBatch: rows, - }) - }); err != nil { - return 0, err + // Child tables of messages + childTables := []struct { + table string + fkColumn string + }{ + {"message_bodies", "message_id"}, + {"message_raw", "message_id"}, + {"message_recipients", "message_id"}, + {"message_labels", "message_id"}, + {"attachments", "message_id"}, + {"reactions", "message_id"}, + } + + for _, ct := range childTables { + tableName := ct.table + if err := deleteChildBatched(ct.table, ct.fkColumn, func() { + progress(ResetProgress{ + Phase: "deleting", + TotalMessages: totalMessages, + DeletedMessages: deletedMessages, + CurrentTable: tableName, + }) + }); err != nil { + return 0, err + } } - // 7. Delete messages in batches (parent table) + // Delete messages in batches (parent table) for { - result, err := s.db.Exec(` + result, err := conn.ExecContext(ctx, ` DELETE FROM messages WHERE id IN ( SELECT id FROM messages WHERE source_id = ? LIMIT ? ) @@ -536,8 +523,8 @@ func (s *Store) ResetSourceDataWithProgress(sourceID int64, progress ResetProgre }) } - // 8. Delete conversation_participants (child of conversations) - if _, err := s.db.Exec(` + // Delete conversation_participants (child of conversations) + if _, err := conn.ExecContext(ctx, ` DELETE FROM conversation_participants WHERE conversation_id IN ( SELECT id FROM conversations WHERE source_id = ? ) @@ -545,8 +532,8 @@ func (s *Store) ResetSourceDataWithProgress(sourceID int64, progress ResetProgre return deletedMessages, fmt.Errorf("delete conversation_participants: %w", err) } - // 9. Delete conversations - if _, err := s.db.Exec("DELETE FROM conversations WHERE source_id = ?", sourceID); err != nil { + // Delete conversations + if _, err := conn.ExecContext(ctx, "DELETE FROM conversations WHERE source_id = ?", sourceID); err != nil { return deletedMessages, fmt.Errorf("delete conversations: %w", err) } @@ -557,21 +544,21 @@ func (s *Store) ResetSourceDataWithProgress(sourceID int64, progress ResetProgre CurrentTable: "conversations", }) - // 10. Delete labels - if _, err := s.db.Exec("DELETE FROM labels WHERE source_id = ?", sourceID); err != nil { + // Delete labels + if _, err := conn.ExecContext(ctx, "DELETE FROM labels WHERE source_id = ?", sourceID); err != nil { return deletedMessages, fmt.Errorf("delete labels: %w", err) } - // 11. Delete sync history - if _, err := s.db.Exec("DELETE FROM sync_runs WHERE source_id = ?", sourceID); err != nil { + // Delete sync history + if _, err := conn.ExecContext(ctx, "DELETE FROM sync_runs WHERE source_id = ?", sourceID); err != nil { return deletedMessages, fmt.Errorf("delete sync_runs: %w", err) } - if _, err := s.db.Exec("DELETE FROM sync_checkpoints WHERE source_id = ?", sourceID); err != nil { + if _, err := conn.ExecContext(ctx, "DELETE FROM sync_checkpoints WHERE source_id = ?", sourceID); err != nil { return deletedMessages, fmt.Errorf("delete sync_checkpoints: %w", err) } - // 12. Reset the source's sync cursor - if _, err := s.db.Exec(` + // Reset the source's sync cursor + if _, err := conn.ExecContext(ctx, ` UPDATE sources SET sync_cursor = NULL, last_sync_at = NULL, updated_at = datetime('now') WHERE id = ? @@ -579,8 +566,8 @@ func (s *Store) ResetSourceDataWithProgress(sourceID int64, progress ResetProgre return deletedMessages, fmt.Errorf("reset source: %w", err) } - // Re-enable foreign keys - if _, err := s.db.Exec("PRAGMA foreign_keys = ON"); err != nil { + // Re-enable foreign keys on this connection before returning it to pool + if _, err := conn.ExecContext(ctx, "PRAGMA foreign_keys = ON"); err != nil { return deletedMessages, fmt.Errorf("re-enable foreign keys: %w", err) } @@ -592,28 +579,3 @@ func (s *Store) ResetSourceDataWithProgress(sourceID int64, progress ResetProgre return deletedMessages, nil } - -// deleteChildTableBatched deletes rows from a child table in batches. -// The child table must have a column that references messages.id. -func (s *Store) deleteChildTableBatched(table, fkColumn string, sourceID int64, batchSize int, onBatch func(rows int64)) error { - // Use a subquery to find message IDs for this source - query := fmt.Sprintf(` - DELETE FROM %s WHERE %s IN ( - SELECT id FROM messages WHERE source_id = ? LIMIT ? - ) - `, table, fkColumn) - - for { - result, err := s.db.Exec(query, sourceID, batchSize) - if err != nil { - return fmt.Errorf("delete from %s: %w", table, err) - } - - rows, _ := result.RowsAffected() - if rows == 0 { - break - } - onBatch(rows) - } - return nil -}