Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 44 additions & 8 deletions cmd/msgvault/cmd/build_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/spf13/cobra"
"github.com/wesm/msgvault/internal/config"
"github.com/wesm/msgvault/internal/query"
"github.com/wesm/msgvault/internal/store"
)

var fullRebuild bool
Expand All @@ -27,10 +28,17 @@ var fullRebuild bool
// files (_last_sync.json, parquet directories) can corrupt the cache.
var buildCacheMu sync.Mutex

// cacheSchemaVersion tracks the Parquet schema layout. Bump this whenever
// columns are added/removed/renamed in the COPY queries below so that
// incremental builds automatically trigger a full rebuild instead of
// producing Parquet files with mismatched schemas.
//
// The value is stored in syncState.SchemaVersion each time a build saves its
// sync state. On the next run buildCache compares the stored value against
// this constant; on any difference it sets fullRebuild and resets the resume
// point to message ID 0 rather than exporting incrementally.
const cacheSchemaVersion = 3 // v3: schema migration adds phone_number etc. to existing DBs; force Parquet rebuild

// syncState tracks the last exported message ID for incremental updates.
// It is the JSON document persisted in the cache's sync state file: buildCache
// reads and unmarshals it at the start of a run and marshals a fresh copy
// back to the same file after the export.
type syncState struct {
// LastMessageID is the resume point for the next incremental build, which
// reports exporting from "message_id > LastMessageID".
LastMessageID int64 `json:"last_message_id"`
// LastSyncAt is the time the state was saved (time.Now() at write time).
LastSyncAt time.Time `json:"last_sync_at"`
// SchemaVersion is the cacheSchemaVersion that was in effect when the
// state was saved. A state file without this key decodes to 0; because
// cacheSchemaVersion is non-zero, that counts as a mismatch and forces a
// full rebuild.
SchemaVersion int `json:"schema_version,omitempty"`
}

