diff --git a/.roborev.toml b/.roborev.toml index adbd2f11..74110055 100644 --- a/.roborev.toml +++ b/.roborev.toml @@ -14,6 +14,12 @@ HTTP remote defaults, plaintext key display in interactive CLI, enabled=true override on account creation, and page-aligned pagination are documented design decisions — see code comments at each site. +Remote engine query string reconstruction in buildSearchQueryString is +intentionally simplified — phrase quoting edge cases are acceptable since +the search parser on the server re-parses the query. Empty search queries +sending q= is expected; the server returns empty results gracefully. +TimeGranularity defaults to "month" when unspecified, which is correct. + This is a single-user personal tool with no privilege separation, no setuid, no shared directories, and no multi-tenant access. Do not flag symlink-following, local file overwrites, or similar CWE patterns that diff --git a/CLAUDE.md b/CLAUDE.md index df7f54d1..3fe56301 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -53,10 +53,14 @@ make lint # Run linter # TUI and analytics ./msgvault tui # Launch TUI ./msgvault tui --account you@gmail.com # Filter by account +./msgvault tui --local # Force local (override remote config) ./msgvault build-cache # Build Parquet cache ./msgvault build-cache --full-rebuild # Full rebuild ./msgvault stats # Show archive stats +# Daemon mode (NAS/server deployment) +./msgvault serve # Start HTTP API + scheduled syncs + # Maintenance ./msgvault repair-encoding # Fix UTF-8 encoding issues ``` diff --git a/README.md b/README.md index 36928ba1..7ddd080d 100644 --- a/README.md +++ b/README.md @@ -80,13 +80,18 @@ msgvault tui | `add-account EMAIL` | Authorize a Gmail account (use `--headless` for servers) | | `sync-full EMAIL` | Full sync (`--limit N`, `--after`/`--before` for date ranges) | | `sync EMAIL` | Sync only new/changed messages | -| `tui` | Launch the interactive TUI (`--account` to filter) | +| `tui` | Launch the interactive TUI (`--account` to 
filter, `--local` to force local) | | `search QUERY` | Search messages (`--json` for machine output) | +| `show-message ID` | View full message details (`--json` for machine output) | | `mcp` | Start the MCP server for AI assistant integration | +| `serve` | Run daemon with scheduled sync and HTTP API for remote TUI | | `stats` | Show archive statistics | +| `list-accounts` | List synced email accounts | | `verify EMAIL` | Verify archive integrity against Gmail | | `export-eml` | Export a message as `.eml` | | `build-cache` | Rebuild the Parquet analytics cache | +| `update` | Update msgvault to the latest version | +| `setup` | Interactive first-run configuration wizard | | `repair-encoding` | Fix UTF-8 encoding issues | | `list-senders` / `list-domains` / `list-labels` | Explore metadata | @@ -111,6 +116,30 @@ See the [Configuration Guide](https://msgvault.io/configuration/) for all option msgvault includes an MCP server that lets AI assistants search, analyze, and read your archived messages. Connect it to Claude Desktop or any MCP-capable agent and query your full message history conversationally. See the [MCP documentation](https://msgvault.io/usage/chat/) for setup instructions. +## Daemon Mode (NAS/Server) + +Run msgvault as a long-running daemon for scheduled syncs and remote access: + +```bash +msgvault serve +``` + +Configure scheduled syncs in `config.toml`: + +```toml +[[accounts]] +email = "you@gmail.com" +schedule = "0 2 * * *" # 2am daily (cron) +enabled = true + +[server] +api_port = 8080 +bind_addr = "0.0.0.0" +api_key = "your-secret-key" +``` + +The TUI can connect to a remote server by configuring `[remote].url`. Use `--local` to force local database when remote is configured. See [docs/api.md](docs/api.md) for the HTTP API reference. 
+ ## Documentation - [Setup Guide](https://msgvault.io/guides/oauth-setup/): OAuth, first sync, headless servers diff --git a/cmd/msgvault/cmd/quickstart.md b/cmd/msgvault/cmd/quickstart.md index 77053bee..a149c4f0 100644 --- a/cmd/msgvault/cmd/quickstart.md +++ b/cmd/msgvault/cmd/quickstart.md @@ -75,7 +75,7 @@ Fetches only changes since the last sync using the Gmail History API. Much faster than a full sync. Requires a prior full sync. ```bash -msgvault sync-incremental user@gmail.com +msgvault sync user@gmail.com ``` If Gmail's history has expired (~7 days), it will suggest running a full sync. @@ -260,8 +260,25 @@ msgvault tui # Filter by account msgvault tui --account user@gmail.com + +# Force local database (override remote config) +msgvault tui --local ``` +### Remote mode + +When `[remote].url` is configured in `config.toml`, the TUI connects to a remote +msgvault server instead of the local database. This is useful for accessing an +archive on a NAS or server from another machine. + +```toml +[remote] +url = "http://nas.local:8080" +api_key = "your-api-key" +``` + +In remote mode, deletion staging and attachment export are disabled for safety. + ### TUI keybindings | Key | Action | @@ -291,7 +308,7 @@ msgvault tui --account user@gmail.com 2. **Search**: `msgvault search --json` — find relevant messages. 3. **Read details**: `msgvault show-message --json` — get full message content. 4. **Analyze**: `list-senders`, `list-domains`, `list-labels` with `--json` for patterns. -5. **Sync new mail**: `msgvault sync-incremental user@gmail.com` if archive is stale. +5. **Sync new mail**: `msgvault sync user@gmail.com` if archive is stale. 
## Tips diff --git a/cmd/msgvault/cmd/serve.go b/cmd/msgvault/cmd/serve.go index 63bf9cde..dff5116b 100644 --- a/cmd/msgvault/cmd/serve.go +++ b/cmd/msgvault/cmd/serve.go @@ -16,6 +16,7 @@ import ( "github.com/wesm/msgvault/internal/api" "github.com/wesm/msgvault/internal/gmail" "github.com/wesm/msgvault/internal/oauth" + "github.com/wesm/msgvault/internal/query" "github.com/wesm/msgvault/internal/scheduler" "github.com/wesm/msgvault/internal/store" "github.com/wesm/msgvault/internal/sync" @@ -82,6 +83,17 @@ func runServe(cmd *cobra.Command, args []string) error { return fmt.Errorf("init schema: %w", err) } + // Create query engine for TUI aggregate support + analyticsDir := cfg.AnalyticsDir() + engine, err := query.NewDuckDBEngine(analyticsDir, dbPath, nil) + if err != nil { + logger.Warn("query engine not available - aggregate endpoints will return 503", "error", err) + // Continue without engine - basic endpoints still work + } + if engine != nil { + defer engine.Close() + } + // Create OAuth manager oauthMgr, err := oauth.NewManager(cfg.OAuth.ClientSecrets, cfg.TokensDir(), logger) if err != nil { @@ -122,7 +134,13 @@ func runServe(cmd *cobra.Command, args []string) error { schedAdapter := &schedulerAdapter{scheduler: sched} // Create and start API server - apiServer := api.NewServer(cfg, storeAdapter, schedAdapter, logger) + apiServer := api.NewServerWithOptions(api.ServerOptions{ + Config: cfg, + Store: storeAdapter, + Engine: engine, + Scheduler: schedAdapter, + Logger: logger, + }) // Start API server in goroutine serverErr := make(chan error, 1) diff --git a/cmd/msgvault/cmd/tui.go b/cmd/msgvault/cmd/tui.go index 2733a5d7..1a151aeb 100644 --- a/cmd/msgvault/cmd/tui.go +++ b/cmd/msgvault/cmd/tui.go @@ -9,6 +9,7 @@ import ( tea "github.com/charmbracelet/bubbletea" "github.com/spf13/cobra" "github.com/wesm/msgvault/internal/query" + "github.com/wesm/msgvault/internal/remote" "github.com/wesm/msgvault/internal/store" "github.com/wesm/msgvault/internal/tui" ) 
@@ -16,6 +17,7 @@ import ( var forceSQL bool var skipCacheBuild bool var noSQLiteScanner bool +var forceLocalTUI bool var tuiCmd = &cobra.Command{ Use: "tui", @@ -48,76 +50,103 @@ Selection & Deletion: Performance: For large archives (100k+ messages), the TUI uses Parquet files for fast - aggregation queries. Run 'msgvault-sync build-parquet' to generate them. - Use --force-sql to bypass Parquet and query SQLite directly (slow).`, + aggregation queries. Run 'msgvault build-cache' to generate them. + Use --force-sql to bypass Parquet and query SQLite directly (slow). + +Remote Mode: + When [remote].url is configured, the TUI connects to a remote msgvault server. + Use --local to force local database. Deletion and export are disabled in remote mode.`, RunE: func(cmd *cobra.Command, args []string) error { - // Open database - dbPath := cfg.DatabaseDSN() - s, err := store.Open(dbPath) - if err != nil { - return fmt.Errorf("open database: %w", err) - } - defer s.Close() + var engine query.Engine + var isRemote bool + + // Check for remote mode (unless --local flag is set) + if cfg.Remote.URL != "" && !forceLocalTUI { + // Remote mode - connect to remote msgvault server + remoteCfg := remote.Config{ + URL: cfg.Remote.URL, + APIKey: cfg.Remote.APIKey, + AllowInsecure: cfg.Remote.AllowInsecure, + } + remoteEngine, err := remote.NewEngine(remoteCfg) + if err != nil { + return fmt.Errorf("connect to remote: %w", err) + } + defer remoteEngine.Close() + engine = remoteEngine + isRemote = true + fmt.Printf("Connected to remote: %s\n", cfg.Remote.URL) + } else { + // Local mode - use local database + dbPath := cfg.DatabaseDSN() + s, err := store.Open(dbPath) + if err != nil { + return fmt.Errorf("open database: %w", err) + } + defer s.Close() - // Ensure schema is up to date - if err := s.InitSchema(); err != nil { - return fmt.Errorf("init schema: %w", err) - } + // Ensure schema is up to date + if err := s.InitSchema(); err != nil { + return fmt.Errorf("init schema: %w", err) 
+ } - // Build FTS index in background — TUI uses DuckDB/Parquet for - // aggregates and only needs FTS for deep search (Tab to switch). - if s.NeedsFTSBackfill() { - go func() { - _, _ = s.BackfillFTS(nil) - }() - } + // Build FTS index in background — TUI uses DuckDB/Parquet for + // aggregates and only needs FTS for deep search (Tab to switch). + if s.NeedsFTSBackfill() { + go func() { + _, _ = s.BackfillFTS(nil) + }() + } - analyticsDir := cfg.AnalyticsDir() + analyticsDir := cfg.AnalyticsDir() + + // Check if cache needs to be built/updated (unless forcing SQL or skipping) + if !forceSQL && !skipCacheBuild { + needsBuild, reason := cacheNeedsBuild(dbPath, analyticsDir) + if needsBuild { + fmt.Printf("Building analytics cache (%s)...\n", reason) + result, err := buildCache(dbPath, analyticsDir, true) + if err != nil { + fmt.Fprintf(os.Stderr, "Warning: Failed to build cache: %v\n", err) + fmt.Fprintf(os.Stderr, "Falling back to SQLite (may be slow for large archives)\n") + } else if !result.Skipped { + fmt.Printf("Cached %d messages for fast queries.\n", result.ExportedCount) + } + } + } - // Check if cache needs to be built/updated (unless forcing SQL or skipping) - if !forceSQL && !skipCacheBuild { - needsBuild, reason := cacheNeedsBuild(dbPath, analyticsDir) - if needsBuild { - fmt.Printf("Building analytics cache (%s)...\n", reason) - result, err := buildCache(dbPath, analyticsDir, true) + // Determine query engine to use + if !forceSQL && query.HasCompleteParquetData(analyticsDir) { + // Use DuckDB for fast Parquet queries + var duckOpts query.DuckDBOptions + if noSQLiteScanner { + duckOpts.DisableSQLiteScanner = true + } + duckEngine, err := query.NewDuckDBEngine(analyticsDir, dbPath, s.DB(), duckOpts) if err != nil { - fmt.Fprintf(os.Stderr, "Warning: Failed to build cache: %v\n", err) + fmt.Fprintf(os.Stderr, "Warning: Failed to open Parquet engine: %v\n", err) fmt.Fprintf(os.Stderr, "Falling back to SQLite (may be slow for large archives)\n") - } else 
if !result.Skipped { - fmt.Printf("Cached %d messages for fast queries.\n", result.ExportedCount) + engine = query.NewSQLiteEngine(s.DB()) + } else { + engine = duckEngine + defer duckEngine.Close() } - } - } - - // Determine query engine to use - var engine query.Engine - - if !forceSQL && query.HasCompleteParquetData(analyticsDir) { - // Use DuckDB for fast Parquet queries - var duckOpts query.DuckDBOptions - if noSQLiteScanner { - duckOpts.DisableSQLiteScanner = true - } - duckEngine, err := query.NewDuckDBEngine(analyticsDir, dbPath, s.DB(), duckOpts) - if err != nil { - fmt.Fprintf(os.Stderr, "Warning: Failed to open Parquet engine: %v\n", err) - fmt.Fprintf(os.Stderr, "Falling back to SQLite (may be slow for large archives)\n") - engine = query.NewSQLiteEngine(s.DB()) } else { - engine = duckEngine - defer duckEngine.Close() - } - } else { - // Use SQLite directly - if !forceSQL { - fmt.Fprintf(os.Stderr, "Note: No cache data available, using SQLite (slow for large archives)\n") - fmt.Fprintf(os.Stderr, "Run 'msgvault build-cache' to enable fast queries.\n") + // Use SQLite directly + if !forceSQL { + fmt.Fprintf(os.Stderr, "Note: No cache data available, using SQLite (slow for large archives)\n") + fmt.Fprintf(os.Stderr, "Run 'msgvault build-cache' to enable fast queries.\n") + } + engine = query.NewSQLiteEngine(s.DB()) } - engine = query.NewSQLiteEngine(s.DB()) } // Create and run TUI - model := tui.New(engine, tui.Options{DataDir: cfg.Data.DataDir, Version: Version}) + model := tui.New(engine, tui.Options{ + DataDir: cfg.Data.DataDir, + Version: Version, + IsRemote: isRemote, + }) p := tea.NewProgram(model, tea.WithAltScreen()) if _, err := p.Run(); err != nil { @@ -204,5 +233,6 @@ func init() { tuiCmd.Flags().BoolVar(&forceSQL, "force-sql", false, "Force SQLite queries instead of Parquet (slow for large archives)") tuiCmd.Flags().BoolVar(&skipCacheBuild, "no-cache-build", false, "Skip automatic cache build/update") tuiCmd.Flags().BoolVar(&noSQLiteScanner, 
"no-sqlite-scanner", false, "Disable DuckDB sqlite_scanner extension (use direct SQLite fallback)") + tuiCmd.Flags().BoolVar(&forceLocalTUI, "local", false, "Force local database (override remote config)") _ = tuiCmd.Flags().MarkHidden("no-sqlite-scanner") } diff --git a/docs/api.md b/docs/api.md index a167b558..c0366252 100644 --- a/docs/api.md +++ b/docs/api.md @@ -303,6 +303,212 @@ Returns the current scheduler status and all scheduled accounts. The `running` field at the top level reflects the actual scheduler lifecycle state (true after `Start()`, false after `Stop()`). Per-account `running` indicates whether a sync is currently in progress for that account. +--- + +## TUI Support Endpoints + +These endpoints support the remote TUI feature, allowing `msgvault tui` to work against a remote server. + +### Get Aggregates + +``` +GET /api/v1/aggregates +``` + +Returns aggregate data grouped by a specified view type (senders, domains, labels, etc.). + +**Query Parameters:** +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `view_type` | string | required | `senders`, `sender_names`, `recipients`, `recipient_names`, `domains`, `labels`, `time` | +| `sort` | string | `count` | `count`, `size`, `attachment_size`, `name` | +| `direction` | string | `desc` | `asc`, `desc` | +| `limit` | int | 100 | Maximum rows to return | +| `time_granularity` | string | `month` | `year`, `month`, `day` (for time view) | +| `source_id` | int | - | Filter by account | +| `attachments_only` | bool | false | Only messages with attachments | +| `hide_deleted` | bool | false | Exclude deleted messages | +| `search_query` | string | - | Filter by search query | + +**Response:** +```json +{ + "view_type": "senders", + "rows": [ + { + "key": "alice@example.com", + "count": 150, + "total_size": 2048000, + "attachment_size": 512000, + "attachment_count": 25, + "total_unique": 1 + } + ] +} +``` + +--- + +### Get Sub-Aggregates + +``` +GET 
/api/v1/aggregates/sub +``` + +Returns aggregates for a filtered subset of messages (drill-down navigation). + +**Query Parameters:** +All parameters from `/aggregates`, plus filter parameters: +| Parameter | Type | Description | +|-----------|------|-------------| +| `sender` | string | Filter by sender email | +| `sender_name` | string | Filter by sender name | +| `recipient` | string | Filter by recipient email | +| `recipient_name` | string | Filter by recipient name | +| `domain` | string | Filter by domain | +| `label` | string | Filter by label | +| `time_period` | string | Filter by time period | + +--- + +### Get Filtered Messages + +``` +GET /api/v1/messages/filter +``` + +Returns a filtered list of messages with pagination. + +**Query Parameters:** +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `sender` | string | - | Filter by sender | +| `domain` | string | - | Filter by domain | +| `label` | string | - | Filter by label | +| `conversation_id` | int | - | Filter by thread (for thread view) | +| `offset` | int | 0 | Pagination offset | +| `limit` | int | 100 | Pagination limit (max 500) | +| `sort` | string | `date` | `date`, `size`, `subject` | +| `direction` | string | `desc` | `asc`, `desc` | + +**Response:** +```json +{ + "total": 150, + "offset": 0, + "limit": 100, + "messages": [ + { + "id": 12345, + "subject": "Meeting Tomorrow", + "from": "sender@example.com", + "to": ["recipient@example.com"], + "sent_at": "2024-01-15T10:30:00Z", + "snippet": "Hi, just wanted to confirm...", + "labels": ["INBOX"], + "has_attachments": false, + "size_bytes": 2048 + } + ] +} +``` + +--- + +### Get Total Stats + +``` +GET /api/v1/stats/total +``` + +Returns detailed statistics with optional filters. 
+ +**Query Parameters:** +| Parameter | Type | Description | +|-----------|------|-------------| +| `source_id` | int | Filter by account | +| `attachments_only` | bool | Only messages with attachments | +| `hide_deleted` | bool | Exclude deleted messages | +| `search_query` | string | Filter by search query | + +**Response:** +```json +{ + "message_count": 125000, + "total_size": 5242880000, + "attachment_count": 8500, + "attachment_size": 1048576000, + "label_count": 35, + "account_count": 2 +} +``` + +--- + +### Fast Search + +``` +GET /api/v1/search/fast +``` + +Fast metadata search (subject, sender, recipient). Does not search message body. + +**Query Parameters:** +| Parameter | Type | Required | Description | +|-----------|------|----------|-------------| +| `q` | string | Yes | Search query | +| `offset` | int | No | Pagination offset | +| `limit` | int | No | Pagination limit (default 100) | + +Plus all filter parameters from `/messages/filter`. + +**Response:** +```json +{ + "query": "invoice", + "messages": [...], + "total_count": 42, + "stats": { + "message_count": 42, + "total_size": 1048576, + "attachment_count": 5, + "attachment_size": 524288, + "label_count": 3, + "account_count": 1 + } +} +``` + +--- + +### Deep Search + +``` +GET /api/v1/search/deep +``` + +Full-text search including message body (uses FTS5). 
+ +**Query Parameters:** +| Parameter | Type | Required | Description | +|-----------|------|----------|-------------| +| `q` | string | Yes | Search query | +| `offset` | int | No | Pagination offset | +| `limit` | int | No | Pagination limit (default 100) | + +**Response:** +```json +{ + "query": "project proposal", + "messages": [...], + "total_count": 15, + "offset": 0, + "limit": 100 +} +``` + +--- + ## Error Responses All errors return a consistent JSON format: diff --git a/internal/api/handlers.go b/internal/api/handlers.go index 885a9b7a..c027e0bc 100644 --- a/internal/api/handlers.go +++ b/internal/api/handlers.go @@ -15,7 +15,9 @@ import ( "github.com/go-chi/chi/v5" "github.com/wesm/msgvault/internal/config" "github.com/wesm/msgvault/internal/fileutil" + "github.com/wesm/msgvault/internal/query" "github.com/wesm/msgvault/internal/scheduler" + "github.com/wesm/msgvault/internal/search" "github.com/wesm/msgvault/internal/store" "golang.org/x/oauth2" ) @@ -583,3 +585,528 @@ func (s *Server) handleAddAccount(w http.ResponseWriter, r *http.Request) { "message": "Account added for " + req.Email, }) } + +// ============================================================================ +// TUI Aggregate Endpoints +// ============================================================================ + +// AggregateResponse represents aggregate query results. +type AggregateResponse struct { + ViewType string `json:"view_type"` + Rows []AggregateRowJSON `json:"rows"` +} + +// AggregateRowJSON represents a single aggregate row in JSON format. +type AggregateRowJSON struct { + Key string `json:"key"` + Count int64 `json:"count"` + TotalSize int64 `json:"total_size"` + AttachmentSize int64 `json:"attachment_size"` + AttachmentCount int64 `json:"attachment_count"` + TotalUnique int64 `json:"total_unique"` +} + +// TotalStatsResponse represents detailed stats with filters. 
+type TotalStatsResponse struct { + MessageCount int64 `json:"message_count"` + TotalSize int64 `json:"total_size"` + AttachmentCount int64 `json:"attachment_count"` + AttachmentSize int64 `json:"attachment_size"` + LabelCount int64 `json:"label_count"` + AccountCount int64 `json:"account_count"` +} + +// SearchFastResponse represents fast search results with stats. +type SearchFastResponse struct { + Query string `json:"query"` + Messages []MessageSummary `json:"messages"` + TotalCount int64 `json:"total_count"` + Stats *TotalStatsResponse `json:"stats,omitempty"` +} + +// parseViewType parses a view type string into query.ViewType. +func parseViewType(s string) (query.ViewType, bool) { + switch strings.ToLower(s) { + case "senders": + return query.ViewSenders, true + case "sender_names": + return query.ViewSenderNames, true + case "recipients": + return query.ViewRecipients, true + case "recipient_names": + return query.ViewRecipientNames, true + case "domains": + return query.ViewDomains, true + case "labels": + return query.ViewLabels, true + case "time": + return query.ViewTime, true + default: + return query.ViewSenders, false + } +} + +// viewTypeString converts a query.ViewType to its API string representation. +func viewTypeString(v query.ViewType) string { + switch v { + case query.ViewSenders: + return "senders" + case query.ViewSenderNames: + return "sender_names" + case query.ViewRecipients: + return "recipients" + case query.ViewRecipientNames: + return "recipient_names" + case query.ViewDomains: + return "domains" + case query.ViewLabels: + return "labels" + case query.ViewTime: + return "time" + default: + return "unknown" + } +} + +// parseSortField parses a sort field string into query.SortField. 
+func parseSortField(s string) query.SortField { + switch strings.ToLower(s) { + case "count": + return query.SortByCount + case "size": + return query.SortBySize + case "attachment_size": + return query.SortByAttachmentSize + case "name": + return query.SortByName + default: + return query.SortByCount + } +} + +// parseSortDirection parses a direction string into query.SortDirection. +func parseSortDirection(s string) query.SortDirection { + if strings.ToLower(s) == "asc" { + return query.SortAsc + } + return query.SortDesc +} + +// parseTimeGranularity parses a granularity string into query.TimeGranularity. +func parseTimeGranularity(s string) query.TimeGranularity { + switch strings.ToLower(s) { + case "year": + return query.TimeYear + case "day": + return query.TimeDay + default: + return query.TimeMonth + } +} + +// parseAggregateOptions extracts common aggregate options from query parameters. +func parseAggregateOptions(r *http.Request) query.AggregateOptions { + opts := query.DefaultAggregateOptions() + + if v := r.URL.Query().Get("sort"); v != "" { + opts.SortField = parseSortField(v) + } + if v := r.URL.Query().Get("direction"); v != "" { + opts.SortDirection = parseSortDirection(v) + } + if v := r.URL.Query().Get("limit"); v != "" { + if limit, err := strconv.Atoi(v); err == nil && limit > 0 { + opts.Limit = limit + } + } + if v := r.URL.Query().Get("time_granularity"); v != "" { + opts.TimeGranularity = parseTimeGranularity(v) + } + if v := r.URL.Query().Get("source_id"); v != "" { + if id, err := strconv.ParseInt(v, 10, 64); err == nil { + opts.SourceID = &id + } + } + if r.URL.Query().Get("attachments_only") == "true" { + opts.WithAttachmentsOnly = true + } + if r.URL.Query().Get("hide_deleted") == "true" { + opts.HideDeletedFromSource = true + } + if v := r.URL.Query().Get("search_query"); v != "" { + opts.SearchQuery = v + } + + return opts +} + +// parseMessageFilter extracts filter parameters from query parameters. 
+func parseMessageFilter(r *http.Request) query.MessageFilter { + var filter query.MessageFilter + + filter.Sender = r.URL.Query().Get("sender") + filter.SenderName = r.URL.Query().Get("sender_name") + filter.Recipient = r.URL.Query().Get("recipient") + filter.RecipientName = r.URL.Query().Get("recipient_name") + filter.Domain = r.URL.Query().Get("domain") + filter.Label = r.URL.Query().Get("label") + + if v := r.URL.Query().Get("time_period"); v != "" { + filter.TimeRange.Period = v + } + if v := r.URL.Query().Get("time_granularity"); v != "" { + filter.TimeRange.Granularity = parseTimeGranularity(v) + } + if v := r.URL.Query().Get("conversation_id"); v != "" { + if id, err := strconv.ParseInt(v, 10, 64); err == nil { + filter.ConversationID = &id + } + } + if v := r.URL.Query().Get("source_id"); v != "" { + if id, err := strconv.ParseInt(v, 10, 64); err == nil { + filter.SourceID = &id + } + } + if r.URL.Query().Get("attachments_only") == "true" { + filter.WithAttachmentsOnly = true + } + if r.URL.Query().Get("hide_deleted") == "true" { + filter.HideDeletedFromSource = true + } + + // Pagination + if v := r.URL.Query().Get("offset"); v != "" { + if offset, err := strconv.Atoi(v); err == nil && offset >= 0 { + filter.Pagination.Offset = offset + } + } + if v := r.URL.Query().Get("limit"); v != "" { + if limit, err := strconv.Atoi(v); err == nil && limit > 0 { + filter.Pagination.Limit = limit + } + } + if filter.Pagination.Limit == 0 { + filter.Pagination.Limit = 500 // Default limit for message lists + } + + // Sorting + if v := r.URL.Query().Get("sort"); v != "" { + switch strings.ToLower(v) { + case "date": + filter.Sorting.Field = query.MessageSortByDate + case "size": + filter.Sorting.Field = query.MessageSortBySize + case "subject": + filter.Sorting.Field = query.MessageSortBySubject + } + } + if v := r.URL.Query().Get("direction"); v != "" { + filter.Sorting.Direction = parseSortDirection(v) + } + + return filter +} + +// toAggregateRowJSON converts 
query.AggregateRow to JSON format. +func toAggregateRowJSON(row query.AggregateRow) AggregateRowJSON { + return AggregateRowJSON{ + Key: row.Key, + Count: row.Count, + TotalSize: row.TotalSize, + AttachmentSize: row.AttachmentSize, + AttachmentCount: row.AttachmentCount, + TotalUnique: row.TotalUnique, + } +} + +// toTotalStatsResponse converts query.TotalStats to JSON format. +func toTotalStatsResponse(stats *query.TotalStats) *TotalStatsResponse { + if stats == nil { + return nil + } + return &TotalStatsResponse{ + MessageCount: stats.MessageCount, + TotalSize: stats.TotalSize, + AttachmentCount: stats.AttachmentCount, + AttachmentSize: stats.AttachmentSize, + LabelCount: stats.LabelCount, + AccountCount: stats.AccountCount, + } +} + +// toMessageSummaryFromQuery converts query.MessageSummary to API MessageSummary. +func toMessageSummaryFromQuery(m query.MessageSummary) MessageSummary { + labels := m.Labels + if labels == nil { + labels = []string{} + } + return MessageSummary{ + ID: m.ID, + Subject: m.Subject, + From: m.FromEmail, + To: []string{}, // Query summary doesn't include recipients + SentAt: m.SentAt.UTC().Format(time.RFC3339), + Snippet: m.Snippet, + Labels: labels, + HasAttach: m.HasAttachments, + SizeBytes: m.SizeEstimate, + } +} + +// handleAggregates returns aggregate data for a view type. +// GET /api/v1/aggregates?view_type=senders&sort=count&direction=desc&limit=100 +func (s *Server) handleAggregates(w http.ResponseWriter, r *http.Request) { + if s.engine == nil { + writeError(w, http.StatusServiceUnavailable, "engine_unavailable", "Query engine not available") + return + } + + viewTypeStr := r.URL.Query().Get("view_type") + if viewTypeStr == "" { + viewTypeStr = "senders" // Default + } + viewType, ok := parseViewType(viewTypeStr) + if !ok { + writeError(w, http.StatusBadRequest, "invalid_view_type", + "Invalid view_type. 
Must be one of: senders, sender_names, recipients, recipient_names, domains, labels, time") + return + } + + opts := parseAggregateOptions(r) + + rows, err := s.engine.Aggregate(r.Context(), viewType, opts) + if err != nil { + s.logger.Error("aggregate query failed", "view_type", viewTypeStr, "error", err) + writeError(w, http.StatusInternalServerError, "internal_error", "Aggregate query failed") + return + } + + jsonRows := make([]AggregateRowJSON, len(rows)) + for i, row := range rows { + jsonRows[i] = toAggregateRowJSON(row) + } + + writeJSON(w, http.StatusOK, AggregateResponse{ + ViewType: viewTypeString(viewType), + Rows: jsonRows, + }) +} + +// handleSubAggregates returns sub-aggregate data after drill-down. +// GET /api/v1/aggregates/sub?view_type=labels&sender=foo@example.com +func (s *Server) handleSubAggregates(w http.ResponseWriter, r *http.Request) { + if s.engine == nil { + writeError(w, http.StatusServiceUnavailable, "engine_unavailable", "Query engine not available") + return + } + + viewTypeStr := r.URL.Query().Get("view_type") + if viewTypeStr == "" { + writeError(w, http.StatusBadRequest, "missing_view_type", "view_type parameter is required") + return + } + viewType, ok := parseViewType(viewTypeStr) + if !ok { + writeError(w, http.StatusBadRequest, "invalid_view_type", + "Invalid view_type. 
Must be one of: senders, sender_names, recipients, recipient_names, domains, labels, time") + return + } + + filter := parseMessageFilter(r) + opts := parseAggregateOptions(r) + + rows, err := s.engine.SubAggregate(r.Context(), filter, viewType, opts) + if err != nil { + s.logger.Error("sub-aggregate query failed", "view_type", viewTypeStr, "error", err) + writeError(w, http.StatusInternalServerError, "internal_error", "Sub-aggregate query failed") + return + } + + jsonRows := make([]AggregateRowJSON, len(rows)) + for i, row := range rows { + jsonRows[i] = toAggregateRowJSON(row) + } + + writeJSON(w, http.StatusOK, AggregateResponse{ + ViewType: viewTypeString(viewType), + Rows: jsonRows, + }) +} + +// handleFilteredMessages returns a filtered list of messages. +// GET /api/v1/messages/filter?sender=foo@example.com&offset=0&limit=500 +func (s *Server) handleFilteredMessages(w http.ResponseWriter, r *http.Request) { + if s.engine == nil { + writeError(w, http.StatusServiceUnavailable, "engine_unavailable", "Query engine not available") + return + } + + filter := parseMessageFilter(r) + + messages, err := s.engine.ListMessages(r.Context(), filter) + if err != nil { + s.logger.Error("filtered messages query failed", "error", err) + writeError(w, http.StatusInternalServerError, "internal_error", "Message query failed") + return + } + + summaries := make([]MessageSummary, len(messages)) + for i, m := range messages { + summaries[i] = toMessageSummaryFromQuery(m) + } + + writeJSON(w, http.StatusOK, map[string]interface{}{ + "total": len(summaries), // Note: This is the returned count, not total matching + "offset": filter.Pagination.Offset, + "limit": filter.Pagination.Limit, + "messages": summaries, + }) +} + +// handleTotalStats returns detailed stats with optional filters. 
+// GET /api/v1/stats/total?source_id=1&attachments_only=true +func (s *Server) handleTotalStats(w http.ResponseWriter, r *http.Request) { + if s.engine == nil { + writeError(w, http.StatusServiceUnavailable, "engine_unavailable", "Query engine not available") + return + } + + var opts query.StatsOptions + + if v := r.URL.Query().Get("source_id"); v != "" { + if id, err := strconv.ParseInt(v, 10, 64); err == nil { + opts.SourceID = &id + } + } + if r.URL.Query().Get("attachments_only") == "true" { + opts.WithAttachmentsOnly = true + } + if r.URL.Query().Get("hide_deleted") == "true" { + opts.HideDeletedFromSource = true + } + if v := r.URL.Query().Get("search_query"); v != "" { + opts.SearchQuery = v + } + if v := r.URL.Query().Get("group_by"); v != "" { + if viewType, ok := parseViewType(v); ok { + opts.GroupBy = viewType + } + } + + stats, err := s.engine.GetTotalStats(r.Context(), opts) + if err != nil { + s.logger.Error("total stats query failed", "error", err) + writeError(w, http.StatusInternalServerError, "internal_error", "Stats query failed") + return + } + + writeJSON(w, http.StatusOK, toTotalStatsResponse(stats)) +} + +// handleFastSearch performs fast metadata search (subject, sender, recipient). 
+// GET /api/v1/search/fast?q=invoice&offset=0&limit=100 +func (s *Server) handleFastSearch(w http.ResponseWriter, r *http.Request) { + if s.engine == nil { + writeError(w, http.StatusServiceUnavailable, "engine_unavailable", "Query engine not available") + return + } + + queryStr := r.URL.Query().Get("q") + if queryStr == "" { + writeError(w, http.StatusBadRequest, "missing_query", "Query parameter 'q' is required") + return + } + + filter := parseMessageFilter(r) + + // Get view type for stats grouping (optional, defaults to senders) + var statsGroupBy query.ViewType + if v := r.URL.Query().Get("view_type"); v != "" { + var ok bool + statsGroupBy, ok = parseViewType(v) + if !ok { + writeError(w, http.StatusBadRequest, "invalid_view_type", + "Invalid view_type. Must be one of: senders, sender_names, recipients, recipient_names, domains, labels, time") + return + } + } + + offset := filter.Pagination.Offset + limit := filter.Pagination.Limit + if limit == 0 || limit > 500 { + limit = 100 + } + + q := search.Parse(queryStr) + + result, err := s.engine.SearchFastWithStats(r.Context(), q, queryStr, filter, statsGroupBy, limit, offset) + if err != nil { + s.logger.Error("fast search failed", "query", queryStr, "error", err) + writeError(w, http.StatusInternalServerError, "internal_error", "Search failed") + return + } + + summaries := make([]MessageSummary, len(result.Messages)) + for i, m := range result.Messages { + summaries[i] = toMessageSummaryFromQuery(m) + } + + writeJSON(w, http.StatusOK, SearchFastResponse{ + Query: queryStr, + Messages: summaries, + TotalCount: result.TotalCount, + Stats: toTotalStatsResponse(result.Stats), + }) +} + +// handleDeepSearch performs full-text body search via FTS5. 
+// GET /api/v1/search/deep?q=invoice&offset=0&limit=100 +func (s *Server) handleDeepSearch(w http.ResponseWriter, r *http.Request) { + if s.engine == nil { + writeError(w, http.StatusServiceUnavailable, "engine_unavailable", "Query engine not available") + return + } + + queryStr := r.URL.Query().Get("q") + if queryStr == "" { + writeError(w, http.StatusBadRequest, "missing_query", "Query parameter 'q' is required") + return + } + + offset, _ := strconv.Atoi(r.URL.Query().Get("offset")) + if offset < 0 { + offset = 0 + } + limit, _ := strconv.Atoi(r.URL.Query().Get("limit")) + if limit <= 0 || limit > 500 { + limit = 100 + } + + q := search.Parse(queryStr) + + messages, err := s.engine.Search(r.Context(), q, limit, offset) + if err != nil { + s.logger.Error("deep search failed", "query", queryStr, "error", err) + writeError(w, http.StatusInternalServerError, "internal_error", "Search failed") + return + } + + summaries := make([]MessageSummary, len(messages)) + for i, m := range messages { + summaries[i] = toMessageSummaryFromQuery(m) + } + + // For deep search, we don't have a fast count, so use -1 to indicate unknown + totalCount := int64(len(summaries)) + if len(summaries) == limit { + totalCount = -1 // Indicates more results may exist + } + + writeJSON(w, http.StatusOK, map[string]interface{}{ + "query": queryStr, + "messages": summaries, + "total_count": totalCount, + "offset": offset, + "limit": limit, + }) +} diff --git a/internal/api/handlers_test.go b/internal/api/handlers_test.go index a7331e29..3acc087c 100644 --- a/internal/api/handlers_test.go +++ b/internal/api/handlers_test.go @@ -12,6 +12,8 @@ import ( "time" "github.com/wesm/msgvault/internal/config" + "github.com/wesm/msgvault/internal/query" + "github.com/wesm/msgvault/internal/query/querytest" ) func newTestServerWithMockStore(t *testing.T) (*Server, *mockStore) { @@ -734,3 +736,347 @@ func TestSanitizeTokenPath(t *testing.T) { }) } } + +// newTestServerWithEngine creates a test server with 
both mock store and mock engine. +func newTestServerWithEngine(t *testing.T, engine *querytest.MockEngine) *Server { + t.Helper() + + store := &mockStore{ + stats: &StoreStats{ + MessageCount: 10, + ThreadCount: 5, + SourceCount: 1, + LabelCount: 3, + AttachmentCount: 2, + DatabaseSize: 1024, + }, + messages: []APIMessage{ + { + ID: 1, + Subject: "Test Subject", + From: "sender@example.com", + To: []string{"recipient@example.com"}, + SentAt: time.Date(2024, 1, 15, 10, 30, 0, 0, time.UTC), + Snippet: "This is a test message snippet", + Labels: []string{"INBOX"}, + HasAttachments: false, + SizeEstimate: 1024, + Body: "This is the full message body text.", + }, + }, + total: 1, + } + + cfg := &config.Config{ + Server: config.ServerConfig{APIPort: 8080}, + } + sched := newMockScheduler() + + srv := NewServerWithOptions(ServerOptions{ + Config: cfg, + Store: store, + Engine: engine, + Scheduler: sched, + Logger: testLogger(), + }) + return srv +} + +func TestHandleAggregates(t *testing.T) { + engine := &querytest.MockEngine{ + AggregateRows: []query.AggregateRow{ + {Key: "alice@example.com", Count: 100, TotalSize: 50000, AttachmentSize: 10000, AttachmentCount: 5}, + {Key: "bob@example.com", Count: 50, TotalSize: 25000, AttachmentSize: 5000, AttachmentCount: 2}, + }, + } + srv := newTestServerWithEngine(t, engine) + + req := httptest.NewRequest("GET", "/api/v1/aggregates?view_type=senders", nil) + w := httptest.NewRecorder() + + srv.Router().ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Errorf("status = %d, want %d, body: %s", w.Code, http.StatusOK, w.Body.String()) + } + + var resp AggregateResponse + if err := json.NewDecoder(w.Body).Decode(&resp); err != nil { + t.Fatalf("failed to decode response: %v", err) + } + + if resp.ViewType != "senders" { + t.Errorf("view_type = %q, want 'senders'", resp.ViewType) + } + if len(resp.Rows) != 2 { + t.Errorf("rows count = %d, want 2", len(resp.Rows)) + } + if resp.Rows[0].Key != "alice@example.com" { + t.Errorf("first 
row key = %q, want 'alice@example.com'", resp.Rows[0].Key) + } +} + +func TestHandleAggregatesNoEngine(t *testing.T) { + // Server without engine + cfg := &config.Config{ + Server: config.ServerConfig{APIPort: 8080}, + } + sched := newMockScheduler() + srv := NewServerWithOptions(ServerOptions{ + Config: cfg, + Store: nil, + Engine: nil, + Scheduler: sched, + Logger: testLogger(), + }) + + req := httptest.NewRequest("GET", "/api/v1/aggregates?view_type=senders", nil) + w := httptest.NewRecorder() + + srv.Router().ServeHTTP(w, req) + + if w.Code != http.StatusServiceUnavailable { + t.Errorf("status = %d, want %d", w.Code, http.StatusServiceUnavailable) + } +} + +func TestHandleAggregatesInvalidViewType(t *testing.T) { + engine := &querytest.MockEngine{} + srv := newTestServerWithEngine(t, engine) + + req := httptest.NewRequest("GET", "/api/v1/aggregates?view_type=invalid", nil) + w := httptest.NewRecorder() + + srv.Router().ServeHTTP(w, req) + + if w.Code != http.StatusBadRequest { + t.Errorf("status = %d, want %d", w.Code, http.StatusBadRequest) + } +} + +func TestHandleSubAggregates(t *testing.T) { + engine := &querytest.MockEngine{ + AggregateRows: []query.AggregateRow{ + {Key: "INBOX", Count: 80, TotalSize: 40000}, + {Key: "SENT", Count: 20, TotalSize: 10000}, + }, + } + srv := newTestServerWithEngine(t, engine) + + req := httptest.NewRequest("GET", "/api/v1/aggregates/sub?view_type=labels&sender=alice@example.com", nil) + w := httptest.NewRecorder() + + srv.Router().ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Errorf("status = %d, want %d, body: %s", w.Code, http.StatusOK, w.Body.String()) + } + + var resp AggregateResponse + if err := json.NewDecoder(w.Body).Decode(&resp); err != nil { + t.Fatalf("failed to decode response: %v", err) + } + + if resp.ViewType != "labels" { + t.Errorf("view_type = %q, want 'labels'", resp.ViewType) + } + if len(resp.Rows) != 2 { + t.Errorf("rows count = %d, want 2", len(resp.Rows)) + } +} + +func 
TestHandleFilteredMessages(t *testing.T) { + engine := &querytest.MockEngine{ + ListResults: []query.MessageSummary{ + { + ID: 1, + Subject: "Test Email", + FromEmail: "alice@example.com", + SentAt: time.Date(2024, 1, 15, 10, 30, 0, 0, time.UTC), + Labels: []string{"INBOX"}, + HasAttachments: false, + SizeEstimate: 1024, + }, + }, + } + srv := newTestServerWithEngine(t, engine) + + req := httptest.NewRequest("GET", "/api/v1/messages/filter?sender=alice@example.com&limit=100", nil) + w := httptest.NewRecorder() + + srv.Router().ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Errorf("status = %d, want %d, body: %s", w.Code, http.StatusOK, w.Body.String()) + } + + var resp map[string]interface{} + if err := json.NewDecoder(w.Body).Decode(&resp); err != nil { + t.Fatalf("failed to decode response: %v", err) + } + + messages, ok := resp["messages"].([]interface{}) + if !ok { + t.Fatal("expected messages array in response") + } + if len(messages) != 1 { + t.Errorf("messages count = %d, want 1", len(messages)) + } +} + +func TestHandleTotalStats(t *testing.T) { + engine := &querytest.MockEngine{ + Stats: &query.TotalStats{ + MessageCount: 1000, + TotalSize: 5000000, + AttachmentCount: 100, + AttachmentSize: 1000000, + LabelCount: 10, + AccountCount: 2, + }, + } + srv := newTestServerWithEngine(t, engine) + + req := httptest.NewRequest("GET", "/api/v1/stats/total", nil) + w := httptest.NewRecorder() + + srv.Router().ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Errorf("status = %d, want %d, body: %s", w.Code, http.StatusOK, w.Body.String()) + } + + var resp TotalStatsResponse + if err := json.NewDecoder(w.Body).Decode(&resp); err != nil { + t.Fatalf("failed to decode response: %v", err) + } + + if resp.MessageCount != 1000 { + t.Errorf("message_count = %d, want 1000", resp.MessageCount) + } + if resp.TotalSize != 5000000 { + t.Errorf("total_size = %d, want 5000000", resp.TotalSize) + } +} + +func TestHandleFastSearch(t *testing.T) { + engine := 
&querytest.MockEngine{ + SearchFastResults: []query.MessageSummary{ + { + ID: 1, + Subject: "Invoice 12345", + FromEmail: "billing@example.com", + SentAt: time.Date(2024, 1, 15, 10, 30, 0, 0, time.UTC), + }, + }, + Stats: &query.TotalStats{ + MessageCount: 1, + TotalSize: 1024, + }, + } + srv := newTestServerWithEngine(t, engine) + + req := httptest.NewRequest("GET", "/api/v1/search/fast?q=invoice", nil) + w := httptest.NewRecorder() + + srv.Router().ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Errorf("status = %d, want %d, body: %s", w.Code, http.StatusOK, w.Body.String()) + } + + var resp SearchFastResponse + if err := json.NewDecoder(w.Body).Decode(&resp); err != nil { + t.Fatalf("failed to decode response: %v", err) + } + + if resp.Query != "invoice" { + t.Errorf("query = %q, want 'invoice'", resp.Query) + } + if len(resp.Messages) != 1 { + t.Errorf("messages count = %d, want 1", len(resp.Messages)) + } +} + +func TestHandleFastSearchMissingQuery(t *testing.T) { + engine := &querytest.MockEngine{} + srv := newTestServerWithEngine(t, engine) + + req := httptest.NewRequest("GET", "/api/v1/search/fast", nil) + w := httptest.NewRecorder() + + srv.Router().ServeHTTP(w, req) + + if w.Code != http.StatusBadRequest { + t.Errorf("status = %d, want %d", w.Code, http.StatusBadRequest) + } +} + +func TestHandleFastSearchInvalidViewType(t *testing.T) { + engine := &querytest.MockEngine{} + srv := newTestServerWithEngine(t, engine) + + req := httptest.NewRequest("GET", "/api/v1/search/fast?q=test&view_type=invalid", nil) + w := httptest.NewRecorder() + + srv.Router().ServeHTTP(w, req) + + if w.Code != http.StatusBadRequest { + t.Errorf("status = %d, want %d", w.Code, http.StatusBadRequest) + } + + var errResp map[string]string + if err := json.NewDecoder(w.Body).Decode(&errResp); err != nil { + t.Fatalf("failed to decode error response: %v", err) + } + + if errResp["error"] != "invalid_view_type" { + t.Errorf("error = %q, want 'invalid_view_type'", 
errResp["error"]) + } +} + +func TestHandleDeepSearch(t *testing.T) { + engine := &querytest.MockEngine{ + SearchResults: []query.MessageSummary{ + { + ID: 1, + Subject: "Meeting Notes", + FromEmail: "team@example.com", + SentAt: time.Date(2024, 1, 15, 10, 30, 0, 0, time.UTC), + }, + }, + } + srv := newTestServerWithEngine(t, engine) + + req := httptest.NewRequest("GET", "/api/v1/search/deep?q=agenda", nil) + w := httptest.NewRecorder() + + srv.Router().ServeHTTP(w, req) + + if w.Code != http.StatusOK { + t.Errorf("status = %d, want %d, body: %s", w.Code, http.StatusOK, w.Body.String()) + } + + var resp map[string]interface{} + if err := json.NewDecoder(w.Body).Decode(&resp); err != nil { + t.Fatalf("failed to decode response: %v", err) + } + + if resp["query"] != "agenda" { + t.Errorf("query = %v, want 'agenda'", resp["query"]) + } +} + +func TestHandleDeepSearchMissingQuery(t *testing.T) { + engine := &querytest.MockEngine{} + srv := newTestServerWithEngine(t, engine) + + req := httptest.NewRequest("GET", "/api/v1/search/deep", nil) + w := httptest.NewRecorder() + + srv.Router().ServeHTTP(w, req) + + if w.Code != http.StatusBadRequest { + t.Errorf("status = %d, want %d", w.Code, http.StatusBadRequest) + } +} diff --git a/internal/api/server.go b/internal/api/server.go index 00e4149b..eac905c5 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -14,6 +14,7 @@ import ( "github.com/go-chi/chi/v5" chimw "github.com/go-chi/chi/v5/middleware" "github.com/wesm/msgvault/internal/config" + "github.com/wesm/msgvault/internal/query" "github.com/wesm/msgvault/internal/scheduler" "github.com/wesm/msgvault/internal/store" ) @@ -45,6 +46,7 @@ type AccountStatus = scheduler.AccountStatus type Server struct { cfg *config.Config store MessageStore + engine query.Engine // Query engine for aggregates and TUI support scheduler SyncScheduler logger *slog.Logger router chi.Router @@ -53,13 +55,33 @@ type Server struct { cfgMu sync.RWMutex // protects cfg.Accounts } +// 
ServerOptions configures the API server. +type ServerOptions struct { + Config *config.Config + Store MessageStore + Engine query.Engine // Optional: query engine for aggregates and TUI support + Scheduler SyncScheduler + Logger *slog.Logger +} + // NewServer creates a new API server. func NewServer(cfg *config.Config, store MessageStore, sched SyncScheduler, logger *slog.Logger) *Server { + return NewServerWithOptions(ServerOptions{ + Config: cfg, + Store: store, + Scheduler: sched, + Logger: logger, + }) +} + +// NewServerWithOptions creates a new API server with full options including query engine. +func NewServerWithOptions(opts ServerOptions) *Server { s := &Server{ - cfg: cfg, - store: store, - scheduler: sched, - logger: logger, + cfg: opts.Config, + store: opts.Store, + engine: opts.Engine, + scheduler: opts.Scheduler, + logger: opts.Logger, } s.router = s.setupRouter() return s @@ -111,6 +133,14 @@ func (s *Server) setupRouter() chi.Router { // Search r.Get("/search", s.handleSearch) + // TUI aggregate endpoints (require query engine) + r.Get("/aggregates", s.handleAggregates) + r.Get("/aggregates/sub", s.handleSubAggregates) + r.Get("/messages/filter", s.handleFilteredMessages) + r.Get("/stats/total", s.handleTotalStats) + r.Get("/search/fast", s.handleFastSearch) + r.Get("/search/deep", s.handleDeepSearch) + // Accounts and sync r.Get("/accounts", s.handleListAccounts) r.Post("/accounts", s.handleAddAccount) diff --git a/internal/query/DESIGN.md b/internal/query/DESIGN.md index 86cd44f5..9af363a7 100644 --- a/internal/query/DESIGN.md +++ b/internal/query/DESIGN.md @@ -12,21 +12,29 @@ It supports two use cases: The package defines an `Engine` interface that can be implemented by: -### SQLiteEngine (Current) +### SQLiteEngine - Uses direct SQLite queries with JOINs - Flexible: supports all filters and sorting options - Performance: adequate for small-medium databases (<100k messages) - Always available as fallback -### ParquetEngine (Future) +### 
DuckDBEngine (Preferred) -- Uses Arrow/Parquet files for denormalized analytics data +- Uses DuckDB to query Parquet files for denormalized analytics - Much faster for aggregates (~3000x vs SQLite JOINs) -- Read-only, requires periodic rebuild from SQLite +- Automatically falls back to SQLite for message detail queries - Best for large databases (100k+ messages) +- Built using `msgvault build-cache` command -## Parquet Schema (Planned) +### RemoteEngine + +- Implements Engine interface over HTTP API +- Used by TUI when `[remote].url` is configured +- Connects to a remote msgvault daemon (`msgvault serve`) +- Disables deletion and attachment export for safety + +## Parquet Schema ``` messages.parquet (partitioned by year): @@ -45,39 +53,38 @@ messages.parquet (partitioned by year): This denormalized schema avoids JOINs, enabling fast scans. -## Hybrid Approach +## DuckDB Hybrid Approach -For production use with large databases: +The DuckDBEngine handles this automatically: ```go -// Create hybrid engine that uses Parquet for aggregates, SQLite for details -engine := query.NewHybridEngine( - query.NewParquetEngine(parquetDir), - query.NewSQLiteEngine(db), -) +// Create DuckDB engine with SQLite fallback for message details +engine, err := query.NewDuckDBEngine(analyticsDir, dbPath, sqliteDB, opts) ``` -The hybrid engine routes queries to the appropriate backend: -- Aggregate queries → Parquet (fast scans) +The engine routes queries to the appropriate backend: +- Aggregate queries → DuckDB over Parquet (fast scans) - Message detail queries → SQLite (has body, raw MIME) +- Full-text search → SQLite FTS5 virtual table ## Build Process ```bash # Build/rebuild Parquet files from SQLite -msgvault build-analytics [--full-rebuild] +msgvault build-cache [--full-rebuild] # Files stored in ~/.msgvault/analytics/ # - messages/year=2024/*.parquet +# - participants/ +# - message_recipients/ +# - labels/ +# - message_labels/ +# - attachments/ # - _last_sync.json (incremental state) 
``` ## Go Libraries -Potential libraries for Parquet support: -- `github.com/apache/arrow/go` - Official Arrow implementation -- `github.com/xitongsys/parquet-go` - Pure Go Parquet -- `github.com/marcboeker/go-duckdb` - DuckDB via CGO (SQL interface) - -DuckDB is attractive because it provides a SQL interface over Parquet, -similar to the Python implementation which uses DuckDB for ETL. +The implementation uses: +- `github.com/marcboeker/go-duckdb` - DuckDB via CGO (SQL interface over Parquet) +- SQLite FTS5 for full-text search (body content not in Parquet) diff --git a/internal/remote/engine.go b/internal/remote/engine.go new file mode 100644 index 00000000..42520a13 --- /dev/null +++ b/internal/remote/engine.go @@ -0,0 +1,698 @@ +// Package remote provides HTTP client implementations for accessing a remote msgvault server. +package remote + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "strconv" + + "github.com/wesm/msgvault/internal/query" + "github.com/wesm/msgvault/internal/search" +) + +// ErrNotSupported is returned for operations not available in remote mode. +var ErrNotSupported = errors.New("operation not supported in remote mode") + +// Engine implements query.Engine by making HTTP calls to a remote msgvault server. +type Engine struct { + store *Store +} + +// Compile-time check that Engine implements query.Engine. +var _ query.Engine = (*Engine)(nil) + +// NewEngine creates a new remote query engine. +func NewEngine(cfg Config) (*Engine, error) { + s, err := New(cfg) + if err != nil { + return nil, err + } + return &Engine{store: s}, nil +} + +// NewEngineFromStore creates a new remote query engine from an existing store. +func NewEngineFromStore(s *Store) *Engine { + return &Engine{store: s} +} + +// IsRemote returns true, indicating this is a remote engine. +func (e *Engine) IsRemote() bool { + return true +} + +// Close releases resources held by the engine. 
+func (e *Engine) Close() error { + return e.store.Close() +} + +// ============================================================================ +// API Response Types +// ============================================================================ + +// aggregateResponse matches the API aggregate response format. +type aggregateResponse struct { + ViewType string `json:"view_type"` + Rows []aggregateRowJSON `json:"rows"` +} + +// aggregateRowJSON represents a single aggregate row in JSON format. +type aggregateRowJSON struct { + Key string `json:"key"` + Count int64 `json:"count"` + TotalSize int64 `json:"total_size"` + AttachmentSize int64 `json:"attachment_size"` + AttachmentCount int64 `json:"attachment_count"` + TotalUnique int64 `json:"total_unique"` +} + +// totalStatsResponse matches the API total stats response format. +type totalStatsResponse struct { + MessageCount int64 `json:"message_count"` + TotalSize int64 `json:"total_size"` + AttachmentCount int64 `json:"attachment_count"` + AttachmentSize int64 `json:"attachment_size"` + LabelCount int64 `json:"label_count"` + AccountCount int64 `json:"account_count"` +} + +// filteredMessagesResponse matches the API filtered messages response format. +type filteredMessagesResponse struct { + Total int `json:"total"` + Offset int `json:"offset"` + Limit int `json:"limit"` + Messages []messageSummaryJSON `json:"messages"` +} + +// messageSummaryJSON represents a message summary in JSON format. +type messageSummaryJSON struct { + ID int64 `json:"id"` + Subject string `json:"subject"` + From string `json:"from"` + To []string `json:"to"` + SentAt string `json:"sent_at"` + Snippet string `json:"snippet"` + Labels []string `json:"labels"` + HasAttach bool `json:"has_attachments"` + SizeBytes int64 `json:"size_bytes"` +} + +// searchFastResponse matches the API fast search response format. 
+type searchFastResponse struct { + Query string `json:"query"` + Messages []messageSummaryJSON `json:"messages"` + TotalCount int64 `json:"total_count"` + Stats *totalStatsResponse `json:"stats,omitempty"` +} + +// deepSearchResponse matches the API deep search response format. +type deepSearchResponse struct { + Query string `json:"query"` + Messages []messageSummaryJSON `json:"messages"` + TotalCount int64 `json:"total_count"` + Offset int `json:"offset"` + Limit int `json:"limit"` +} + +// ============================================================================ +// Helper Functions +// ============================================================================ + +// viewTypeToString converts a query.ViewType to its API string representation. +func viewTypeToString(v query.ViewType) string { + switch v { + case query.ViewSenders: + return "senders" + case query.ViewSenderNames: + return "sender_names" + case query.ViewRecipients: + return "recipients" + case query.ViewRecipientNames: + return "recipient_names" + case query.ViewDomains: + return "domains" + case query.ViewLabels: + return "labels" + case query.ViewTime: + return "time" + default: + return "senders" + } +} + +// sortFieldToString converts a query.SortField to its API string representation. +func sortFieldToString(f query.SortField) string { + switch f { + case query.SortByCount: + return "count" + case query.SortBySize: + return "size" + case query.SortByAttachmentSize: + return "attachment_size" + case query.SortByName: + return "name" + default: + return "count" + } +} + +// sortDirectionToString converts a query.SortDirection to its API string representation. +func sortDirectionToString(d query.SortDirection) string { + if d == query.SortAsc { + return "asc" + } + return "desc" +} + +// timeGranularityToString converts a query.TimeGranularity to its API string representation. 
+func timeGranularityToString(g query.TimeGranularity) string { + switch g { + case query.TimeYear: + return "year" + case query.TimeMonth: + return "month" + case query.TimeDay: + return "day" + default: + return "month" + } +} + +// messageSortFieldToString converts a query.MessageSortField to its API string representation. +func messageSortFieldToString(f query.MessageSortField) string { + switch f { + case query.MessageSortByDate: + return "date" + case query.MessageSortBySize: + return "size" + case query.MessageSortBySubject: + return "subject" + default: + return "date" + } +} + +// buildAggregateQuery builds query parameters for aggregate endpoints. +func buildAggregateQuery(viewType query.ViewType, opts query.AggregateOptions) url.Values { + params := url.Values{} + params.Set("view_type", viewTypeToString(viewType)) + params.Set("sort", sortFieldToString(opts.SortField)) + params.Set("direction", sortDirectionToString(opts.SortDirection)) + + if opts.Limit > 0 { + params.Set("limit", strconv.Itoa(opts.Limit)) + } + params.Set("time_granularity", timeGranularityToString(opts.TimeGranularity)) + + if opts.SourceID != nil { + params.Set("source_id", strconv.FormatInt(*opts.SourceID, 10)) + } + if opts.WithAttachmentsOnly { + params.Set("attachments_only", "true") + } + if opts.HideDeletedFromSource { + params.Set("hide_deleted", "true") + } + if opts.SearchQuery != "" { + params.Set("search_query", opts.SearchQuery) + } + + return params +} + +// buildFilterQuery builds query parameters for filter endpoints. 
+func buildFilterQuery(filter query.MessageFilter) url.Values { + params := url.Values{} + + if filter.Sender != "" { + params.Set("sender", filter.Sender) + } + if filter.SenderName != "" { + params.Set("sender_name", filter.SenderName) + } + if filter.Recipient != "" { + params.Set("recipient", filter.Recipient) + } + if filter.RecipientName != "" { + params.Set("recipient_name", filter.RecipientName) + } + if filter.Domain != "" { + params.Set("domain", filter.Domain) + } + if filter.Label != "" { + params.Set("label", filter.Label) + } + if filter.TimeRange.Period != "" { + params.Set("time_period", filter.TimeRange.Period) + } + params.Set("time_granularity", timeGranularityToString(filter.TimeRange.Granularity)) + + if filter.ConversationID != nil { + params.Set("conversation_id", strconv.FormatInt(*filter.ConversationID, 10)) + } + if filter.SourceID != nil { + params.Set("source_id", strconv.FormatInt(*filter.SourceID, 10)) + } + if filter.WithAttachmentsOnly { + params.Set("attachments_only", "true") + } + if filter.HideDeletedFromSource { + params.Set("hide_deleted", "true") + } + + // Pagination + if filter.Pagination.Offset > 0 { + params.Set("offset", strconv.Itoa(filter.Pagination.Offset)) + } + if filter.Pagination.Limit > 0 { + params.Set("limit", strconv.Itoa(filter.Pagination.Limit)) + } + + // Sorting + params.Set("sort", messageSortFieldToString(filter.Sorting.Field)) + params.Set("direction", sortDirectionToString(filter.Sorting.Direction)) + + return params +} + +// buildStatsQuery builds query parameters for stats endpoints. 
+func buildStatsQuery(opts query.StatsOptions) url.Values { + params := url.Values{} + + if opts.SourceID != nil { + params.Set("source_id", strconv.FormatInt(*opts.SourceID, 10)) + } + if opts.WithAttachmentsOnly { + params.Set("attachments_only", "true") + } + if opts.HideDeletedFromSource { + params.Set("hide_deleted", "true") + } + if opts.SearchQuery != "" { + params.Set("search_query", opts.SearchQuery) + } + if opts.GroupBy != 0 { + params.Set("group_by", viewTypeToString(opts.GroupBy)) + } + + return params +} + +// parseAggregateResponse parses the JSON response body into aggregate rows. +func parseAggregateResponse(body []byte) ([]query.AggregateRow, error) { + var resp aggregateResponse + if err := json.Unmarshal(body, &resp); err != nil { + return nil, fmt.Errorf("decode aggregate response: %w", err) + } + + rows := make([]query.AggregateRow, len(resp.Rows)) + for i, r := range resp.Rows { + rows[i] = query.AggregateRow{ + Key: r.Key, + Count: r.Count, + TotalSize: r.TotalSize, + AttachmentSize: r.AttachmentSize, + AttachmentCount: r.AttachmentCount, + TotalUnique: r.TotalUnique, + } + } + return rows, nil +} + +// parseMessageSummaries converts JSON message summaries to query.MessageSummary. +func parseMessageSummaries(msgs []messageSummaryJSON) []query.MessageSummary { + result := make([]query.MessageSummary, len(msgs)) + for i, m := range msgs { + sentAt := parseTime(m.SentAt) + result[i] = query.MessageSummary{ + ID: m.ID, + Subject: m.Subject, + FromEmail: m.From, + SentAt: sentAt, + Snippet: m.Snippet, + Labels: m.Labels, + HasAttachments: m.HasAttach, + SizeEstimate: m.SizeBytes, + } + } + return result +} + +// ============================================================================ +// Engine Interface Implementation +// ============================================================================ + +// Aggregate performs grouping based on the provided ViewType. 
+func (e *Engine) Aggregate(ctx context.Context, groupBy query.ViewType, opts query.AggregateOptions) ([]query.AggregateRow, error) { + params := buildAggregateQuery(groupBy, opts) + path := "/api/v1/aggregates?" + params.Encode() + + resp, err := e.store.doRequestWithContext(ctx, "GET", path, nil) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + return nil, handleErrorResponse(resp) + } + + var body []byte + body, err = readBody(resp) + if err != nil { + return nil, err + } + + return parseAggregateResponse(body) +} + +// SubAggregate performs aggregation on a filtered subset of messages. +func (e *Engine) SubAggregate(ctx context.Context, filter query.MessageFilter, groupBy query.ViewType, opts query.AggregateOptions) ([]query.AggregateRow, error) { + // Merge filter params with aggregate options + params := buildFilterQuery(filter) + params.Set("view_type", viewTypeToString(groupBy)) + params.Set("sort", sortFieldToString(opts.SortField)) + params.Set("direction", sortDirectionToString(opts.SortDirection)) + if opts.Limit > 0 { + params.Set("limit", strconv.Itoa(opts.Limit)) + } + // Use TimeGranularity from opts, not from filter (fixes roborev finding) + params.Set("time_granularity", timeGranularityToString(opts.TimeGranularity)) + if opts.SearchQuery != "" { + params.Set("search_query", opts.SearchQuery) + } + + path := "/api/v1/aggregates/sub?" + params.Encode() + + resp, err := e.store.doRequestWithContext(ctx, "GET", path, nil) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + return nil, handleErrorResponse(resp) + } + + body, err := readBody(resp) + if err != nil { + return nil, err + } + + return parseAggregateResponse(body) +} + +// ListMessages returns messages matching the filter criteria. 
+func (e *Engine) ListMessages(ctx context.Context, filter query.MessageFilter) ([]query.MessageSummary, error) { + params := buildFilterQuery(filter) + path := "/api/v1/messages/filter?" + params.Encode() + + resp, err := e.store.doRequestWithContext(ctx, "GET", path, nil) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + return nil, handleErrorResponse(resp) + } + + var fmr filteredMessagesResponse + if err := json.NewDecoder(resp.Body).Decode(&fmr); err != nil { + return nil, fmt.Errorf("decode messages response: %w", err) + } + + return parseMessageSummaries(fmr.Messages), nil +} + +// GetMessage returns a single message by ID. +func (e *Engine) GetMessage(ctx context.Context, id int64) (*query.MessageDetail, error) { + msg, err := e.store.GetMessage(id) + if err != nil { + return nil, err + } + if msg == nil { + return nil, nil + } + + // Convert store.APIMessage to query.MessageDetail + detail := &query.MessageDetail{ + ID: msg.ID, + Subject: msg.Subject, + Snippet: msg.Snippet, + SentAt: msg.SentAt, + SizeEstimate: msg.SizeEstimate, + Labels: msg.Labels, + BodyText: msg.Body, // API returns combined body + } + + // Parse From address + if msg.From != "" { + detail.From = []query.Address{{Email: msg.From}} + } + + // Parse To addresses + for _, to := range msg.To { + detail.To = append(detail.To, query.Address{Email: to}) + } + + // Convert attachments + for _, att := range msg.Attachments { + detail.Attachments = append(detail.Attachments, query.AttachmentInfo{ + Filename: att.Filename, + MimeType: att.MimeType, + Size: att.Size, + }) + } + + detail.HasAttachments = len(detail.Attachments) > 0 + + return detail, nil +} + +// GetMessageBySourceID returns a message by its source message ID. +// This operation is not supported in remote mode. 
+func (e *Engine) GetMessageBySourceID(ctx context.Context, sourceMessageID string) (*query.MessageDetail, error) {
+	return nil, ErrNotSupported
+}
+
+// GetAttachment returns attachment metadata by ID.
+// This operation is not supported in remote mode.
+func (e *Engine) GetAttachment(ctx context.Context, id int64) (*query.AttachmentInfo, error) {
+	return nil, ErrNotSupported
+}
+
+// Search performs full-text search including message body.
+func (e *Engine) Search(ctx context.Context, q *search.Query, limit, offset int) ([]query.MessageSummary, error) {
+	// Build query string from search.Query
+	queryStr := buildSearchQueryString(q)
+	// Empty query short-circuits to no results without a round trip; the
+	// server would return empty results for q= anyway (documented design
+	// decision — see .roborev.toml).
+	if queryStr == "" {
+		return nil, nil
+	}
+
+	params := url.Values{}
+	params.Set("q", queryStr)
+	params.Set("offset", strconv.Itoa(offset))
+	params.Set("limit", strconv.Itoa(limit))
+
+	path := "/api/v1/search/deep?" + params.Encode()
+
+	resp, err := e.store.doRequestWithContext(ctx, "GET", path, nil)
+	if err != nil {
+		return nil, err
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != 200 {
+		return nil, handleErrorResponse(resp)
+	}
+
+	var dsr deepSearchResponse
+	if err := json.NewDecoder(resp.Body).Decode(&dsr); err != nil {
+		return nil, fmt.Errorf("decode search response: %w", err)
+	}
+
+	return parseMessageSummaries(dsr.Messages), nil
+}
+
+// SearchFast searches message metadata only (no body text).
+func (e *Engine) SearchFast(ctx context.Context, q *search.Query, filter query.MessageFilter, limit, offset int) ([]query.MessageSummary, error) {
+	// Delegates to SearchFastWithStats; the sender-grouped stats the server
+	// computes are discarded here — only the message page is returned.
+	result, err := e.SearchFastWithStats(ctx, q, buildSearchQueryString(q), filter, query.ViewSenders, limit, offset)
+	if err != nil {
+		return nil, err
+	}
+	return result.Messages, nil
+}
+
+// SearchFastCount returns the total count of messages matching a search query.
+func (e *Engine) SearchFastCount(ctx context.Context, q *search.Query, filter query.MessageFilter) (int64, error) {
+	// Use SearchFastWithStats with limit 0 to get count only
+	// NOTE(review): relies on the server treating limit=0 as "return no rows"
+	// rather than "use default page size" — confirm against the
+	// /api/v1/search/fast handler.
+	result, err := e.SearchFastWithStats(ctx, q, buildSearchQueryString(q), filter, query.ViewSenders, 0, 0)
+	if err != nil {
+		return 0, err
+	}
+	return result.TotalCount, nil
+}
+
+// SearchFastWithStats performs a fast metadata search and returns paginated results,
+// total count, and aggregate stats in a single operation.
+//
+// NOTE(review): the parsed q is unused in this body; queryStr carries the
+// query to the server. Presumably kept for interface parity with the local
+// engine — confirm before removing.
+func (e *Engine) SearchFastWithStats(ctx context.Context, q *search.Query, queryStr string,
+	filter query.MessageFilter, statsGroupBy query.ViewType, limit, offset int) (*query.SearchFastResult, error) {
+
+	params := buildFilterQuery(filter)
+	params.Set("q", queryStr)
+	params.Set("offset", strconv.Itoa(offset))
+	params.Set("limit", strconv.Itoa(limit))
+	params.Set("view_type", viewTypeToString(statsGroupBy))
+
+	path := "/api/v1/search/fast?" + params.Encode()
+
+	resp, err := e.store.doRequestWithContext(ctx, "GET", path, nil)
+	if err != nil {
+		return nil, err
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != 200 {
+		return nil, handleErrorResponse(resp)
+	}
+
+	var sfr searchFastResponse
+	if err := json.NewDecoder(resp.Body).Decode(&sfr); err != nil {
+		return nil, fmt.Errorf("decode search response: %w", err)
+	}
+
+	result := &query.SearchFastResult{
+		Messages:   parseMessageSummaries(sfr.Messages),
+		TotalCount: sfr.TotalCount,
+	}
+
+	// Stats are optional in the response; copy only when present, leaving
+	// result.Stats nil otherwise.
+	if sfr.Stats != nil {
+		result.Stats = &query.TotalStats{
+			MessageCount:    sfr.Stats.MessageCount,
+			TotalSize:       sfr.Stats.TotalSize,
+			AttachmentCount: sfr.Stats.AttachmentCount,
+			AttachmentSize:  sfr.Stats.AttachmentSize,
+			LabelCount:      sfr.Stats.LabelCount,
+			AccountCount:    sfr.Stats.AccountCount,
+		}
+	}
+
+	return result, nil
+}
+
+// GetGmailIDsByFilter returns Gmail message IDs matching a filter.
+// This operation is not supported in remote mode.
+func (e *Engine) GetGmailIDsByFilter(ctx context.Context, filter query.MessageFilter) ([]string, error) {
+	return nil, ErrNotSupported
+}
+
+// ListAccounts returns all configured accounts.
+//
+// NOTE(review): ctx is accepted for interface parity but not forwarded —
+// store.ListAccounts takes no context here.
+func (e *Engine) ListAccounts(ctx context.Context) ([]query.AccountInfo, error) {
+	accounts, err := e.store.ListAccounts()
+	if err != nil {
+		return nil, err
+	}
+
+	// Map store accounts onto the engine's account-info shape.
+	result := make([]query.AccountInfo, len(accounts))
+	for i, acc := range accounts {
+		result[i] = query.AccountInfo{
+			Identifier:  acc.Email,
+			DisplayName: acc.DisplayName,
+		}
+	}
+	return result, nil
+}
+
+// GetTotalStats returns overall database statistics.
+func (e *Engine) GetTotalStats(ctx context.Context, opts query.StatsOptions) (*query.TotalStats, error) {
+	params := buildStatsQuery(opts)
+	path := "/api/v1/stats/total?" + params.Encode()
+
+	resp, err := e.store.doRequestWithContext(ctx, "GET", path, nil)
+	if err != nil {
+		return nil, err
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != 200 {
+		return nil, handleErrorResponse(resp)
+	}
+
+	var tsr totalStatsResponse
+	if err := json.NewDecoder(resp.Body).Decode(&tsr); err != nil {
+		return nil, fmt.Errorf("decode stats response: %w", err)
+	}
+
+	return &query.TotalStats{
+		MessageCount:    tsr.MessageCount,
+		TotalSize:       tsr.TotalSize,
+		AttachmentCount: tsr.AttachmentCount,
+		AttachmentSize:  tsr.AttachmentSize,
+		LabelCount:      tsr.LabelCount,
+		AccountCount:    tsr.AccountCount,
+	}, nil
+}
+
+// buildSearchQueryString reconstructs a search query string from a parsed Query.
+// This is needed because the API expects the raw query string.
+//
+// Reconstruction is intentionally simplified (no re-quoting of phrases): the
+// server re-parses the query, so quoting edge cases are acceptable — see the
+// design note in .roborev.toml.
+func buildSearchQueryString(q *search.Query) string {
+	if q == nil {
+		return ""
+	}
+
+	var parts []string
+
+	// Bare text terms come first, followed by structured operators.
+	parts = append(parts, q.TextTerms...)
+	for _, addr := range q.FromAddrs {
+		parts = append(parts, "from:"+addr)
+	}
+	for _, addr := range q.ToAddrs {
+		parts = append(parts, "to:"+addr)
+	}
+	for _, addr := range q.CcAddrs {
+		parts = append(parts, "cc:"+addr)
+	}
+	for _, addr := range q.BccAddrs {
+		parts = append(parts, "bcc:"+addr)
+	}
+	for _, term := range q.SubjectTerms {
+		parts = append(parts, "subject:"+term)
+	}
+	for _, label := range q.Labels {
+		parts = append(parts, "label:"+label)
+	}
+	// NOTE(review): only a true *HasAttachment is emitted; an explicit false
+	// is dropped silently — confirm the query language has no negated form.
+	if q.HasAttachment != nil && *q.HasAttachment {
+		parts = append(parts, "has:attachment")
+	}
+	// Dates are rendered in the YYYY-MM-DD form the search parser accepts.
+	if q.BeforeDate != nil {
+		parts = append(parts, "before:"+q.BeforeDate.Format("2006-01-02"))
+	}
+	if q.AfterDate != nil {
+		parts = append(parts, "after:"+q.AfterDate.Format("2006-01-02"))
+	}
+	if q.LargerThan != nil {
+		parts = append(parts, fmt.Sprintf("larger:%d", *q.LargerThan))
+	}
+	if q.SmallerThan != nil {
+		parts = append(parts, fmt.Sprintf("smaller:%d", *q.SmallerThan))
+	}
+
+	// Manual space-join, equivalent to strings.Join(parts, " "); parts is
+	// small, so the repeated concatenation is not a performance concern.
+	result := ""
+	for i, part := range parts {
+		if i > 0 {
+			result += " "
+		}
+		result += part
+	}
+	return result
+}
+
+// readBody reads the response body into a byte slice.
+func readBody(resp *http.Response) ([]byte, error) {
+	return io.ReadAll(resp.Body)
+}
diff --git a/internal/remote/store.go b/internal/remote/store.go
index 0eeb4d13..1e5a9bb3 100644
--- a/internal/remote/store.go
+++ b/internal/remote/store.go
@@ -2,6 +2,7 @@
 package remote
 
 import (
+	"context"
 	"encoding/json"
 	"fmt"
 	"io"
@@ -77,9 +78,14 @@ func (s *Store) Close() error {
 
 // doRequest performs an authenticated HTTP request.
 func (s *Store) doRequest(method, path string, body io.Reader) (*http.Response, error) {
+	return s.doRequestWithContext(context.Background(), method, path, body)
+}
+
+// doRequestWithContext performs an authenticated HTTP request with context support.
+func (s *Store) doRequestWithContext(ctx context.Context, method, path string, body io.Reader) (*http.Response, error) { reqURL := s.baseURL + path - req, err := http.NewRequest(method, reqURL, body) + req, err := http.NewRequestWithContext(ctx, method, reqURL, body) if err != nil { return nil, fmt.Errorf("create request: %w", err) } diff --git a/internal/tui/keys.go b/internal/tui/keys.go index 015f940f..92a1d1e7 100644 --- a/internal/tui/keys.go +++ b/internal/tui/keys.go @@ -194,6 +194,9 @@ func (m Model) handleAggregateKeys(msg tea.KeyMsg) (tea.Model, tea.Cmd) { return m, m.loadMessages() case "d", "D": // Stage for deletion (selection or current row) + if m.isRemote { + return m.showFlash("Deletion not available in remote mode") + } if !m.hasSelection() && len(m.rows) > 0 && m.cursor < len(m.rows) { // No selection - select current row first m.selection.aggregateKeys[m.rows[m.cursor].Key] = true @@ -370,6 +373,9 @@ func (m Model) handleMessageListKeys(msg tea.KeyMsg) (tea.Model, tea.Cmd) { m.clearAllSelections() case "d", "D": // Stage for deletion (selection or current row) + if m.isRemote { + return m.showFlash("Deletion not available in remote mode") + } if !m.hasSelection() && len(m.messages) > 0 && m.cursor < len(m.messages) { // No selection - select current row first m.selection.messageIDs[m.messages[m.cursor].ID] = true @@ -780,6 +786,9 @@ func (m Model) handleMessageDetailKeys(msg tea.KeyMsg) (tea.Model, tea.Cmd) { // Export attachments case "e": + if m.isRemote { + return m.showFlash("Export not available in remote mode") + } if m.messageDetail != nil && len(m.messageDetail.Attachments) > 0 { m.modal = modalExportAttachments m.modalCursor = 0 diff --git a/internal/tui/model.go b/internal/tui/model.go index 078d18b3..5031e0a2 100644 --- a/internal/tui/model.go +++ b/internal/tui/model.go @@ -50,6 +50,10 @@ type Options struct { // ThreadMessageLimit overrides the maximum number of messages in a thread view. // Zero uses the default (1,000). 
ThreadMessageLimit int + + // IsRemote indicates the TUI is connected to a remote server. + // Some features (deletion staging, attachment export) are disabled in remote mode. + IsRemote bool } // modalType represents the type of modal dialog. @@ -107,6 +111,9 @@ type Model struct { aggregateLimit int threadMessageLimit int + // Remote mode (disables deletion/export) + isRemote bool + // Navigation breadcrumbs []navigationSnapshot @@ -216,6 +223,7 @@ func New(engine query.Engine, opts Options) Model { version: opts.Version, aggregateLimit: aggLimit, threadMessageLimit: threadLimit, + isRemote: opts.IsRemote, viewState: viewState{ level: levelAggregates, viewType: query.ViewSenders,