From 1f9647ed26c4f9d265bcae47aa44c87cb4d8e01a Mon Sep 17 00:00:00 2001 From: drondeseries Date: Fri, 17 Apr 2026 13:36:34 -0400 Subject: [PATCH] feat(health): implement GUID-based repair and harvesting system --- internal/api/sabnzbd_types.go | 7 + internal/arrs/scanner/manager.go | 179 ++++++++++++++++++++- internal/arrs/service.go | 7 +- internal/arrs/worker/worker.go | 212 +++++++++++++++++++++++-- internal/config/manager.go | 5 + internal/database/health_repository.go | 29 ++++ internal/database/repository.go | 50 ++++++ internal/health/repair_e2e_test.go | 6 + internal/health/worker.go | 88 +++++++--- internal/importer/processor.go | 20 ++- internal/importer/service.go | 4 +- 11 files changed, 560 insertions(+), 47 deletions(-) diff --git a/internal/api/sabnzbd_types.go b/internal/api/sabnzbd_types.go index 8ff9548b..fdd03c7b 100644 --- a/internal/api/sabnzbd_types.go +++ b/internal/api/sabnzbd_types.go @@ -60,6 +60,7 @@ type SABnzbdQueueSlot struct { Sizeleft string `json:"sizeleft"` Mb string `json:"mb"` Mbleft string `json:"mbleft"` + Message string `json:"message"` // Added for detailed status in ARR apps } // SABnzbdHistoryResponse represents the history response structure @@ -385,6 +386,11 @@ func ToSABnzbdQueueSlot(item *database.ImportQueueItem, index int, progressBroad nzoID = *item.DownloadID } + message := "" + if item.ErrorMessage != nil { + message = *item.ErrorMessage + } + return SABnzbdQueueSlot{ Index: index, NzoID: nzoID, @@ -400,6 +406,7 @@ func ToSABnzbdQueueSlot(item *database.ImportQueueItem, index int, progressBroad Sizeleft: formatHumanSize(sizeLeftBytes), Mb: formatSizeMB(totalSizeBytes), Mbleft: formatSizeMB(sizeLeftBytes), + Message: message, } } diff --git a/internal/arrs/scanner/manager.go b/internal/arrs/scanner/manager.go index f7288dca..664d28cc 100644 --- a/internal/arrs/scanner/manager.go +++ b/internal/arrs/scanner/manager.go @@ -15,6 +15,7 @@ import ( "github.com/javi11/altmount/internal/arrs/instances" "github.com/javi11/altmount/internal/arrs/model" "github.com/javi11/altmount/internal/config" + "github.com/javi11/altmount/internal/database" "golift.io/starr" "golift.io/starr/lidarr" "golift.io/starr/radarr" @@ -27,15 +28,17 @@ type Manager struct { instances *instances.Manager clients *clients.Manager data *data.Manager + queueRepo *database.Repository sf singleflight.Group } -func NewManager(configGetter config.ConfigGetter, instances *instances.Manager, clients *clients.Manager, data *data.Manager) *Manager { +func NewManager(configGetter config.ConfigGetter, instances *instances.Manager, clients *clients.Manager, data *data.Manager, queueRepo *database.Repository) *Manager { return &Manager{ configGetter: configGetter, instances: instances, clients: clients, data: data, + queueRepo: queueRepo, } } @@ -786,6 +789,180 @@ func (m *Manager) triggerSonarrRescanByPath(ctx context.Context, client *sonarr. return nil } +// TriggerRepairByDownloadID attempts to mark a download as failed in ARR instances using its GUID +func (m *Manager) TriggerRepairByDownloadID(ctx context.Context, downloadID string, reason string) error { + if downloadID == "" { + return fmt.Errorf("downloadID is empty") + } + + res, err, _ := m.sf.Do(fmt.Sprintf("repair_id:%s", downloadID), func() (interface{}, error) { + slog.InfoContext(ctx, "Triggering ARR repair by download ID", "download_id", downloadID, "reason", reason) + + // Record the specific reason in the import queue so ARR apps see it via SABnzbd API + if m.queueRepo != nil { + enhancedReason := fmt.Sprintf("AltMount Health: %s", reason) + _ = m.queueRepo.UpdateQueueItemErrorMessageByDownloadID(ctx, downloadID, enhancedReason) + } + + allInstances := m.instances.GetAllInstances() + found := false + + for _, instance := range allInstances { + if !instance.Enabled { + continue + } + + var failErr error + switch instance.Type { + case "radarr", "whisparr": + client, err := m.clients.GetOrCreateRadarrClient(instance.Name, instance.URL, instance.APIKey) + if err == nil { + failErr = m.failRadarrQueueItemByDownloadID(ctx, client, downloadID) + } + case "sonarr": + client, err := m.clients.GetOrCreateSonarrClient(instance.Name, instance.URL, instance.APIKey) + if err == nil { + failErr = m.failSonarrQueueItemByDownloadID(ctx, client, downloadID) + } + case "lidarr": + client, err := m.clients.GetOrCreateLidarrClient(instance.Name, instance.URL, instance.APIKey) + if err == nil { + failErr = m.failLidarrQueueItemByDownloadID(ctx, client, downloadID) + } + case "readarr": + client, err := m.clients.GetOrCreateReadarrClient(instance.Name, instance.URL, instance.APIKey) + if err == nil { + failErr = m.failReadarrQueueItemByDownloadID(ctx, client, downloadID) + } + } + + if failErr == nil { + slog.InfoContext(ctx, "Successfully triggered repair by download ID in ARR instance", + "instance", instance.Name, "download_id", downloadID) + found = true + break + } else { + slog.DebugContext(ctx, "Download ID not found in ARR instance queue", + "instance", instance.Name, "download_id", downloadID, "error", failErr) + } + } + + if !found { + return nil, fmt.Errorf("download ID %s not found in any active ARR queue: %w", downloadID, model.ErrPathMatchFailed) + } + + return nil, nil + }) + + if err != nil { + return err + } + if res != nil { + return res.(error) + } + return nil +} + +// failRadarrQueueItemByDownloadID searches for an item in the active Radarr queue by download ID and marks it as failed +func (m *Manager) failRadarrQueueItemByDownloadID(ctx context.Context, client *radarr.Radarr, downloadID string) error { + queue, err := client.GetQueueContext(ctx, 0, 500) + if err != nil { + return fmt.Errorf("failed to get Radarr queue: %w", err) + } + + for _, q := range queue.Records { + if q.DownloadID == downloadID { + slog.InfoContext(ctx, "Found matching item in Radarr download queue by GUID, marking as failed", + "queue_id", q.ID, "download_id", downloadID) + + removeFromClient := true + opts := &starr.QueueDeleteOpts{ + RemoveFromClient: &removeFromClient, + BlockList: true, + SkipRedownload: false, + } + return client.DeleteQueueContext(ctx, q.ID, opts) + } + } + + return fmt.Errorf("no matching item found in Radarr queue for download ID: %s", downloadID) +} + +// failSonarrQueueItemByDownloadID searches for an item in the active Sonarr queue by download ID and marks it as failed +func (m *Manager) failSonarrQueueItemByDownloadID(ctx context.Context, client *sonarr.Sonarr, downloadID string) error { + queue, err := client.GetQueueContext(ctx, 0, 500) + if err != nil { + return fmt.Errorf("failed to get Sonarr queue: %w", err) + } + + for _, q := range queue.Records { + if q.DownloadID == downloadID { + slog.InfoContext(ctx, "Found matching item in Sonarr download queue by GUID, marking as failed", + "queue_id", q.ID, "download_id", downloadID) + + removeFromClient := true + opts := &starr.QueueDeleteOpts{ + RemoveFromClient: &removeFromClient, + BlockList: true, + SkipRedownload: false, + } + return client.DeleteQueueContext(ctx, q.ID, opts) + } + } + + return fmt.Errorf("no matching item found in Sonarr queue for download ID: %s", downloadID) +} + +// failLidarrQueueItemByDownloadID searches for an item in the active Lidarr queue by download ID and marks it as failed +func (m *Manager) failLidarrQueueItemByDownloadID(ctx context.Context, client *lidarr.Lidarr, downloadID string) error { + queue, err := client.GetQueueContext(ctx, 0, 500) + if err != nil { + return fmt.Errorf("failed to get Lidarr queue: %w", err) + } + + for _, q := range queue.Records { + if q.DownloadID == downloadID { + slog.InfoContext(ctx, "Found matching item in Lidarr download queue by GUID, marking as failed", + "queue_id", q.ID, "download_id", downloadID) + + removeFromClient := true + opts := &starr.QueueDeleteOpts{ + RemoveFromClient: &removeFromClient, + BlockList: true, + SkipRedownload: false, + } + return client.DeleteQueueContext(ctx, q.ID, opts) + } + } + + return fmt.Errorf("no matching item found in Lidarr queue for download ID: %s", downloadID) +} + +// failReadarrQueueItemByDownloadID searches for an item in the active Readarr queue by download ID and marks it as failed +func (m *Manager) failReadarrQueueItemByDownloadID(ctx context.Context, client *readarr.Readarr, downloadID string) error { + queue, err := client.GetQueueContext(ctx, 0, 500) + if err != nil { + return fmt.Errorf("failed to get Readarr queue: %w", err) + } + + for _, q := range queue.Records { + if q.DownloadID == downloadID { + slog.InfoContext(ctx, "Found matching item in Readarr download queue by GUID, marking as failed", + "queue_id", q.ID, "download_id", downloadID) + + removeFromClient := true + opts := &starr.QueueDeleteOpts{ + RemoveFromClient: &removeFromClient, + BlockList: true, + SkipRedownload: false, + } + return client.DeleteQueueContext(ctx, q.ID, opts) + } + } + + return fmt.Errorf("no matching item found in Readarr queue for download ID: %s", downloadID) +} + // failRadarrQueueItemByPath searches for an item in the active Radarr queue by path and marks it as failed func (m *Manager) failRadarrQueueItemByPath(ctx context.Context, client *radarr.Radarr, path string) error { queue, err := client.GetQueueContext(ctx, 0, 500) diff --git a/internal/arrs/service.go b/internal/arrs/service.go index a02c38e1..d257cc84 100644 --- a/internal/arrs/service.go +++ b/internal/arrs/service.go @@ -47,7 +47,7 @@ func NewService(configGetter config.ConfigGetter, configManager model.ConfigMana instManager := instances.NewManager(configGetter, configManager) clientManager := clients.NewManager() dataManager := data.NewManager() - scannerManager := scanner.NewManager(configGetter, instManager, clientManager, dataManager) + scannerManager := scanner.NewManager(configGetter, instManager, clientManager, dataManager, queueRepo) workerManager := worker.NewWorker(configGetter, instManager, clientManager, queueRepo) registrarManager := registrar.NewManager(instManager, clientManager) @@ -187,6 +187,11 @@ func (s *Service) TriggerDownloadScan(ctx context.Context, instanceType string) s.scanner.TriggerDownloadScan(ctx, instanceType) } +// TriggerRepairByDownloadID attempts to mark a download as failed in ARR instances using its GUID +func (s *Service) TriggerRepairByDownloadID(ctx context.Context, downloadID string, reason string) error { + return s.scanner.TriggerRepairByDownloadID(ctx, downloadID, reason) +} + // EnsureWebhookRegistration ensures that the AltMount webhook is registered in all enabled ARR instances func (s *Service) EnsureWebhookRegistration(ctx context.Context, altmountURL string, apiKey string) error { return s.registrar.EnsureWebhookRegistration(ctx, altmountURL, apiKey) diff --git a/internal/arrs/worker/worker.go b/internal/arrs/worker/worker.go index 2778a855..ca0551a1 100644 --- a/internal/arrs/worker/worker.go +++ b/internal/arrs/worker/worker.go @@ -35,6 +35,9 @@ type Worker struct { // key: instanceName|queueID firstSeen map[string]time.Time firstSeenMu sync.RWMutex + + // Harvester state + lastHarvest time.Time } func NewWorker(configGetter config.ConfigGetter, instances *instances.Manager, clients *clients.Manager, repo *database.Repository) *Worker { @@ -47,7 +50,7 @@ func NewWorker(configGetter config.ConfigGetter, instances *instances.Manager, c } } -// Start starts the queue cleanup worker +// Start starts the queue cleanup and harvesting workers func (w *Worker) Start(ctx context.Context) error { w.workerMu.Lock() defer w.workerMu.Unlock() @@ -60,29 +63,49 @@ func (w *Worker) Start(ctx context.Context) error { // ARRs must be enabled if cfg.Arrs.Enabled == nil || !*cfg.Arrs.Enabled { - slog.InfoContext(ctx, "ARR queue cleanup disabled (ARRs disabled)") - return nil - } - - // Queue cleanup is enabled by default (when nil or true) - if cfg.Arrs.QueueCleanupEnabled != nil && !*cfg.Arrs.QueueCleanupEnabled { - slog.InfoContext(ctx, "ARR queue cleanup disabled") + slog.InfoContext(ctx, "ARR workers disabled (ARRs disabled)") return nil } w.workerCtx, w.workerCancel = context.WithCancel(ctx) w.workerRunning = true - interval := time.Duration(cfg.Arrs.QueueCleanupIntervalSeconds) * time.Second - if interval <= 0 { - interval = 10 * time.Second + // 1. Start Queue Cleanup Worker + cleanupEnabled := true + if cfg.Arrs.QueueCleanupEnabled != nil { + cleanupEnabled = *cfg.Arrs.QueueCleanupEnabled } - w.workerWg.Add(1) - go w.runWorker(interval) + if cleanupEnabled { + interval := time.Duration(cfg.Arrs.QueueCleanupIntervalSeconds) * time.Second + if interval <= 0 { + interval = 10 * time.Second + } + + w.workerWg.Add(1) + go w.runWorker(interval) + slog.InfoContext(ctx, "ARR queue cleanup worker started", + "interval_seconds", cfg.Arrs.QueueCleanupIntervalSeconds) + } + + // 2. Start DownloadID Harvester Worker + harvestEnabled := true + if cfg.Arrs.HarvestDownloadIDsEnabled != nil { + harvestEnabled = *cfg.Arrs.HarvestDownloadIDsEnabled + } + + if harvestEnabled { + interval := time.Duration(cfg.Arrs.HarvestDownloadIDsIntervalHours) * time.Hour + if interval <= 0 { + interval = 24 * time.Hour + } + + w.workerWg.Add(1) + go w.runHarvester(interval) + slog.InfoContext(ctx, "ARR download ID harvester worker started", + "interval_hours", cfg.Arrs.HarvestDownloadIDsIntervalHours) + } - slog.InfoContext(ctx, "ARR queue cleanup worker started", - "interval_seconds", cfg.Arrs.QueueCleanupIntervalSeconds) return nil } @@ -138,6 +161,165 @@ func (w *Worker) safeCleanup() { } } +func (w *Worker) runHarvester(interval time.Duration) { + defer w.workerWg.Done() + + ticker := time.NewTicker(interval) + defer ticker.Stop() + + // Initial run after a short delay + select { + case <-time.After(1 * time.Minute): + w.safeHarvest() + case <-w.workerCtx.Done(): + return + } + + for { + select { + case <-ticker.C: + w.safeHarvest() + case <-w.workerCtx.Done(): + return + } + } +} + +func (w *Worker) safeHarvest() { + defer func() { + if r := recover(); r != nil { + slog.Error("Panic in DownloadID harvester", "panic", r) + } + }() + if err := w.HarvestDownloadIDs(w.workerCtx); err != nil { + slog.Error("DownloadID harvesting failed", "error", err) + } +} + +// HarvestDownloadIDs scans library items in AltMount's history that are missing GUIDs +// and attempts to "harvest" them from ARR history for proactive tracking. +func (w *Worker) HarvestDownloadIDs(ctx context.Context) error { + slog.InfoContext(ctx, "Starting proactive DownloadID harvesting cycle for legacy library items") + + // 1. Get history items missing download_id + items, err := w.repo.GetHistoryMissingDownloadID(ctx, 100) // Process in chunks of 100 + if err != nil { + return fmt.Errorf("failed to get history items for harvesting: %w", err) + } + + if len(items) == 0 { + slog.InfoContext(ctx, "No legacy items found missing DownloadID") + return nil + } + + slog.InfoContext(ctx, "Found items missing DownloadID, attempting to harvest from ARRs", "count", len(items)) + + instances := w.instances.GetAllInstances() + harvestedCount := 0 + + for _, item := range items { + found := false + for _, instance := range instances { + if !instance.Enabled { + continue + } + + var downloadID string + switch instance.Type { + case "radarr", "whisparr": + downloadID = w.harvestRadarrDownloadID(ctx, instance, item) + case "sonarr": + downloadID = w.harvestSonarrDownloadID(ctx, instance, item) + } + + if downloadID != "" { + slog.InfoContext(ctx, "Harvested DownloadID for legacy item", + "path", item.VirtualPath, "download_id", downloadID, "instance", instance.Name) + if err := w.repo.UpdateDownloadIDByPath(ctx, item.VirtualPath, downloadID); err == nil { + harvestedCount++ + found = true + } + break + } + } + + if !found { + slog.DebugContext(ctx, "Could not find DownloadID in any ARR history for item", "path", item.VirtualPath) + } + } + + slog.InfoContext(ctx, "Finished DownloadID harvesting cycle", "harvested", harvestedCount, "total_processed", len(items)) + return nil +} + +func (w *Worker) harvestRadarrDownloadID(ctx context.Context, instance *model.ConfigInstance, item *database.ImportHistory) string { + client, err := w.clients.GetOrCreateRadarrClient(instance.Name, instance.URL, instance.APIKey) + if err != nil { + return "" + } + + // Fetch history using filename match if possible + req := &starr.PageReq{PageSize: 50, SortKey: "date", SortDir: starr.SortDescend} + // We search history for the filename + history, err := client.GetHistoryPageContext(ctx, req) + if err != nil { + return "" + } + + fileName := filepath.Base(item.VirtualPath) + + for _, record := range history.Records { + if record.DownloadID == "" { + continue + } + + // Match by path or filename + // Data contains either ImportedPath or DroppedPath depending on the event + if record.Data.ImportedPath == item.VirtualPath || + record.Data.DroppedPath == item.VirtualPath || + filepath.Base(record.Data.ImportedPath) == fileName || + filepath.Base(record.Data.DroppedPath) == fileName || + strings.Contains(record.SourceTitle, item.NzbName) { + return record.DownloadID + } + } + + return "" +} + +func (w *Worker) harvestSonarrDownloadID(ctx context.Context, instance *model.ConfigInstance, item *database.ImportHistory) string { + client, err := w.clients.GetOrCreateSonarrClient(instance.Name, instance.URL, instance.APIKey) + if err != nil { + return "" + } + + req := &starr.PageReq{PageSize: 50, SortKey: "date", SortDir: starr.SortDescend} + history, err := client.GetHistoryPageContext(ctx, req) + if err != nil { + return "" + } + + fileName := filepath.Base(item.VirtualPath) + + for _, record := range history.Records { + if record.DownloadID == "" { + continue + } + + // Match by path or filename + // Data contains either ImportedPath or DroppedPath depending on the event + if record.Data.ImportedPath == item.VirtualPath || + record.Data.DroppedPath == item.VirtualPath || + filepath.Base(record.Data.ImportedPath) == fileName || + filepath.Base(record.Data.DroppedPath) == fileName || + strings.Contains(record.SourceTitle, item.NzbName) { + return record.DownloadID + } + } + + return "" +} + // CleanupQueue checks all ARR instances for importPending items with empty folders // and removes them from the queue after deleting the empty folder func (w *Worker) CleanupQueue(ctx context.Context) error { diff --git a/internal/config/manager.go b/internal/config/manager.go index dc5a14db..224907f6 100644 --- a/internal/config/manager.go +++ b/internal/config/manager.go @@ -392,6 +392,8 @@ type ArrsConfig struct { CleanupAutomaticImportFailure *bool `yaml:"cleanup_automatic_import_failure" mapstructure:"cleanup_automatic_import_failure" json:"cleanup_automatic_import_failure,omitempty"` QueueCleanupGracePeriodMinutes int `yaml:"queue_cleanup_grace_period_minutes" mapstructure:"queue_cleanup_grace_period_minutes" json:"queue_cleanup_grace_period_minutes,omitempty"` QueueCleanupAllowlist []IgnoredMessage `yaml:"queue_cleanup_allowlist" mapstructure:"queue_cleanup_allowlist" json:"queue_cleanup_allowlist,omitempty"` + HarvestDownloadIDsEnabled *bool `yaml:"harvest_download_ids_enabled" mapstructure:"harvest_download_ids_enabled" json:"harvest_download_ids_enabled,omitempty"` + HarvestDownloadIDsIntervalHours int `yaml:"harvest_download_ids_interval_hours" mapstructure:"harvest_download_ids_interval_hours" json:"harvest_download_ids_interval_hours,omitempty"` } // ArrsInstanceConfig represents a single arrs instance configuration @@ -1221,6 +1223,7 @@ func DefaultConfig(configDir ...string) *Config { watchIntervalSeconds := 10 // Default watch interval failedItemRetentionHours := 24 // Default: auto-remove failed items after 24 hours cleanupAutomaticImportFailure := false + harvestDownloadIDsEnabled := true metadataBackupEnabled := false failureMaskingEnabled := true repairEnabled := true @@ -1431,6 +1434,8 @@ func DefaultConfig(configDir ...string) *Config { WhisparrInstances: []ArrsInstanceConfig{}, CleanupAutomaticImportFailure: &cleanupAutomaticImportFailure, QueueCleanupGracePeriodMinutes: 10, // Default to 10 minutes + HarvestDownloadIDsEnabled: &harvestDownloadIDsEnabled, + HarvestDownloadIDsIntervalHours: 24, // Default to once a day QueueCleanupAllowlist: []IgnoredMessage{ {Message: "No files found are eligible", Enabled: true}, {Message: "One or more episodes expected in this release were not imported or missing", Enabled: true}, diff --git a/internal/database/health_repository.go b/internal/database/health_repository.go index 0d3de34e..12d0fd59 100644 --- a/internal/database/health_repository.go +++ b/internal/database/health_repository.go @@ -5,6 +5,7 @@ import ( "database/sql" "fmt" "log/slog" + "path/filepath" "strings" "time" ) @@ -1773,6 +1774,34 @@ func (r *HealthRepository) GetFilesForLibrarySync(ctx context.Context) ([]*FileH return files, nil } +// GetDownloadIDForSourceNzb retrieves the download_id (GUID) associated with a source NZB path. +// It checks both the active import_queue and the persistent import_history. +func (r *HealthRepository) GetDownloadIDForSourceNzb(ctx context.Context, sourceNzbPath string) (string, error) { + if sourceNzbPath == "" { + return "", nil + } + + // 1. Try active queue first (most likely for recent corruptions) + var downloadID sql.NullString + queryQueue := `SELECT download_id FROM import_queue WHERE nzb_path = ? LIMIT 1` + err := r.db.QueryRowContext(ctx, queryQueue, sourceNzbPath).Scan(&downloadID) + if err == nil && downloadID.Valid && downloadID.String != "" { + return downloadID.String, nil + } + + // 2. Try persistent history + // We match based on the filename because virtual_path in history is the output folder, + // while source_nzb_path in health is the .nzb file path. + nzbName := filepath.Base(sourceNzbPath) + queryHistory := `SELECT download_id FROM import_history WHERE nzb_name = ? AND download_id IS NOT NULL AND download_id != '' LIMIT 1` + err = r.db.QueryRowContext(ctx, queryHistory, nzbName).Scan(&downloadID) + if err == nil && downloadID.Valid && downloadID.String != "" { + return downloadID.String, nil + } + + return "", nil +} + // HasImportHistoryForPath checks if any import history record exists for the // given virtual path. Used to protect symlinks from deletion when an import // has been recorded by AltMount, regardless of current metadata state. diff --git a/internal/database/repository.go b/internal/database/repository.go index 4fa78808..d9a87746 100644 --- a/internal/database/repository.go +++ b/internal/database/repository.go @@ -349,6 +349,56 @@ func (r *Repository) GetQueueItemByDownloadID(ctx context.Context, downloadID st return &item, nil } +// UpdateDownloadIDByPath updates the download_id for a history record by its virtual path +func (r *Repository) UpdateDownloadIDByPath(ctx context.Context, virtualPath string, downloadID string) error { + query := `UPDATE import_history SET download_id = ? WHERE virtual_path = ? AND (download_id IS NULL OR download_id = '')` + _, err := r.db.ExecContext(ctx, query, downloadID, virtualPath) + if err != nil { + return fmt.Errorf("failed to update history download_id: %w", err) + } + return nil +} + +// GetHistoryMissingDownloadID retrieves history items that are missing a download_id +func (r *Repository) GetHistoryMissingDownloadID(ctx context.Context, limit int) ([]*ImportHistory, error) { + query := ` + SELECT id, download_id, nzb_id, nzb_name, file_name, file_size, virtual_path, category, completed_at + FROM import_history + WHERE (download_id IS NULL OR download_id = '') + ORDER BY completed_at DESC + LIMIT ? + ` + rows, err := r.db.QueryContext(ctx, query, limit) + if err != nil { + return nil, fmt.Errorf("failed to query history missing download_id: %w", err) + } + defer rows.Close() + + var items []*ImportHistory + for rows.Next() { + var item ImportHistory + err := rows.Scan( + &item.ID, &item.DownloadID, &item.NzbID, &item.NzbName, &item.FileName, &item.FileSize, + &item.VirtualPath, &item.Category, &item.CompletedAt, + ) + if err != nil { + return nil, fmt.Errorf("failed to scan history missing download_id: %w", err) + } + items = append(items, &item) + } + return items, rows.Err() +} + +// UpdateQueueItemErrorMessageByDownloadID updates the error message for a queue item by its DownloadID +func (r *Repository) UpdateQueueItemErrorMessageByDownloadID(ctx context.Context, downloadID string, errorMessage string) error { + query := `UPDATE import_queue SET error_message = ?, updated_at = datetime('now') WHERE download_id = ?` + _, err := r.db.ExecContext(ctx, query, errorMessage, downloadID) + if err != nil { + return fmt.Errorf("failed to update queue item error message: %w", err) + } + return nil +} + // RemoveFromQueueByDownloadID removes an item from the queue by its DownloadID func (r *Repository) RemoveFromQueueByDownloadID(ctx context.Context, downloadID string) error { query := `DELETE FROM import_queue WHERE download_id = ?` diff --git a/internal/health/repair_e2e_test.go b/internal/health/repair_e2e_test.go index 1d540a7b..801ce625 100644 --- a/internal/health/repair_e2e_test.go +++ b/internal/health/repair_e2e_test.go @@ -61,6 +61,12 @@ func (m *mockARRsService) TriggerFileRescan(_ context.Context, pathForRescan str return m.returnErr } +func (m *mockARRsService) TriggerRepairByDownloadID(_ context.Context, downloadID string, reason string) error { + m.mu.Lock() + defer m.mu.Unlock() + return m.returnErr +} + // mockImportService implements importer.ImportService for testing. type mockImportService struct { importer.ImportService diff --git a/internal/health/worker.go b/internal/health/worker.go index d273d3a4..5511c43b 100644 --- a/internal/health/worker.go +++ b/internal/health/worker.go @@ -25,6 +25,7 @@ import ( // ARRsRepairService abstracts the ARR repair operations needed by HealthWorker. type ARRsRepairService interface { TriggerFileRescan(ctx context.Context, pathForRescan string, relativePath string) error + TriggerRepairByDownloadID(ctx context.Context, downloadID string, reason string) error } // WorkerStatus represents the current status of the health worker @@ -957,10 +958,29 @@ func (hw *HealthWorker) triggerFileRepair(ctx context.Context, item *database.Fi } } - slog.InfoContext(ctx, "Triggering file repair using direct ARR API approach", "file_path", filePath) + slog.InfoContext(ctx, "Triggering file repair", "file_path", filePath) - pathForRescan := hw.resolvePathForRescan(item) + // STRATEGY 1: Repair by Download ID (GUID) + // This is the most reliable method as it avoids path mapping issues. + if item.SourceNzbPath != nil && *item.SourceNzbPath != "" { + downloadID, err := hw.healthRepo.GetDownloadIDForSourceNzb(ctx, *item.SourceNzbPath) + if err == nil && downloadID != "" { + reason := "Physical corruption" + if errorMsg != nil && *errorMsg != "" { + reason = *errorMsg + } + slog.InfoContext(ctx, "Attempting repair by download ID (GUID)", "file_path", filePath, "download_id", downloadID, "reason", reason) + err = hw.arrsService.TriggerRepairByDownloadID(ctx, downloadID, reason) + if err == nil { + slog.InfoContext(ctx, "Successfully triggered repair by download ID", "file_path", filePath, "download_id", downloadID) + return hw.handleSuccessfulRepairTrigger(ctx, item) + } + slog.DebugContext(ctx, "Repair by download ID failed or item not in ARR queue, falling back to path-based rescan", "file_path", filePath, "error", err) + } + } + // STRATEGY 2: Fallback to Path-based Rescan + pathForRescan := hw.resolvePathForRescan(item) err := hw.arrsService.TriggerFileRescan(ctx, pathForRescan, filePath) if err != nil { if errors.Is(err, arrs.ErrEpisodeAlreadySatisfied) || errors.Is(err, arrs.ErrPathMatchFailed) { @@ -977,28 +997,11 @@ func (hw *HealthWorker) triggerFileRepair(ctx context.Context, item *database.Fi return repairOutcomeCorrupted, err } - // ARR rescan was triggered successfully. slog.InfoContext(ctx, "Successfully triggered ARR rescan for file repair", "file_path", filePath, "path_for_rescan", pathForRescan) - // Move the metadata file to the corrupted folder so FUSE/WebDAV stops showing it. - // CRITICAL: We only do this if the file has already been imported (has a LibraryPath). - // If it hasn't been imported yet, we keep it visible so ARR can see the "Missing File" - // or "Empty Folder" and report its own warning, which helps the repair cycle. - if item.LibraryPath != nil && *item.LibraryPath != "" { - cfg := hw.configGetter() - relativePath := strings.TrimPrefix(filePath, cfg.MountPath) - relativePath = strings.TrimPrefix(relativePath, "/") - slog.InfoContext(ctx, "Moving metadata file for corrupted item to safety folder to trigger replacement", "file_path", filePath) - if moveErr := hw.metadataService.MoveToCorrupted(ctx, relativePath); moveErr != nil { - slog.WarnContext(ctx, "Failed to move corrupted metadata file, proceeding with repair trigger status", "error", moveErr) - } - } else { - slog.InfoContext(ctx, "Skipping metadata move for corrupted item - file not yet imported by ARR", "file_path", filePath) - } - - return repairOutcomeTriggered, nil + return hw.handleSuccessfulRepairTrigger(ctx, item) } // retriggerFileRepair re-triggers the ARR rescan for a file already in repair_triggered state. @@ -1006,10 +1009,28 @@ func (hw *HealthWorker) triggerFileRepair(ctx context.Context, item *database.Fi // Callers must apply the returned outcome to the HealthStatusUpdate before the bulk DB write. func (hw *HealthWorker) retriggerFileRepair(ctx context.Context, item *database.FileHealth) (repairOutcome, error) { filePath := item.FilePath - pathForRescan := hw.resolvePathForRescan(item) - slog.InfoContext(ctx, "Re-triggering ARR rescan for file in repair", "file_path", filePath, "path_for_rescan", pathForRescan) + slog.InfoContext(ctx, "Re-triggering ARR rescan for file in repair", "file_path", filePath) + // STRATEGY 1: Repair by Download ID (GUID) + if item.SourceNzbPath != nil && *item.SourceNzbPath != "" { + downloadID, err := hw.healthRepo.GetDownloadIDForSourceNzb(ctx, *item.SourceNzbPath) + if err == nil && downloadID != "" { + reason := "Physical corruption" + if item.LastError != nil && *item.LastError != "" { + reason = *item.LastError + } + slog.InfoContext(ctx, "Attempting repair by download ID (GUID) during re-trigger", "file_path", filePath, "download_id", downloadID, "reason", reason) + err = hw.arrsService.TriggerRepairByDownloadID(ctx, downloadID, reason) + if err == nil { + slog.InfoContext(ctx, "Successfully re-triggered repair by download ID", "file_path", filePath) + return repairOutcomeTriggered, nil + } + } + } + + // STRATEGY 2: Fallback to Path-based Rescan + pathForRescan := hw.resolvePathForRescan(item) err := hw.arrsService.TriggerFileRescan(ctx, pathForRescan, filePath) if err != nil { if errors.Is(err, arrs.ErrEpisodeAlreadySatisfied) || errors.Is(err, arrs.ErrPathMatchFailed) { @@ -1025,3 +1046,26 @@ func (hw *HealthWorker) retriggerFileRepair(ctx context.Context, item *database. slog.InfoContext(ctx, "Successfully re-triggered ARR rescan", "file_path", filePath) return repairOutcomeTriggered, nil } + +// handleSuccessfulRepairTrigger performs common post-trigger actions like moving metadata. +func (hw *HealthWorker) handleSuccessfulRepairTrigger(ctx context.Context, item *database.FileHealth) (repairOutcome, error) { + filePath := item.FilePath + + // Move the metadata file to the corrupted folder so FUSE/WebDAV stops showing it. + // CRITICAL: We only do this if the file has already been imported (has a LibraryPath). + // If it hasn't been imported yet, we keep it visible so ARR can see the "Missing File" + // or "Empty Folder" and report its own warning, which helps the repair cycle. + if item.LibraryPath != nil && *item.LibraryPath != "" { + cfg := hw.configGetter() + relativePath := strings.TrimPrefix(filePath, cfg.MountPath) + relativePath = strings.TrimPrefix(relativePath, "/") + slog.InfoContext(ctx, "Moving metadata file for corrupted item to safety folder to trigger replacement", "file_path", filePath) + if moveErr := hw.metadataService.MoveToCorrupted(ctx, relativePath); moveErr != nil { + slog.WarnContext(ctx, "Failed to move corrupted metadata file, proceeding with repair trigger status", "error", moveErr) + } + } else { + slog.InfoContext(ctx, "Skipping metadata move for corrupted item - file not yet imported by ARR", "file_path", filePath) + } + + return repairOutcomeTriggered, nil +} diff --git a/internal/importer/processor.go b/internal/importer/processor.go index dd41abdf..41dbed2a 100644 --- a/internal/importer/processor.go +++ b/internal/importer/processor.go @@ -168,7 +168,7 @@ func (proc *Processor) checkCancellation(ctx context.Context) error { // Returns (resultPath, writtenMetadataPaths, error). writtenMetadataPaths contains all virtual paths of // metadata files written to disk; it is populated even on partial failure so callers can clean up. // Paths prefixed with "DIR:" indicate a metadata directory that should be removed entirely. -func (proc *Processor) ProcessNzbFile(ctx context.Context, filePath, relativePath string, queueID int, allowedExtensionsOverride *[]string, virtualDirOverride *string, extractedFiles []parser.ExtractedFileInfo, category *string, metadata *string) (string, []string, error) { +func (proc *Processor) ProcessNzbFile(ctx context.Context, filePath, relativePath string, queueID int, allowedExtensionsOverride *[]string, virtualDirOverride *string, extractedFiles []parser.ExtractedFileInfo, category *string, metadata *string, downloadID *string) (string, []string, error) { cfg := proc.configGetter() // Determine max connections to use @@ -267,23 +267,23 @@ func (proc *Processor) ProcessNzbFile(ctx context.Context, filePath, relativePat switch parsed.Type { case parser.NzbTypeSingleFile: proc.updateProgressWithStage(queueID, 30, "Validating segments") - result, writtenPaths, err = proc.processSingleFile(ctx, virtualDir, regularFiles, par2Files, parsed.Path, queueID, maxConnections, allowedExtensions, proc.validationTimeout, category, metadata) + result, writtenPaths, err = proc.processSingleFile(ctx, virtualDir, regularFiles, par2Files, parsed.Path, queueID, maxConnections, allowedExtensions, proc.validationTimeout, category, metadata, downloadID) case parser.NzbTypeMultiFile: proc.updateProgressWithStage(queueID, 30, "Validating segments") - result, writtenPaths, err = proc.processMultiFile(ctx, virtualDir, regularFiles, par2Files, parsed.Path, queueID, maxConnections, allowedExtensions, proc.validationTimeout, category, metadata) + result, writtenPaths, err = proc.processMultiFile(ctx, virtualDir, regularFiles, par2Files, parsed.Path, queueID, maxConnections, allowedExtensions, proc.validationTimeout, category, metadata, downloadID) case parser.NzbTypeRarArchive: proc.updateProgressWithStage(queueID, 15, "Analyzing archive") - result, writtenPaths, err = proc.processRarArchive(ctx, virtualDir, regularFiles, archiveFiles, parsed, queueID, maxConnections, allowedExtensions, proc.validationTimeout, parsed.ExtractedFiles, category, metadata) + result, writtenPaths, err = proc.processRarArchive(ctx, virtualDir, regularFiles, archiveFiles, parsed, queueID, maxConnections, allowedExtensions, proc.validationTimeout, parsed.ExtractedFiles, category, metadata, downloadID) case parser.NzbType7zArchive: proc.updateProgressWithStage(queueID, 15, "Analyzing archive") - result, writtenPaths, err = proc.processSevenZipArchive(ctx, virtualDir, regularFiles, archiveFiles, parsed, queueID, maxConnections, allowedExtensions, proc.validationTimeout, parsed.ExtractedFiles, category, metadata) + result, writtenPaths, err = proc.processSevenZipArchive(ctx, virtualDir, regularFiles, archiveFiles, parsed, queueID, maxConnections, allowedExtensions, proc.validationTimeout, parsed.ExtractedFiles, category, metadata, downloadID) case parser.NzbTypeStrm: proc.updateProgressWithStage(queueID, 30, "Validating segments") - result, writtenPaths, err = proc.processSingleFile(ctx, virtualDir, regularFiles, par2Files, parsed.Path, queueID, maxConnections, allowedExtensions, proc.validationTimeout, category, metadata) + result, writtenPaths, err = proc.processSingleFile(ctx, virtualDir, regularFiles, par2Files, parsed.Path, queueID, maxConnections, allowedExtensions, proc.validationTimeout, category, metadata, downloadID) default: return "", nil, NewNonRetryableError(fmt.Sprintf("unknown file type: %s", parsed.Type), nil) @@ -312,6 +312,7 @@ func (proc *Processor) processSingleFile( timeout time.Duration, category *string, metadata *string, + downloadID *string, ) (string, []string, error) { if len(regularFiles) == 0 { return "", nil, fmt.Errorf("no regular files to process") @@ -408,6 +409,7 @@ func (proc *Processor) processSingleFile( if proc.recorder != nil { nzbID := int64(queueID) _ = proc.recorder.AddImportHistory(ctx, &database.ImportHistory{ + DownloadID: downloadID, NzbID: &nzbID, NzbName: nzbName, FileName: finalName, @@ -435,6 +437,7 @@ func (proc *Processor) processMultiFile( timeout time.Duration, category *string, metadata *string, + downloadID *string, ) (string, []string, error) { // If there's only one regular file (and the rest are likely PAR2s), avoid creating a redundant // NZB-named directory that matches the file itself. Instead, keep the file directly under the @@ -515,6 +518,7 @@ func (proc *Processor) processMultiFile( } _ = proc.recorder.AddImportHistory(ctx, &database.ImportHistory{ + DownloadID: downloadID, NzbID: &nzbID, NzbName: nzbName, FileName: filepath.Base(targetBaseDir), @@ -543,6 +547,7 @@ func (proc *Processor) processRarArchive( extractedFiles []parser.ExtractedFileInfo, category *string, metadata *string, + downloadID *string, ) (string, []string, error) { importCfg := proc.configGetter().Import samplePercentage := importCfg.SegmentSamplePercentage @@ -650,6 +655,7 @@ func (proc *Processor) processRarArchive( } _ = proc.recorder.AddImportHistory(ctx, &database.ImportHistory{ + DownloadID: downloadID, NzbID: &nzbID, NzbName: nzbName, FileName: filepath.Base(nzbFolder), @@ -678,6 +684,7 @@ func (proc *Processor) processSevenZipArchive( extractedFiles []parser.ExtractedFileInfo, category *string, metadata *string, + downloadID *string, ) (string, []string, error) { importCfg := proc.configGetter().Import samplePercentage := importCfg.SegmentSamplePercentage @@ -784,6 +791,7 @@ func (proc *Processor) processSevenZipArchive( } _ = proc.recorder.AddImportHistory(ctx, &database.ImportHistory{ + DownloadID: downloadID, NzbID: &nzbID, NzbName: nzbName, FileName: filepath.Base(nzbFolder), diff --git a/internal/importer/service.go b/internal/importer/service.go index afdbc7c1..6f9aac02 100644 --- a/internal/importer/service.go +++ b/internal/importer/service.go @@ -704,7 +704,7 @@ func (s *Service) processNzbItem(ctx context.Context, item *database.ImportQueue } } - return s.processor.ProcessNzbFile(ctx, item.NzbPath, basePath, int(item.ID), allowedExtensionsOverride, &virtualDir, extractedFiles, item.Category, item.Metadata) + return s.processor.ProcessNzbFile(ctx, item.NzbPath, basePath, int(item.ID), allowedExtensionsOverride, &virtualDir, extractedFiles, item.Category, item.Metadata, item.DownloadID) } func (s *Service) calculateProcessVirtualDir(item *database.ImportQueueItem, basePath *string) string { @@ -1385,7 +1385,7 @@ func (s *Service) RegenerateMetadata(ctx context.Context, mountRelativePath stri // Re-process the NZB file. We use a dummy queue ID. // This will overwrite the existing .meta file. - _, _, err = s.processor.ProcessNzbFile(ctx, foundNzbPath, "", 0, nil, &virtualDir, nil, nil, nil) + _, _, err = s.processor.ProcessNzbFile(ctx, foundNzbPath, "", 0, nil, &virtualDir, nil, nil, nil, nil) if err != nil { return fmt.Errorf("failed to re-process NZB: %w", err) }