var buildCacheCmd = &cobra.Command{
Expand Down Expand Up @@ -62,6 +70,20 @@ Use --full-rebuild to recreate all cache files from scratch.`,
return fmt.Errorf("database not found: %s\nRun 'msgvault init-db' first", dbPath)
}

// Ensure schema is up to date before building cache.
// Legacy databases may be missing columns (e.g. attachment_count,
// sender_id, message_type, phone_number) that the export queries
// reference. Running migrations first adds them.
s, err := store.Open(dbPath)
if err != nil {
return fmt.Errorf("open database: %w", err)
}
if err := s.InitSchema(); err != nil {
s.Close()
return fmt.Errorf("init schema: %w", err)
}
s.Close()

result, err := buildCache(dbPath, analyticsDir, fullRebuild)
if err != nil {
return err
Expand Down Expand Up @@ -101,8 +123,16 @@ func buildCache(dbPath, analyticsDir string, fullRebuild bool) (*buildResult, er
if data, err := os.ReadFile(stateFile); err == nil {
var state syncState
if json.Unmarshal(data, &state) == nil {
lastMessageID = state.LastMessageID
fmt.Printf("Incremental export from message_id > %d\n", lastMessageID)
if state.SchemaVersion != cacheSchemaVersion {
// Schema has changed — force a full rebuild.
fmt.Printf("Cache schema version mismatch (have v%d, need v%d). Forcing full rebuild.\n",
state.SchemaVersion, cacheSchemaVersion)
fullRebuild = true
lastMessageID = 0
} else {
lastMessageID = state.LastMessageID
fmt.Printf("Incremental export from message_id > %d\n", lastMessageID)
}
}
}
}
Expand Down Expand Up @@ -231,7 +261,10 @@ func buildCache(dbPath, analyticsDir string, fullRebuild bool) (*buildResult, er
m.sent_at,
m.size_estimate,
m.has_attachments,
COALESCE(TRY_CAST(m.attachment_count AS INTEGER), 0) as attachment_count,
m.deleted_from_source_at,
m.sender_id,
COALESCE(TRY_CAST(m.message_type AS VARCHAR), '') as message_type,
CAST(EXTRACT(YEAR FROM m.sent_at) AS INTEGER) as year,
CAST(EXTRACT(MONTH FROM m.sent_at) AS INTEGER) as month
FROM sqlite_db.messages m
Expand Down Expand Up @@ -321,7 +354,8 @@ func buildCache(dbPath, analyticsDir string, fullRebuild bool) (*buildResult, er
id,
COALESCE(TRY_CAST(email_address AS VARCHAR), '') as email_address,
COALESCE(TRY_CAST(domain AS VARCHAR), '') as domain,
COALESCE(TRY_CAST(display_name AS VARCHAR), '') as display_name
COALESCE(TRY_CAST(display_name AS VARCHAR), '') as display_name,
COALESCE(TRY_CAST(phone_number AS VARCHAR), '') as phone_number
FROM sqlite_db.participants
) TO '%s/participants.parquet' (
FORMAT PARQUET,
Expand Down Expand Up @@ -372,7 +406,8 @@ func buildCache(dbPath, analyticsDir string, fullRebuild bool) (*buildResult, er
COPY (
SELECT
id,
COALESCE(TRY_CAST(source_conversation_id AS VARCHAR), '') as source_conversation_id
COALESCE(TRY_CAST(source_conversation_id AS VARCHAR), '') as source_conversation_id,
COALESCE(TRY_CAST(title AS VARCHAR), '') as title
FROM sqlite_db.conversations
) TO '%s/conversations.parquet' (
FORMAT PARQUET,
Expand All @@ -391,10 +426,11 @@ func buildCache(dbPath, analyticsDir string, fullRebuild bool) (*buildResult, er
exportedCount = 0
}

// Save sync state
// Save sync state with schema version for compatibility detection.
state := syncState{
LastMessageID: maxID,
LastSyncAt: time.Now(),
SchemaVersion: cacheSchemaVersion,
}
stateData, _ := json.Marshal(state)
if err := os.WriteFile(stateFile, stateData, 0644); err != nil {
Expand Down Expand Up @@ -592,15 +628,15 @@ func setupSQLiteSource(duckDB *sql.DB, dbPath string) (cleanup func(), err error
query string
typeOverrides string // DuckDB types parameter for read_csv_auto (empty = infer all)
}{
{"messages", "SELECT id, source_id, source_message_id, conversation_id, subject, snippet, sent_at, size_estimate, has_attachments, deleted_from_source_at FROM messages WHERE sent_at IS NOT NULL",
{"messages", "SELECT id, source_id, source_message_id, conversation_id, subject, snippet, sent_at, size_estimate, has_attachments, attachment_count, deleted_from_source_at, sender_id, message_type FROM messages WHERE sent_at IS NOT NULL",
"types={'sent_at': 'TIMESTAMP', 'deleted_from_source_at': 'TIMESTAMP'}"},
{"message_recipients", "SELECT message_id, participant_id, recipient_type, display_name FROM message_recipients", ""},
{"message_labels", "SELECT message_id, label_id FROM message_labels", ""},
{"attachments", "SELECT message_id, size, filename FROM attachments", ""},
{"participants", "SELECT id, email_address, domain, display_name FROM participants", ""},
{"participants", "SELECT id, email_address, domain, display_name, phone_number FROM participants", ""},
{"labels", "SELECT id, name FROM labels", ""},
{"sources", "SELECT id, identifier FROM sources", ""},
{"conversations", "SELECT id, source_conversation_id FROM conversations", ""},
{"conversations", "SELECT id, source_conversation_id, title FROM conversations", ""},
}

for _, t := range tables {
Expand Down
34 changes: 21 additions & 13 deletions cmd/msgvault/cmd/build_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,19 @@ func setupTestSQLite(t *testing.T) (string, func()) {
received_at TIMESTAMP,
size_estimate INTEGER,
has_attachments BOOLEAN DEFAULT FALSE,
attachment_count INTEGER DEFAULT 0,
deleted_from_source_at TIMESTAMP,
sender_id INTEGER,
message_type TEXT NOT NULL DEFAULT 'email',
UNIQUE(source_id, source_message_id)
);

CREATE TABLE participants (
id INTEGER PRIMARY KEY,
email_address TEXT NOT NULL UNIQUE,
domain TEXT,
display_name TEXT
display_name TEXT,
phone_number TEXT
);

CREATE TABLE message_recipients (
Expand Down Expand Up @@ -1128,13 +1132,13 @@ func TestBuildCache_EmptyDatabase(t *testing.T) {
db, _ := sql.Open("sqlite3", dbPath)
_, _ = db.Exec(`
CREATE TABLE sources (id INTEGER PRIMARY KEY, identifier TEXT);
CREATE TABLE messages (id INTEGER PRIMARY KEY, source_id INTEGER, source_message_id TEXT, sent_at TIMESTAMP, size_estimate INTEGER, has_attachments BOOLEAN, subject TEXT, snippet TEXT, conversation_id INTEGER, deleted_from_source_at TIMESTAMP);
CREATE TABLE participants (id INTEGER PRIMARY KEY, email_address TEXT, domain TEXT, display_name TEXT);
CREATE TABLE messages (id INTEGER PRIMARY KEY, source_id INTEGER, source_message_id TEXT, sent_at TIMESTAMP, size_estimate INTEGER, has_attachments BOOLEAN, subject TEXT, snippet TEXT, conversation_id INTEGER, deleted_from_source_at TIMESTAMP, attachment_count INTEGER DEFAULT 0, sender_id INTEGER, message_type TEXT NOT NULL DEFAULT 'email');
CREATE TABLE participants (id INTEGER PRIMARY KEY, email_address TEXT, domain TEXT, display_name TEXT, phone_number TEXT);
CREATE TABLE message_recipients (message_id INTEGER, participant_id INTEGER, recipient_type TEXT, display_name TEXT);
CREATE TABLE labels (id INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE message_labels (message_id INTEGER, label_id INTEGER);
CREATE TABLE attachments (message_id INTEGER, size INTEGER, filename TEXT);
CREATE TABLE conversations (id INTEGER PRIMARY KEY, source_conversation_id TEXT);
CREATE TABLE conversations (id INTEGER PRIMARY KEY, source_conversation_id TEXT, title TEXT);
`)
db.Close()

Expand Down Expand Up @@ -1328,13 +1332,13 @@ func BenchmarkBuildCache(b *testing.B) {
// Create schema
_, _ = db.Exec(`
CREATE TABLE sources (id INTEGER PRIMARY KEY, identifier TEXT);
CREATE TABLE messages (id INTEGER PRIMARY KEY, source_id INTEGER, source_message_id TEXT, sent_at TIMESTAMP, size_estimate INTEGER, has_attachments BOOLEAN, subject TEXT, snippet TEXT, conversation_id INTEGER, deleted_from_source_at TIMESTAMP);
CREATE TABLE participants (id INTEGER PRIMARY KEY, email_address TEXT UNIQUE, domain TEXT, display_name TEXT);
CREATE TABLE messages (id INTEGER PRIMARY KEY, source_id INTEGER, source_message_id TEXT, sent_at TIMESTAMP, size_estimate INTEGER, has_attachments BOOLEAN, subject TEXT, snippet TEXT, conversation_id INTEGER, deleted_from_source_at TIMESTAMP, attachment_count INTEGER DEFAULT 0, sender_id INTEGER, message_type TEXT NOT NULL DEFAULT 'email');
CREATE TABLE participants (id INTEGER PRIMARY KEY, email_address TEXT UNIQUE, domain TEXT, display_name TEXT, phone_number TEXT);
CREATE TABLE message_recipients (message_id INTEGER, participant_id INTEGER, recipient_type TEXT, display_name TEXT);
CREATE TABLE labels (id INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE message_labels (message_id INTEGER, label_id INTEGER);
CREATE TABLE attachments (message_id INTEGER, size INTEGER, filename TEXT);
CREATE TABLE conversations (id INTEGER PRIMARY KEY, source_conversation_id TEXT);
CREATE TABLE conversations (id INTEGER PRIMARY KEY, source_conversation_id TEXT, title TEXT);
INSERT INTO sources VALUES (1, 'test@gmail.com');
INSERT INTO labels VALUES (1, 'INBOX'), (2, 'Work');
`)
Expand Down Expand Up @@ -1418,14 +1422,18 @@ func setupTestSQLiteEmpty(t *testing.T) (string, func()) {
received_at TIMESTAMP,
size_estimate INTEGER,
has_attachments BOOLEAN DEFAULT FALSE,
attachment_count INTEGER DEFAULT 0,
deleted_from_source_at TIMESTAMP,
sender_id INTEGER,
message_type TEXT NOT NULL DEFAULT 'email',
UNIQUE(source_id, source_message_id)
);
CREATE TABLE participants (
id INTEGER PRIMARY KEY,
email_address TEXT NOT NULL UNIQUE,
domain TEXT,
display_name TEXT
display_name TEXT,
phone_number TEXT
);
CREATE TABLE message_recipients (
id INTEGER PRIMARY KEY,
Expand Down Expand Up @@ -1757,17 +1765,17 @@ func BenchmarkBuildCacheIncremental(b *testing.B) {
// Create schema and initial data (10000 messages)
_, _ = db.Exec(`
CREATE TABLE sources (id INTEGER PRIMARY KEY, identifier TEXT);
CREATE TABLE messages (id INTEGER PRIMARY KEY, source_id INTEGER, source_message_id TEXT, sent_at TIMESTAMP, size_estimate INTEGER, has_attachments BOOLEAN, subject TEXT, snippet TEXT, conversation_id INTEGER, deleted_from_source_at TIMESTAMP);
CREATE TABLE participants (id INTEGER PRIMARY KEY, email_address TEXT UNIQUE, domain TEXT, display_name TEXT);
CREATE TABLE messages (id INTEGER PRIMARY KEY, source_id INTEGER, source_message_id TEXT, sent_at TIMESTAMP, size_estimate INTEGER, has_attachments BOOLEAN, subject TEXT, snippet TEXT, conversation_id INTEGER, deleted_from_source_at TIMESTAMP, attachment_count INTEGER DEFAULT 0, sender_id INTEGER, message_type TEXT NOT NULL DEFAULT 'email');
CREATE TABLE participants (id INTEGER PRIMARY KEY, email_address TEXT UNIQUE, domain TEXT, display_name TEXT, phone_number TEXT);
CREATE TABLE message_recipients (message_id INTEGER, participant_id INTEGER, recipient_type TEXT, display_name TEXT);
CREATE TABLE labels (id INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE message_labels (message_id INTEGER, label_id INTEGER);
CREATE TABLE attachments (message_id INTEGER, size INTEGER, filename TEXT);
CREATE TABLE conversations (id INTEGER PRIMARY KEY, source_conversation_id TEXT);
CREATE TABLE conversations (id INTEGER PRIMARY KEY, source_conversation_id TEXT, title TEXT);
INSERT INTO sources VALUES (1, 'test@gmail.com');
INSERT INTO labels VALUES (1, 'INBOX');
INSERT INTO participants VALUES (1, 'alice@example.com', 'example.com', 'Alice');
INSERT INTO participants VALUES (2, 'bob@example.com', 'example.com', 'Bob');
INSERT INTO participants VALUES (1, 'alice@example.com', 'example.com', 'Alice', NULL);
INSERT INTO participants VALUES (2, 'bob@example.com', 'example.com', 'Bob', NULL);
`)

// Insert conversations to match messages
Expand Down
4 changes: 4 additions & 0 deletions cmd/msgvault/cmd/export_attachments.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ func runExportAttachments(cmd *cobra.Command, args []string) error {
}
defer s.Close()

if err := s.InitSchema(); err != nil {
return fmt.Errorf("init schema: %w", err)
}

engine := query.NewSQLiteEngine(s.DB())

// Resolve message ID — try numeric first, fallback to Gmail ID
Expand Down
4 changes: 4 additions & 0 deletions cmd/msgvault/cmd/export_eml.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ func runExportEML(cmd *cobra.Command, messageRef, outputPath string) error {
}
defer s.Close()

if err := s.InitSchema(); err != nil {
return fmt.Errorf("init schema: %w", err)
}

engine := query.NewSQLiteEngine(s.DB())

resolved, err := resolveMessage(engine, cmd, messageRef)
Expand Down
Loading