diff --git a/.gitignore b/.gitignore index 6f78e249..70cc33c7 100644 --- a/.gitignore +++ b/.gitignore @@ -39,3 +39,4 @@ __pycache__/ # Claude Code .claude/ +.mcp.json diff --git a/cmd/msgvault/cmd/syncfull.go b/cmd/msgvault/cmd/syncfull.go index 0faca7b8..bf998b8e 100644 --- a/cmd/msgvault/cmd/syncfull.go +++ b/cmd/msgvault/cmd/syncfull.go @@ -21,6 +21,8 @@ var ( syncBefore string syncAfter string syncLimit int + syncClean bool + syncCleanYes bool ) var syncFullCmd = &cobra.Command{ @@ -37,18 +39,30 @@ Date filters: --after 2024-01-01 Only messages on or after this date --before 2024-12-31 Only messages before this date +Clean sync: + --clean Delete all local data for the account and re-sync + from scratch. Use this to reset staging/deletion + state or recover from a corrupted local database. + Requires confirmation (use --yes to skip). + Examples: msgvault sync-full # Sync all accounts msgvault sync-full you@gmail.com msgvault sync-full you@gmail.com --after 2024-01-01 msgvault sync-full you@gmail.com --query "from:someone@example.com" - msgvault sync-full you@gmail.com --noresume # Force fresh sync`, + msgvault sync-full you@gmail.com --noresume # Force fresh sync + msgvault sync-full you@gmail.com --clean # Delete local data and re-sync + msgvault sync-full you@gmail.com --clean --yes # Skip confirmation`, Args: cobra.MaximumNArgs(1), RunE: func(cmd *cobra.Command, args []string) error { if syncLimit < 0 { return fmt.Errorf("--limit must be a non-negative number") } + if syncClean && len(args) == 0 { + return fmt.Errorf("--clean requires specifying an account email") + } + // Validate config if cfg.OAuth.ClientSecrets == "" { return errOAuthNotConfigured() @@ -100,15 +114,99 @@ Examples: ctx, cancel := context.WithCancel(cmd.Context()) defer cancel() - // Handle Ctrl+C gracefully + // Handle Ctrl+C gracefully (first = graceful, second = force exit) sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) go func() { <-sigChan - fmt.Println("\nInterrupted. Saving checkpoint...") + fmt.Println("\nInterrupted. Saving checkpoint... (press Ctrl+C again to force quit)") cancel() + // Wait for second signal to force exit + <-sigChan + fmt.Println("\nForce quit.") + os.Exit(1) }() + // Handle --clean: delete all local data for the account before syncing + if syncClean { + email := emails[0] // Already validated that exactly one email is specified + source, err := s.GetSourceByIdentifier(email) + if err != nil { + return fmt.Errorf("lookup account: %w", err) + } + if source == nil { + return fmt.Errorf("account %s not found in database", email) + } + + // Count what will be deleted + fmt.Printf("Preparing to clean local data for %s...\n", email) + var msgCount, convCount, labelCount int64 + _ = s.DB().QueryRow("SELECT COUNT(*) FROM messages WHERE source_id = ?", source.ID).Scan(&msgCount) + _ = s.DB().QueryRow("SELECT COUNT(*) FROM conversations WHERE source_id = ?", source.ID).Scan(&convCount) + _ = s.DB().QueryRow("SELECT COUNT(*) FROM labels WHERE source_id = ?", source.ID).Scan(&labelCount) + + fmt.Println() + fmt.Println("This will permanently delete from the LOCAL database:") + fmt.Printf(" • %d messages\n", msgCount) + fmt.Printf(" • %d conversations\n", convCount) + fmt.Printf(" • %d labels\n", labelCount) + fmt.Printf(" • All sync history and checkpoints\n") + fmt.Println() + fmt.Println("Note: This does NOT delete anything from Gmail.") + fmt.Println(" After cleaning, a full re-sync will download all messages again.") + fmt.Println() + + // Require confirmation unless --yes is provided + if !syncCleanYes { + fmt.Print("Proceed with clean? [y/N]: ") + var response string + _, _ = fmt.Scanln(&response) + if response != "y" && response != "Y" { + fmt.Println("Cancelled.") + return nil + } + fmt.Println() + } + + // Perform the clean with progress reporting + fmt.Println("Deleting local data...") + lastTable := "" + lastPrint := time.Now() + deleted, err := s.ResetSourceDataWithProgress(source.ID, func(p store.ResetProgress) { + // Throttle output to avoid spamming + if time.Since(lastPrint) < 500*time.Millisecond && p.Phase != "complete" { + return + } + lastPrint = time.Now() + + switch p.Phase { + case "counting": + fmt.Printf(" Found %d messages to delete\n", p.TotalMessages) + case "deleting": + if p.CurrentTable != lastTable { + if lastTable != "" { + fmt.Println(" done") + } + fmt.Printf(" Cleaning %s...", p.CurrentTable) + lastTable = p.CurrentTable + } + if p.CurrentTable == "messages" { + pct := float64(p.DeletedMessages) / float64(p.TotalMessages) * 100 + fmt.Printf("\r Cleaning messages... %d/%d (%.1f%%) ", p.DeletedMessages, p.TotalMessages, pct) + } + case "complete": + if lastTable != "" { + fmt.Println(" done") + } + } + }) + if err != nil { + fmt.Println("\nClean failed:", err) + return fmt.Errorf("reset account data: %w", err) + } + fmt.Printf("Deleted %d messages from local database.\n\n", deleted) + } + var syncErrors []string for _, email := range emails { if ctx.Err() != nil { @@ -332,5 +430,7 @@ func init() { syncFullCmd.Flags().StringVar(&syncBefore, "before", "", "Only messages before this date (YYYY-MM-DD)") syncFullCmd.Flags().StringVar(&syncAfter, "after", "", "Only messages after this date (YYYY-MM-DD)") syncFullCmd.Flags().IntVar(&syncLimit, "limit", 0, "Limit number of messages (for testing)") + syncFullCmd.Flags().BoolVar(&syncClean, "clean", false, "Delete all local data for the account and re-sync from scratch") + syncFullCmd.Flags().BoolVarP(&syncCleanYes, "yes", "y", false, "Skip confirmation prompt for --clean") rootCmd.AddCommand(syncFullCmd) } diff --git a/internal/store/sync.go b/internal/store/sync.go index 3a6701d9..902f6441 100644 --- a/internal/store/sync.go +++ b/internal/store/sync.go @@ -1,6 +1,7 @@ package store import ( + "context" "database/sql" "fmt" "time" @@ -384,3 +385,197 @@ func (s *Store) GetSourceByIdentifier(identifier string) (*Source, error) { return source, nil } + +// ResetProgress reports progress during a reset operation. +type ResetProgress struct { + Phase string // "counting", "message_bodies", "message_raw", etc. + TotalMessages int64 // Total messages to delete + DeletedMessages int64 // Messages deleted so far + CurrentTable string // Table currently being processed + RowsInBatch int64 // Rows deleted in this batch +} + +// ResetProgressFunc is called periodically during reset to report progress. +type ResetProgressFunc func(p ResetProgress) + +// ResetSourceData deletes all synced data for a source while keeping the source +// entry itself. This allows a clean re-sync from Gmail without losing the account +// configuration. Returns the number of messages deleted. +func (s *Store) ResetSourceData(sourceID int64) (int64, error) { + return s.ResetSourceDataWithProgress(sourceID, nil) +} + +// ResetSourceDataWithProgress is like ResetSourceData but reports progress via callback. +// Uses batched deletes with FK checks disabled for much better performance. +func (s *Store) ResetSourceDataWithProgress(sourceID int64, progress ResetProgressFunc) (int64, error) { + if progress == nil { + progress = func(ResetProgress) {} // no-op + } + + const batchSize = 5000 + ctx := context.Background() + + // Count messages first + var totalMessages int64 + if err := s.db.QueryRow("SELECT COUNT(*) FROM messages WHERE source_id = ?", sourceID).Scan(&totalMessages); err != nil { + return 0, fmt.Errorf("count messages: %w", err) + } + + progress(ResetProgress{Phase: "counting", TotalMessages: totalMessages}) + + // Use a dedicated connection to ensure PRAGMA applies to all our operations. + // This is critical because *sql.DB is a connection pool - without this, + // PRAGMA foreign_keys = OFF might run on a different connection than deletes. + conn, err := s.db.Conn(ctx) + if err != nil { + return 0, fmt.Errorf("acquire connection: %w", err) + } + defer conn.Close() + + // Disable foreign keys for bulk delete performance on this connection + if _, err := conn.ExecContext(ctx, "PRAGMA foreign_keys = OFF"); err != nil { + return 0, fmt.Errorf("disable foreign keys: %w", err) + } + + // Helper to delete from a child table in batches. + // Uses rowid-based deletion to ensure each batch finds actual rows to delete. + deleteChildBatched := func(table, fkColumn string, onProgress func()) error { + // Query selects child table rowids by joining to messages filtered by source. + // This ensures each iteration finds actual existing child rows. + query := fmt.Sprintf(` + DELETE FROM %s WHERE rowid IN ( + SELECT %s.rowid FROM %s + JOIN messages ON messages.id = %s.%s + WHERE messages.source_id = ? + LIMIT ? + ) + `, table, table, table, table, fkColumn) + + for { + result, err := conn.ExecContext(ctx, query, sourceID, batchSize) + if err != nil { + return fmt.Errorf("delete from %s: %w", table, err) + } + rows, _ := result.RowsAffected() + if rows == 0 { + break + } + onProgress() + } + return nil + } + + var deletedMessages int64 + + // Delete child tables explicitly (avoiding CASCADE overhead) + // Order: children before parents + + // Child tables of messages + childTables := []struct { + table string + fkColumn string + }{ + {"message_bodies", "message_id"}, + {"message_raw", "message_id"}, + {"message_recipients", "message_id"}, + {"message_labels", "message_id"}, + {"attachments", "message_id"}, + {"reactions", "message_id"}, + } + + for _, ct := range childTables { + tableName := ct.table + if err := deleteChildBatched(ct.table, ct.fkColumn, func() { + progress(ResetProgress{ + Phase: "deleting", + TotalMessages: totalMessages, + DeletedMessages: deletedMessages, + CurrentTable: tableName, + }) + }); err != nil { + return 0, err + } + } + + // Delete messages in batches (parent table) + for { + result, err := conn.ExecContext(ctx, ` + DELETE FROM messages WHERE id IN ( + SELECT id FROM messages WHERE source_id = ? LIMIT ? + ) + `, sourceID, batchSize) + if err != nil { + return deletedMessages, fmt.Errorf("delete messages batch: %w", err) + } + + rows, _ := result.RowsAffected() + if rows == 0 { + break + } + deletedMessages += rows + + progress(ResetProgress{ + Phase: "deleting", + TotalMessages: totalMessages, + DeletedMessages: deletedMessages, + CurrentTable: "messages", + RowsInBatch: rows, + }) + } + + // Delete conversation_participants (child of conversations) + if _, err := conn.ExecContext(ctx, ` + DELETE FROM conversation_participants WHERE conversation_id IN ( + SELECT id FROM conversations WHERE source_id = ? + ) + `, sourceID); err != nil { + return deletedMessages, fmt.Errorf("delete conversation_participants: %w", err) + } + + // Delete conversations + if _, err := conn.ExecContext(ctx, "DELETE FROM conversations WHERE source_id = ?", sourceID); err != nil { + return deletedMessages, fmt.Errorf("delete conversations: %w", err) + } + + progress(ResetProgress{ + Phase: "deleting", + TotalMessages: totalMessages, + DeletedMessages: deletedMessages, + CurrentTable: "conversations", + }) + + // Delete labels + if _, err := conn.ExecContext(ctx, "DELETE FROM labels WHERE source_id = ?", sourceID); err != nil { + return deletedMessages, fmt.Errorf("delete labels: %w", err) + } + + // Delete sync history + if _, err := conn.ExecContext(ctx, "DELETE FROM sync_runs WHERE source_id = ?", sourceID); err != nil { + return deletedMessages, fmt.Errorf("delete sync_runs: %w", err) + } + if _, err := conn.ExecContext(ctx, "DELETE FROM sync_checkpoints WHERE source_id = ?", sourceID); err != nil { + return deletedMessages, fmt.Errorf("delete sync_checkpoints: %w", err) + } + + // Reset the source's sync cursor + if _, err := conn.ExecContext(ctx, ` + UPDATE sources + SET sync_cursor = NULL, last_sync_at = NULL, updated_at = datetime('now') + WHERE id = ? + `, sourceID); err != nil { + return deletedMessages, fmt.Errorf("reset source: %w", err) + } + + // Re-enable foreign keys on this connection before returning it to pool + if _, err := conn.ExecContext(ctx, "PRAGMA foreign_keys = ON"); err != nil { + return deletedMessages, fmt.Errorf("re-enable foreign keys: %w", err) + } + + progress(ResetProgress{ + Phase: "complete", + TotalMessages: totalMessages, + DeletedMessages: deletedMessages, + }) + + return deletedMessages, nil +} diff --git a/internal/store/sync_test.go b/internal/store/sync_test.go index b5cb7c26..803d7eea 100644 --- a/internal/store/sync_test.go +++ b/internal/store/sync_test.go @@ -1,6 +1,7 @@ package store_test import ( + "fmt" "strings" "testing" "time" @@ -191,3 +192,202 @@ func TestScanSource_NullRequiredTimestamp(t *testing.T) { t.Errorf("error should mention field and NULL status, got: %s", errStr) } } + +// TestResetSourceData verifies that ResetSourceData clears all data for a source +// while preserving the source entry itself. +func TestResetSourceData(t *testing.T) { + f := storetest.New(t) + + // Add some messages to the source + f.CreateMessage("msg1") + f.CreateMessage("msg2") + f.CreateMessage("msg3") + + // Verify messages exist + var count int + err := f.Store.DB().QueryRow("SELECT COUNT(*) FROM messages WHERE source_id = ?", f.Source.ID).Scan(&count) + testutil.MustNoErr(t, err, "count messages before reset") + if count != 3 { + t.Fatalf("expected 3 messages before reset, got %d", count) + } + + // Set a sync cursor to verify it gets cleared + err = f.Store.UpdateSourceSyncCursor(f.Source.ID, "test-cursor-12345") + testutil.MustNoErr(t, err, "set sync cursor") + + // Reset the source data + deleted, err := f.Store.ResetSourceData(f.Source.ID) + testutil.MustNoErr(t, err, "ResetSourceData") + + if deleted != 3 { + t.Errorf("expected 3 messages deleted, got %d", deleted) + } + + // Verify messages are gone + err = f.Store.DB().QueryRow("SELECT COUNT(*) FROM messages WHERE source_id = ?", f.Source.ID).Scan(&count) + testutil.MustNoErr(t, err, "count messages after reset") + if count != 0 { + t.Errorf("expected 0 messages after reset, got %d", count) + } + + // Verify source still exists but sync cursor is cleared + source, err := f.Store.GetSourceByIdentifier(f.Source.Identifier) + testutil.MustNoErr(t, err, "GetSourceByIdentifier after reset") + if source == nil { + t.Fatal("source should still exist after reset") + } + if source.SyncCursor.Valid { + t.Errorf("sync cursor should be NULL after reset, got %q", source.SyncCursor.String) + } + if source.LastSyncAt.Valid { + t.Error("last_sync_at should be NULL after reset") + } +} + +// TestResetSourceData_IsolatesAccounts verifies that resetting one account +// does not affect data belonging to other accounts. +func TestResetSourceData_IsolatesAccounts(t *testing.T) { + st := testutil.NewTestStore(t) + + // Create two accounts + account1, err := st.GetOrCreateSource("gmail", "account1@example.com") + testutil.MustNoErr(t, err, "create account1") + account2, err := st.GetOrCreateSource("gmail", "account2@example.com") + testutil.MustNoErr(t, err, "create account2") + + // Create conversations for each account + conv1, err := st.EnsureConversation(account1.ID, "thread-1", "Account 1 Thread") + testutil.MustNoErr(t, err, "create conversation for account1") + conv2, err := st.EnsureConversation(account2.ID, "thread-2", "Account 2 Thread") + testutil.MustNoErr(t, err, "create conversation for account2") + + // Add messages to account 1 + for i := 0; i < 5; i++ { + _, err := st.UpsertMessage(&store.Message{ + ConversationID: conv1, + SourceID: account1.ID, + SourceMessageID: fmt.Sprintf("acct1-msg-%d", i), + MessageType: "email", + SizeEstimate: 1000, + }) + testutil.MustNoErr(t, err, "insert message for account1") + } + + // Add messages to account 2 + for i := 0; i < 3; i++ { + _, err := st.UpsertMessage(&store.Message{ + ConversationID: conv2, + SourceID: account2.ID, + SourceMessageID: fmt.Sprintf("acct2-msg-%d", i), + MessageType: "email", + SizeEstimate: 2000, + }) + testutil.MustNoErr(t, err, "insert message for account2") + } + + // Create labels for each account + _, err = st.EnsureLabel(account1.ID, "INBOX", "Inbox", "system") + testutil.MustNoErr(t, err, "create label for account1") + _, err = st.EnsureLabel(account2.ID, "INBOX", "Inbox", "system") + testutil.MustNoErr(t, err, "create label for account2") + + // Set sync cursors for both + err = st.UpdateSourceSyncCursor(account1.ID, "cursor-account1") + testutil.MustNoErr(t, err, "set cursor for account1") + err = st.UpdateSourceSyncCursor(account2.ID, "cursor-account2") + testutil.MustNoErr(t, err, "set cursor for account2") + + // Verify initial state + var count1, count2 int + err = st.DB().QueryRow("SELECT COUNT(*) FROM messages WHERE source_id = ?", account1.ID).Scan(&count1) + testutil.MustNoErr(t, err, "count account1 messages before") + err = st.DB().QueryRow("SELECT COUNT(*) FROM messages WHERE source_id = ?", account2.ID).Scan(&count2) + testutil.MustNoErr(t, err, "count account2 messages before") + + if count1 != 5 { + t.Fatalf("expected 5 messages for account1, got %d", count1) + } + if count2 != 3 { + t.Fatalf("expected 3 messages for account2, got %d", count2) + } + + // Reset account 1 ONLY + deleted, err := st.ResetSourceData(account1.ID) + testutil.MustNoErr(t, err, "ResetSourceData for account1") + + if deleted != 5 { + t.Errorf("expected 5 messages deleted from account1, got %d", deleted) + } + + // Verify account 1 is cleared + err = st.DB().QueryRow("SELECT COUNT(*) FROM messages WHERE source_id = ?", account1.ID).Scan(&count1) + testutil.MustNoErr(t, err, "count account1 messages after") + if count1 != 0 { + t.Errorf("expected 0 messages for account1 after reset, got %d", count1) + } + + // Verify account 1 conversations are cleared + var convCount1 int + err = st.DB().QueryRow("SELECT COUNT(*) FROM conversations WHERE source_id = ?", account1.ID).Scan(&convCount1) + testutil.MustNoErr(t, err, "count account1 conversations after") + if convCount1 != 0 { + t.Errorf("expected 0 conversations for account1 after reset, got %d", convCount1) + } + + // Verify account 1 labels are cleared + var labelCount1 int + err = st.DB().QueryRow("SELECT COUNT(*) FROM labels WHERE source_id = ?", account1.ID).Scan(&labelCount1) + testutil.MustNoErr(t, err, "count account1 labels after") + if labelCount1 != 0 { + t.Errorf("expected 0 labels for account1 after reset, got %d", labelCount1) + } + + // Verify account 1 sync cursor is cleared + src1, err := st.GetSourceByIdentifier("account1@example.com") + testutil.MustNoErr(t, err, "get account1 after reset") + if src1.SyncCursor.Valid { + t.Errorf("account1 sync cursor should be NULL, got %q", src1.SyncCursor.String) + } + + // ============================================================ + // CRITICAL: Verify account 2 is COMPLETELY UNTOUCHED + // ============================================================ + + // Account 2 messages should still exist + err = st.DB().QueryRow("SELECT COUNT(*) FROM messages WHERE source_id = ?", account2.ID).Scan(&count2) + testutil.MustNoErr(t, err, "count account2 messages after") + if count2 != 3 { + t.Errorf("ISOLATION FAILURE: expected 3 messages for account2, got %d", count2) + } + + // Account 2 conversations should still exist + var convCount2 int + err = st.DB().QueryRow("SELECT COUNT(*) FROM conversations WHERE source_id = ?", account2.ID).Scan(&convCount2) + testutil.MustNoErr(t, err, "count account2 conversations after") + if convCount2 != 1 { + t.Errorf("ISOLATION FAILURE: expected 1 conversation for account2, got %d", convCount2) + } + + // Account 2 labels should still exist + var labelCount2 int + err = st.DB().QueryRow("SELECT COUNT(*) FROM labels WHERE source_id = ?", account2.ID).Scan(&labelCount2) + testutil.MustNoErr(t, err, "count account2 labels after") + if labelCount2 != 1 { + t.Errorf("ISOLATION FAILURE: expected 1 label for account2, got %d", labelCount2) + } + + // Account 2 sync cursor should still be set + src2, err := st.GetSourceByIdentifier("account2@example.com") + testutil.MustNoErr(t, err, "get account2 after reset") + if !src2.SyncCursor.Valid || src2.SyncCursor.String != "cursor-account2" { + t.Errorf("ISOLATION FAILURE: account2 sync cursor should be 'cursor-account2', got %v", src2.SyncCursor) + } + + // Verify total message count in database + var totalMessages int + err = st.DB().QueryRow("SELECT COUNT(*) FROM messages").Scan(&totalMessages) + testutil.MustNoErr(t, err, "count total messages") + if totalMessages != 3 { + t.Errorf("expected 3 total messages (all from account2), got %d", totalMessages) + } +}