diff --git a/build-test-image.sh b/build-test-image.sh new file mode 100644 index 00000000..8cab48d9 --- /dev/null +++ b/build-test-image.sh @@ -0,0 +1,26 @@ +#!/bin/bash + +# Script to build a local test image for AltMount +# This uses the multi-stage Dockerfile which builds both frontend and backend + +echo "🚀 Building AltMount test image..." + +# Check if docker is installed +if ! command -v docker &> /dev/null; then + echo "❌ Error: docker is not installed." + exit 1 +fi + +# Build the image +# We use the root directory as context and the dev Dockerfile +docker build -t altmount:test -f docker/Dockerfile . + +if [ $? -eq 0 ]; then + echo "✅ Success! Test image 'altmount:test' created." + echo "" + echo "To run it:" + echo "docker run -p 8080:8080 -v ./config:/config -v ./metadata:/metadata altmount:test" +else + echo "❌ Error: Docker build failed." + exit 1 +fi diff --git a/config/altmount.log b/config/altmount.log new file mode 100644 index 00000000..ce4dad51 --- /dev/null +++ b/config/altmount.log @@ -0,0 +1,30 @@ +{"time":"2026-03-22T02:57:58.91794191Z","level":"INFO","msg":"OK 018_add_target_path.sql (27.11ms)"} +{"time":"2026-03-22T02:57:58.919872796Z","level":"INFO","msg":"goose: successfully migrated database to version: 18"} +{"time":"2026-03-22T02:57:58.919937059Z","level":"INFO","msg":"Starting server without NNTP providers - configure via API to enable downloads"} +{"time":"2026-03-22T02:57:58.920043125Z","level":"INFO","msg":"RClone RC notifications disabled"} +{"time":"2026-03-22T02:57:58.926747028Z","level":"INFO","msg":"Updated database connection pool","component":"importer-service","workers":2,"max_connections":6} +{"time":"2026-03-22T02:57:58.927155504Z","level":"INFO","msg":"Queue manager started","component":"queue-manager","workers":2} +{"time":"2026-03-22T02:57:58.927194381Z","level":"INFO","msg":"NZB import service started successfully with 2 workers","component":"importer-service"} +{"time":"2026-03-22T02:57:58.927222953Z","level":"INFO","msg":"Segment cache disabled"} +{"time":"2026-03-22T02:57:58.929924652Z","level":"INFO","msg":"Authentication service initialized"} +{"time":"2026-03-22T02:57:58.933070585Z","level":"INFO","msg":"Health system disabled - no health monitoring or repairs will occur"} +{"time":"2026-03-22T02:57:58.933746428Z","level":"INFO","msg":"ARR queue cleanup disabled (ARRs disabled)"} +{"time":"2026-03-22T02:57:58.933803834Z","level":"INFO","msg":"Arrs service is disabled in configuration"} +{"time":"2026-03-22T02:57:58.933834247Z","level":"INFO","msg":"AltMount server started","port":8080,"webdav_path":"/webdav","api_path":"/api","providers":0,"processor_workers":2} +{"time":"2026-03-22T02:58:00.934356216Z","level":"INFO","msg":"RClone mount service is disabled in configuration"} +{"time":"2026-03-22T02:58:03.93454291Z","level":"WARN","msg":"No admin API key found, skipping automatic webhook registration"} +{"time":"2026-03-22T02:58:13.228068067Z","level":"INFO","msg":"Received shutdown signal","signal":"terminated"} +{"time":"2026-03-22T02:58:13.228135982Z","level":"INFO","msg":"Starting graceful shutdown sequence"} +{"time":"2026-03-22T02:58:13.228145004Z","level":"ERROR","msg":"Failed to stop health worker","error":"health worker not running"} +{"time":"2026-03-22T02:58:13.228169162Z","level":"INFO","msg":"Shutting down server..."} +{"time":"2026-03-22T02:58:13.228214053Z","level":"INFO","msg":"Server shutdown completed"} +{"time":"2026-03-22T02:58:13.228219667Z","level":"INFO","msg":"AltMount server shutdown completed successfully"} +{"time":"2026-03-22T02:58:13.22822436Z","level":"INFO","msg":"Closing importer service"} +{"time":"2026-03-22T02:58:13.228237368Z","level":"INFO","msg":"Stopping NZB import service","component":"importer-service"} +{"time":"2026-03-22T02:58:13.228245373Z","level":"INFO","msg":"Stopping queue manager","component":"queue-manager"} +{"time":"2026-03-22T02:58:13.228257391Z","level":"INFO","msg":"Queue worker stopped","component":"queue-manager","worker_id":0} +{"time":"2026-03-22T02:58:13.228267634Z","level":"INFO","msg":"Queue worker stopped","component":"queue-manager","worker_id":1} +{"time":"2026-03-22T02:58:13.228305031Z","level":"INFO","msg":"Queue manager stopped","component":"queue-manager"} +{"time":"2026-03-22T02:58:13.228313284Z","level":"INFO","msg":"NZB import service stopped","component":"importer-service"} +{"time":"2026-03-22T02:58:13.228323531Z","level":"INFO","msg":"Clearing NNTP pool"} +{"time":"2026-03-22T02:58:13.228329396Z","level":"INFO","msg":"Closing database"} diff --git a/internal/api/arrs_handlers.go b/internal/api/arrs_handlers.go index 7ef73356..f431360e 100644 --- a/internal/api/arrs_handlers.go +++ b/internal/api/arrs_handlers.go @@ -307,6 +307,18 @@ func (s *Server) handleArrsWebhook(c *fiber.Ctx) error { } } + // Redundant Deletion Guard: ensure the file is gone from the local mount + if s.configManager != nil { + cfg := s.configManager.GetConfig() + if cfg.MountPath != "" { + localPath := filepath.Join(cfg.MountPath, metadataPath) + if _, err := os.Stat(localPath); err == nil { + slog.InfoContext(c.Context(), "Redundant Deletion Guard: Manual removal of ghost file from mount", "path", localPath) + _ = os.Remove(localPath) + } + } + } + } // Process Directory Deletions diff --git a/internal/api/nzb_stremio_handlers.go b/internal/api/nzb_stremio_handlers.go index 019e02bc..abc80d65 100644 --- a/internal/api/nzb_stremio_handlers.go +++ b/internal/api/nzb_stremio_handlers.go @@ -181,7 +181,7 @@ func (s *Server) handleNzbStreams(c *fiber.Ctx) error { } priority := database.QueuePriorityHigh - item, err := s.importerService.AddToQueue(ctx, tempPath, basePath, &category, &priority, nil) + item, err := s.importerService.AddToQueue(ctx, tempPath, basePath, &category, &priority, nil, nil) if err != nil { os.Remove(tempPath) return RespondInternalError(c, "Failed to add NZB to queue", err.Error()) diff --git a/internal/api/queue_handlers.go b/internal/api/queue_handlers.go index 68d31e4e..565e0cc7 100644 --- a/internal/api/queue_handlers.go +++ b/internal/api/queue_handlers.go @@ -647,7 +647,7 @@ func (s *Server) handleUploadToQueue(c *fiber.Ctx) error { // For manually uploaded files, pass CompleteDir as the base path (not the temp upload directory) // The category will be appended to this by processNzbItem in the service - item, err := s.importerService.AddToQueue(c.Context(), tempFile, basePath, categoryPtr, &priority, nil) + item, err := s.importerService.AddToQueue(c.Context(), tempFile, basePath, categoryPtr, &priority, nil, nil) if err != nil { // Clean up temp file on error os.Remove(tempFile) @@ -782,7 +782,7 @@ func (s *Server) handleUploadNZBLnk(c *fiber.Ctx) error { } priority := database.QueuePriority(req.Priority) - item, err := s.importerService.AddToQueue(c.Context(), tempFile, basePath, categoryPtr, &priority, nil) + item, err := s.importerService.AddToQueue(c.Context(), tempFile, basePath, categoryPtr, &priority, nil, nil) if err != nil { os.Remove(tempFile) result.ErrorMessage = "Failed to add to queue: " + err.Error() @@ -923,7 +923,7 @@ func (s *Server) handleSearchNZBByName(c *fiber.Ctx) error { } priority := database.QueuePriority(req.Priority) - item, err := s.importerService.AddToQueue(c.Context(), tempFile, basePath, categoryPtr, &priority, nil) + item, err := s.importerService.AddToQueue(c.Context(), tempFile, basePath, categoryPtr, &priority, nil, nil) if err != nil { os.Remove(tempFile) return RespondInternalError(c, "Failed to add to queue", err.Error()) @@ -1170,7 +1170,7 @@ func (s *Server) handleAddTestQueueItem(c *fiber.Ctx) error { } } - item, err := s.importerService.AddToQueue(c.Context(), tempPath, basePath, &category, &priority, nil) + item, err := s.importerService.AddToQueue(c.Context(), tempPath, basePath, &category, &priority, nil, nil) if err != nil { os.Remove(tempPath) return RespondInternalError(c, "Failed to add test file to queue", err.Error()) diff --git a/internal/api/sabnzbd_handlers.go b/internal/api/sabnzbd_handlers.go index 4717cec9..f348445f 100644 --- a/internal/api/sabnzbd_handlers.go +++ b/internal/api/sabnzbd_handlers.go @@ -20,6 +20,7 @@ import ( "github.com/gofiber/fiber/v2" "github.com/javi11/altmount/internal/arrs" + "github.com/google/uuid" "github.com/javi11/altmount/internal/config" "github.com/javi11/altmount/internal/database" "github.com/javi11/altmount/internal/httpclient" @@ -365,10 +366,17 @@ func (s *Server) handleSABnzbdAddFile(c *fiber.Ctx) error { } } + // Generate a stable download ID (GUID) for Sonarr/Radarr tracking + // Some indexers provide a GUID in the 'nzbname' or 'name' parameter + downloadID := c.FormValue("nzbname") + if downloadID == "" { + downloadID = uuid.New().String() + } + // Add the file to the processing queue using centralized method completeDir := s.configManager.GetConfig().SABnzbd.CompleteDir priority := s.parseSABnzbdPriority(c.FormValue("priority")) - item, err := s.importerService.AddToQueue(c.Context(), tempFile, &completeDir, &validatedCategory, &priority, metadataJSON) + _, err = s.importerService.AddToQueue(c.Context(), tempFile, &completeDir, &validatedCategory, &priority, metadataJSON, &downloadID) if err != nil { return s.writeSABnzbdErrorFiber(c, "Failed to add to queue") } @@ -376,7 +384,7 @@ func (s *Server) handleSABnzbdAddFile(c *fiber.Ctx) error { // Return success response response := SABnzbdAddResponse{ Status: true, - NzoIds: []string{fmt.Sprintf("%d", item.ID)}, + NzoIds: []string{downloadID}, } return s.writeSABnzbdResponseFiber(c, response) @@ -507,7 +515,19 @@ func (s *Server) handleSABnzbdAddUrl(c *fiber.Ctx) error { // Add the file to the processing queue using centralized method completeDir := s.configManager.GetConfig().SABnzbd.CompleteDir priority := s.parseSABnzbdPriority(c.Query("priority")) - item, err := s.importerService.AddToQueue(c.Context(), tempFile, &completeDir, &validatedCategory, &priority, metadataJSON) + + // Generate or extract stable download ID for tracking + // Some indexers provide a GUID in the 'nzbname' or 'name' parameter + downloadID := c.Query("nzbname") + if downloadID == "" { + // Use filename (without extension) as a fallback ID if it looks like a GUID + downloadID = strings.TrimSuffix(filename, filepath.Ext(filename)) + if len(downloadID) < 20 { // Simple heuristic: GUIDs are usually long + downloadID = uuid.New().String() + } + } + + _, err = s.importerService.AddToQueue(c.Context(), tempFile, &completeDir, &validatedCategory, &priority, metadataJSON, &downloadID) if err != nil { return s.writeSABnzbdErrorFiber(c, "Failed to add to queue") } @@ -515,7 +535,7 @@ func (s *Server) handleSABnzbdAddUrl(c *fiber.Ctx) error { // Return success response response := SABnzbdAddResponse{ Status: true, - NzoIds: []string{fmt.Sprintf("%d", item.ID)}, + NzoIds: []string{downloadID}, } return s.writeSABnzbdResponseFiber(c, response) @@ -679,6 +699,14 @@ func (s *Server) handleSABnzbdHistory(c *fiber.Ctx) error { // Get category filter from query parameter categoryFilter := s.normalizeCategoryFilter(c) + // Get specific job IDs if requested + nzoIDs := make(map[string]bool) + if ids := c.Query("nzo_ids"); ids != "" { + for _, id := range strings.Split(ids, ",") { + nzoIDs[strings.TrimSpace(id)] = true + } + } + // Get pagination parameters start := 0 if s := c.Query("start"); s != "" { @@ -686,16 +714,17 @@ func (s *Server) handleSABnzbdHistory(c *fiber.Ctx) error { start = val } } - limit := 50 + limit := 0 // 0 means all items in SABnzbd if l := c.Query("limit"); l != "" { if val, err := strconv.Atoi(l); err == nil { limit = val } } - // Get completed items from active queue (not yet deleted) + // Fetch items from active queue + // We use a larger set here to ensure we get everything for deduplication and combined history completedStatus := database.QueueStatusCompleted - completedQueueItems, err := s.queueRepo.ListQueueItems(c.Context(), &completedStatus, "", categoryFilter, limit, start, "updated_at", "desc") + completedQueueItems, err := s.queueRepo.ListQueueItems(c.Context(), &completedStatus, "", categoryFilter, 10000, 0, "updated_at", "desc") if err != nil { return s.writeSABnzbdErrorFiber(c, "Failed to get completed items from queue") } @@ -714,6 +743,16 @@ func (s *Server) handleSABnzbdHistory(c *fiber.Ctx) error { for _, item := range completedQueueItems { name := filepath.Base(item.NzbPath) + // Filter by nzo_ids if requested (check both integer ID and DownloadID) + if len(nzoIDs) > 0 { + match := nzoIDs[fmt.Sprintf("%d", item.ID)] + if !match && item.DownloadID != nil { + match = nzoIDs[*item.DownloadID] + } + if !match { + continue + } + } if !seenNames[name] { finalItems = append(finalItems, item) seenNames[name] = true @@ -727,13 +766,26 @@ func (s *Server) handleSABnzbdHistory(c *fiber.Ctx) error { continue } - if !seenNames[item.NzbName] { - id := item.ID - if item.NzbID != nil { - id = *item.NzbID + id := item.ID + if item.NzbID != nil { + id = *item.NzbID + } + + // Filter by nzo_ids if requested + if len(nzoIDs) > 0 { + match := nzoIDs[fmt.Sprintf("%d", id)] + if !match && item.DownloadID != nil { + match = nzoIDs[*item.DownloadID] + } + if !match { + continue } + } + + if !seenNames[item.NzbName] { qItem := &database.ImportQueueItem{ ID: id, + DownloadID: item.DownloadID, NzbPath: item.NzbName, Status: database.QueueStatusCompleted, FileSize: &item.FileSize, @@ -748,41 +800,54 @@ func (s *Server) handleSABnzbdHistory(c *fiber.Ctx) error { // Get failed items from active queue failedStatus := database.QueueStatusFailed - failed, err := s.queueRepo.ListQueueItems(c.Context(), &failedStatus, "", categoryFilter, limit, start, "updated_at", "desc") + failed, err := s.queueRepo.ListQueueItems(c.Context(), &failedStatus, "", categoryFilter, 1000, 0, "updated_at", "desc") if err != nil { return s.writeSABnzbdErrorFiber(c, "Failed to get failed items") } - // Get total failed count - totalFailed, err := s.queueRepo.CountQueueItems(c.Context(), &failedStatus, "", categoryFilter) - if err != nil { - totalFailed = len(failed) + // Combine failed items for noofslots calculation + for _, item := range failed { + name := filepath.Base(item.NzbPath) + // Filter by nzo_ids if requested + if len(nzoIDs) > 0 { + match := nzoIDs[fmt.Sprintf("%d", item.ID)] + if !match && item.DownloadID != nil { + match = nzoIDs[*item.DownloadID] + } + if !match { + continue + } + } + if !seenNames[name] { + finalItems = append(finalItems, item) + seenNames[name] = true + } } - // Combine and convert to SABnzbd format - slots := make([]SABnzbdHistorySlot, 0, len(finalItems)+len(failed)) - index := 0 - var totalBytes int64 + // Total available items before pagination + totalAvailableCount := len(finalItems) - for _, item := range finalItems { - // Calculate category-specific base path - itemBasePath := s.calculateItemBasePath() - finalPath := s.calculateHistoryStoragePath(item, itemBasePath) + // Apply pagination (start and limit) + if start < len(finalItems) { + finalItems = finalItems[start:] + } else { + finalItems = []*database.ImportQueueItem{} + } - slot := ToSABnzbdHistorySlot(item, start+index, finalPath) - slots = append(slots, slot) - totalBytes += slot.Bytes - index++ + if limit > 0 && len(finalItems) > limit { + finalItems = finalItems[:limit] } - for _, item := range failed { - // Calculate category-specific base path for this item - itemBasePath := s.calculateItemBasePath() - finalPath := s.calculateHistoryStoragePath(item, itemBasePath) - slot := ToSABnzbdHistorySlot(item, start+index, finalPath) + // Combine and convert to SABnzbd format + slots := make([]SABnzbdHistorySlot, 0, len(finalItems)) + var totalBytes int64 + itemBasePath := s.calculateItemBasePath() + + for i, item := range finalItems { + finalPath := s.calculateHistoryStoragePath(item, itemBasePath) + slot := ToSABnzbdHistorySlot(item, start+i, finalPath) slots = append(slots, slot) totalBytes += slot.Bytes - index++ } // Create the proper history response structure using the new struct @@ -794,7 +859,7 @@ func (s *Server) handleSABnzbdHistory(c *fiber.Ctx) error { WeekSize: "0 B", Version: "4.5.0", DaySize: "0 B", - Noofslots: len(finalItems) + totalFailed, + Noofslots: totalAvailableCount, }, } diff --git a/internal/api/sabnzbd_types.go b/internal/api/sabnzbd_types.go index 924fb271..8ff9548b 100644 --- a/internal/api/sabnzbd_types.go +++ b/internal/api/sabnzbd_types.go @@ -379,9 +379,15 @@ func ToSABnzbdQueueSlot(item *database.ImportQueueItem, index int, progressBroad sizeLeftBytes := int64((100 - progressPercentage) * int(totalSizeBytes) / 100) + // Use DownloadID (GUID) as NzoID for stable tracking + nzoID := fmt.Sprintf("%d", item.ID) + if item.DownloadID != nil && *item.DownloadID != "" { + nzoID = *item.DownloadID + } + return SABnzbdQueueSlot{ Index: index, - NzoID: fmt.Sprintf("%d", item.ID), + NzoID: nzoID, Priority: priority, Filename: jobName, Cat: category, @@ -503,10 +509,16 @@ func ToSABnzbdHistorySlot(item *database.ImportQueueItem, index int, finalPath s } } + // Use DownloadID (GUID) as NzoID for stable tracking + nzoID := fmt.Sprintf("%d", item.ID) + if item.DownloadID != nil && *item.DownloadID != "" { + nzoID = *item.DownloadID + } + return SABnzbdHistorySlot{ Index: index, - NzoID: fmt.Sprintf("%d", item.ID), + NzoID: nzoID, Name: jobName, diff --git a/internal/api/stremio_addon_handlers.go b/internal/api/stremio_addon_handlers.go index ed57f98f..19aa0622 100644 --- a/internal/api/stremio_addon_handlers.go +++ b/internal/api/stremio_addon_handlers.go @@ -395,7 +395,7 @@ func (s *Server) handleStremioAddonPlay(c *fiber.Ctx) error { priority := database.QueuePriorityHigh stremioCategory := "stremio" - item, err := s.importerService.AddToQueue(ctx, tempPath, basePath, &stremioCategory, &priority, nil) + item, err := s.importerService.AddToQueue(ctx, tempPath, basePath, &stremioCategory, &priority, nil, nil) if err != nil { os.Remove(tempPath) slog.ErrorContext(ctx, "Failed to add Prowlarr NZB to queue", "error", err, "title", safeTitle) diff --git a/internal/api/update_handlers.go b/internal/api/update_handlers.go index 663064a2..108b57ca 100644 --- a/internal/api/update_handlers.go +++ b/internal/api/update_handlers.go @@ -206,6 +206,7 @@ func (s *Server) handleApplyUpdate(c *fiber.Ctx) error { // 1. Pull the new image cmd := exec.CommandContext(ctx, "docker", "pull", image) + cmd.Env = append(os.Environ(), "HOME=/config") output, err := cmd.CombinedOutput() if err != nil { slog.ErrorContext(ctx, "Failed to pull latest image", "error", err, "output", string(output)) diff --git a/internal/arrs/scanner/manager.go b/internal/arrs/scanner/manager.go index b45c52cf..69690a06 100644 --- a/internal/arrs/scanner/manager.go +++ b/internal/arrs/scanner/manager.go @@ -9,6 +9,7 @@ import ( "strings" "time" + "golang.org/x/sync/singleflight" "github.com/javi11/altmount/internal/arrs/clients" "github.com/javi11/altmount/internal/arrs/data" "github.com/javi11/altmount/internal/arrs/instances" @@ -24,6 +25,7 @@ type Manager struct { instances *instances.Manager clients *clients.Manager data *data.Manager + sf singleflight.Group } func NewManager(configGetter config.ConfigGetter, instances *instances.Manager, clients *clients.Manager, data *data.Manager) *Manager { @@ -232,44 +234,54 @@ func (m *Manager) sonarrHasFile(ctx context.Context, client *sonarr.Sonarr, inst // TriggerFileRescan triggers a rescan for a specific file path through the appropriate ARR instance func (m *Manager) TriggerFileRescan(ctx context.Context, pathForRescan string, relativePath string) error { - slog.InfoContext(ctx, "Triggering ARR rescan", "path", pathForRescan, "relative_path", relativePath) + res, err, _ := m.sf.Do(fmt.Sprintf("rescan:%s", pathForRescan), func() (interface{}, error) { + slog.InfoContext(ctx, "Triggering ARR rescan", "path", pathForRescan, "relative_path", relativePath) - // Find which ARR instance manages this file path - instanceType, instanceName, err := m.findInstanceForFilePath(ctx, pathForRescan, relativePath) - if err != nil { - return fmt.Errorf("failed to find ARR instance for file path %s: %w", pathForRescan, err) - } - - // Find the instance configuration - instanceConfig, err := m.instances.FindConfigInstance(instanceType, instanceName) - if err != nil { - return fmt.Errorf("failed to find instance config: %w", err) - } - - // Check if instance is enabled - if !instanceConfig.Enabled { - return fmt.Errorf("instance %s/%s is disabled", instanceType, instanceName) - } - - // Trigger rescan based on instance type - switch instanceType { - case "radarr": - client, err := m.clients.GetOrCreateRadarrClient(instanceName, instanceConfig.URL, instanceConfig.APIKey) + // Find which ARR instance manages this file path + instanceType, instanceName, err := m.findInstanceForFilePath(ctx, pathForRescan, relativePath) if err != nil { - return fmt.Errorf("failed to create Radarr client: %w", err) + return nil, fmt.Errorf("failed to find ARR instance for file path %s: %w", pathForRescan, err) } - return m.triggerRadarrRescanByPath(ctx, client, pathForRescan, relativePath, instanceName) - case "sonarr": - client, err := m.clients.GetOrCreateSonarrClient(instanceName, instanceConfig.URL, instanceConfig.APIKey) + // Find the instance configuration + instanceConfig, err := m.instances.FindConfigInstance(instanceType, instanceName) if err != nil { - return fmt.Errorf("failed to create Sonarr client: %w", err) + return nil, fmt.Errorf("failed to find instance config: %w", err) } - return m.triggerSonarrRescanByPath(ctx, client, pathForRescan, relativePath, instanceName) - default: - return fmt.Errorf("unsupported instance type: %s", instanceType) + // Check if instance is enabled + if !instanceConfig.Enabled { + return nil, fmt.Errorf("instance %s/%s is disabled", instanceType, instanceName) + } + + // Trigger rescan based on instance type + switch instanceType { + case "radarr": + client, err := m.clients.GetOrCreateRadarrClient(instanceName, instanceConfig.URL, instanceConfig.APIKey) + if err != nil { + return nil, fmt.Errorf("failed to create Radarr client: %w", err) + } + return nil, m.triggerRadarrRescanByPath(ctx, client, pathForRescan, relativePath, instanceName) + + case "sonarr": + client, err := m.clients.GetOrCreateSonarrClient(instanceName, instanceConfig.URL, instanceConfig.APIKey) + if err != nil { + return nil, fmt.Errorf("failed to create Sonarr client: %w", err) + } + return nil, m.triggerSonarrRescanByPath(ctx, client, pathForRescan, relativePath, instanceName) + + default: + return nil, fmt.Errorf("unsupported instance type: %s", instanceType) + } + }) + + if err != nil { + return err } + if res != nil { + return res.(error) + } + return nil } // TriggerScanForFile finds the ARR instance managing the file and triggers a download scan on it. @@ -341,38 +353,41 @@ func (m *Manager) TriggerDownloadScan(ctx context.Context, instanceType string) slog.DebugContext(ctx, "Triggering download client scan", "instance", instance.Name, "type", instance.Type) go func(inst *model.ConfigInstance) { - bgCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - switch inst.Type { - case "radarr": - client, err := m.clients.GetOrCreateRadarrClient(inst.Name, inst.URL, inst.APIKey) - if err != nil { - slog.ErrorContext(bgCtx, "Failed to create Radarr client for scan trigger", "instance", inst.Name, "error", err) - return - } - // Trigger RefreshMonitoredDownloads - _, err = client.SendCommandContext(bgCtx, &radarr.CommandRequest{Name: "RefreshMonitoredDownloads"}) - if err != nil { - slog.ErrorContext(bgCtx, "Failed to trigger RefreshMonitoredDownloads", "instance", inst.Name, "error", err) - } else { - slog.InfoContext(bgCtx, "Triggered RefreshMonitoredDownloads", "instance", inst.Name) - } + _, _, _ = m.sf.Do(fmt.Sprintf("scan:%s", inst.Name), func() (interface{}, error) { + bgCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + switch inst.Type { + case "radarr": + client, err := m.clients.GetOrCreateRadarrClient(inst.Name, inst.URL, inst.APIKey) + if err != nil { + slog.ErrorContext(bgCtx, "Failed to create Radarr client for scan trigger", "instance", inst.Name, "error", err) + return nil, err + } + // Trigger RefreshMonitoredDownloads + _, err = client.SendCommandContext(bgCtx, &radarr.CommandRequest{Name: "RefreshMonitoredDownloads"}) + if err != nil { + slog.ErrorContext(bgCtx, "Failed to trigger RefreshMonitoredDownloads", "instance", inst.Name, "error", err) + } else { + slog.InfoContext(bgCtx, "Triggered RefreshMonitoredDownloads", "instance", inst.Name) + } - case "sonarr": - client, err := m.clients.GetOrCreateSonarrClient(inst.Name, inst.URL, inst.APIKey) - if err != nil { - slog.ErrorContext(bgCtx, "Failed to create Sonarr client for scan trigger", "instance", inst.Name, "error", err) - return - } - // Trigger RefreshMonitoredDownloads - _, err = client.SendCommandContext(bgCtx, &sonarr.CommandRequest{Name: "RefreshMonitoredDownloads"}) - if err != nil { - slog.ErrorContext(bgCtx, "Failed to trigger RefreshMonitoredDownloads", "instance", inst.Name, "error", err) - } else { - slog.InfoContext(bgCtx, "Triggered RefreshMonitoredDownloads", "instance", inst.Name) + case "sonarr": + client, err := m.clients.GetOrCreateSonarrClient(inst.Name, inst.URL, inst.APIKey) + if err != nil { + slog.ErrorContext(bgCtx, "Failed to create Sonarr client for scan trigger", "instance", inst.Name, "error", err) + return nil, err + } + // Trigger RefreshMonitoredDownloads + _, err = client.SendCommandContext(bgCtx, &sonarr.CommandRequest{Name: "RefreshMonitoredDownloads"}) + if err != nil { + slog.ErrorContext(bgCtx, "Failed to trigger RefreshMonitoredDownloads", "instance", inst.Name, "error", err) + } else { + slog.InfoContext(bgCtx, "Triggered RefreshMonitoredDownloads", "instance", inst.Name) + } } - } + return nil, nil + }) }(instance) } } diff --git a/internal/database/migrations/postgres/019_add_download_id.sql b/internal/database/migrations/postgres/019_add_download_id.sql new file mode 100644 index 00000000..e82b79ce --- /dev/null +++ b/internal/database/migrations/postgres/019_add_download_id.sql @@ -0,0 +1,12 @@ +-- +goose Up +ALTER TABLE import_queue ADD COLUMN download_id TEXT; +ALTER TABLE import_history ADD COLUMN download_id TEXT; + +CREATE INDEX idx_queue_download_id ON import_queue(download_id); +CREATE INDEX idx_history_download_id ON import_history(download_id); + +-- +goose Down +DROP INDEX IF EXISTS idx_history_download_id; +DROP INDEX IF EXISTS idx_queue_download_id; +ALTER TABLE import_history DROP COLUMN IF EXISTS download_id; +ALTER TABLE import_queue DROP COLUMN IF EXISTS download_id; diff --git a/internal/database/migrations/sqlite/019_add_download_id.sql b/internal/database/migrations/sqlite/019_add_download_id.sql new file mode 100644 index 00000000..4e9aafc1 --- /dev/null +++ b/internal/database/migrations/sqlite/019_add_download_id.sql @@ -0,0 +1,10 @@ +-- +goose Up +ALTER TABLE import_queue ADD COLUMN download_id TEXT DEFAULT NULL; +ALTER TABLE import_history ADD COLUMN download_id TEXT DEFAULT NULL; + +CREATE INDEX idx_queue_download_id ON import_queue(download_id); +CREATE INDEX idx_history_download_id ON import_history(download_id); + +-- +goose Down +DROP INDEX IF EXISTS idx_history_download_id; +DROP INDEX IF EXISTS idx_queue_download_id; diff --git a/internal/database/models.go b/internal/database/models.go index c92db533..b1672778 100644 --- a/internal/database/models.go +++ b/internal/database/models.go @@ -28,6 +28,7 @@ const ( // ImportQueueItem represents a queued NZB file waiting for import type ImportQueueItem struct { ID int64 `db:"id"` + DownloadID *string `db:"download_id"` // GUID/String ID for external tracking (e.g. Sonarr/Radarr) NzbPath string `db:"nzb_path"` RelativePath *string `db:"relative_path"` StoragePath *string `db:"storage_path"` @@ -168,6 +169,7 @@ type ImportHourlyStat struct { // ImportHistory represents a persistent record of a single imported file type ImportHistory struct { ID int64 `db:"id"` + DownloadID *string `db:"download_id"` NzbID *int64 `db:"nzb_id"` // Nullable if queue item deleted NzbName string `db:"nzb_name"` FileName string `db:"file_name"` diff --git a/internal/database/queue_repository.go b/internal/database/queue_repository.go index e27ee216..711cc535 100644 --- a/internal/database/queue_repository.go +++ b/internal/database/queue_repository.go @@ -109,9 +109,10 @@ func (r *QueueRepository) RestartQueueItemsBulk(ctx context.Context, ids []int64 // AddToQueue adds a new NZB file to the import queue func (r *QueueRepository) AddToQueue(ctx context.Context, item *ImportQueueItem) error { query := ` - INSERT INTO import_queue (nzb_path, relative_path, category, priority, status, retry_count, max_retries, batch_id, metadata, file_size, target_path, created_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now')) + INSERT INTO import_queue (download_id, nzb_path, relative_path, category, priority, status, retry_count, max_retries, batch_id, metadata, file_size, target_path, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now')) ON CONFLICT(nzb_path) DO UPDATE SET + download_id = COALESCE(excluded.download_id, import_queue.download_id), priority = CASE WHEN excluded.priority < priority THEN excluded.priority ELSE priority END, category = excluded.category, batch_id = excluded.batch_id, @@ -126,7 +127,7 @@ func (r *QueueRepository) AddToQueue(ctx context.Context, item *ImportQueueItem) WHERE status NOT IN ('processing', 'pending') ` - args := []any{item.NzbPath, item.RelativePath, item.Category, item.Priority, item.Status, + args := []any{item.DownloadID, item.NzbPath, item.RelativePath, item.Category, item.Priority, item.Status, item.RetryCount, item.MaxRetries, item.BatchID, item.Metadata, item.FileSize, item.TargetPath} if r.dialect.IsPostgres() { @@ -229,7 +230,7 @@ func (r *QueueRepository) ClaimNextQueueItem(ctx context.Context) (*ImportQueueI // Get the complete claimed item data getQuery := ` - SELECT id, nzb_path, relative_path, category, priority, status, created_at, updated_at, + SELECT id, download_id, nzb_path, relative_path, category, priority, status, created_at, updated_at, started_at, completed_at, retry_count, max_retries, error_message, batch_id, metadata, file_size, target_path FROM import_queue WHERE id = ? @@ -237,7 +238,7 @@ func (r *QueueRepository) ClaimNextQueueItem(ctx context.Context) (*ImportQueueI var item ImportQueueItem err = txRepo.db.QueryRowContext(ctx, getQuery, itemID).Scan( - &item.ID, &item.NzbPath, &item.RelativePath, &item.Category, &item.Priority, &item.Status, + &item.ID, &item.DownloadID, &item.NzbPath, &item.RelativePath, &item.Category, &item.Priority, &item.Status, &item.CreatedAt, &item.UpdatedAt, &item.StartedAt, &item.CompletedAt, &item.RetryCount, &item.MaxRetries, &item.ErrorMessage, &item.BatchID, &item.Metadata, &item.FileSize, &item.TargetPath, ) @@ -409,11 +410,11 @@ func (r *QueueRepository) GetImportHistory(ctx context.Context, days int) ([]*Im // AddImportHistory records a successful file import in the persistent history table func (r *QueueRepository) AddImportHistory(ctx context.Context, history *ImportHistory) error { query := ` - INSERT INTO import_history (nzb_id, nzb_name, file_name, file_size, virtual_path, category, metadata, completed_at) - VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now')) + INSERT INTO import_history (download_id, nzb_id, nzb_name, file_name, file_size, virtual_path, category, metadata, completed_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, datetime('now')) ` _, err := r.db.ExecContext(ctx, query, - history.NzbID, history.NzbName, history.FileName, history.FileSize, + history.DownloadID, history.NzbID, history.NzbName, history.FileName, history.FileSize, history.VirtualPath, history.Category, history.Metadata) if err != nil { return fmt.Errorf("failed to add import history: %w", err) @@ -424,7 +425,7 @@ func (r *QueueRepository) AddImportHistory(ctx context.Context, history *ImportH // ListImportHistory retrieves the last N successful imports from the persistent history func (r *QueueRepository) ListImportHistory(ctx context.Context, limit int) ([]*ImportHistory, error) { query := ` - SELECT h.id, h.nzb_id, h.nzb_name, h.file_name, h.file_size, h.virtual_path, f.library_path, h.category, h.metadata, h.completed_at + SELECT h.id, h.download_id, h.nzb_id, h.nzb_name, h.file_name, h.file_size, h.virtual_path, f.library_path, h.category, h.metadata, h.completed_at FROM import_history h LEFT JOIN file_health f ON h.virtual_path = f.file_path ORDER BY h.completed_at DESC @@ -439,7 +440,7 @@ func (r *QueueRepository) ListImportHistory(ctx context.Context, limit int) ([]* var history []*ImportHistory for rows.Next() { var h ImportHistory - err := rows.Scan(&h.ID, &h.NzbID, &h.NzbName, &h.FileName, &h.FileSize, &h.VirtualPath, &h.LibraryPath, &h.Category, &h.Metadata, &h.CompletedAt) + err := rows.Scan(&h.ID, &h.DownloadID, &h.NzbID, &h.NzbName, &h.FileName, &h.FileSize, &h.VirtualPath, &h.LibraryPath, &h.Category, &h.Metadata, &h.CompletedAt) if err != nil { return nil, fmt.Errorf("failed to scan import history: %w", err) } @@ -608,9 +609,10 @@ func (r *QueueRepository) AddBatchToQueue(ctx context.Context, items []*ImportQu return r.withQueueTransaction(ctx, func(txRepo *QueueRepository) error { // Prepare batch insert statement query := ` - INSERT INTO import_queue (nzb_path, relative_path, category, priority, status, retry_count, max_retries, batch_id, metadata, file_size, created_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now')) + INSERT INTO import_queue (download_id, nzb_path, relative_path, category, priority, status, retry_count, max_retries, batch_id, metadata, file_size, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now')) ON CONFLICT(nzb_path) DO UPDATE SET + download_id = COALESCE(excluded.download_id, import_queue.download_id), priority = CASE WHEN excluded.priority < priority THEN excluded.priority ELSE priority END, category = excluded.category, batch_id = excluded.batch_id, @@ -622,7 +624,7 @@ func (r *QueueRepository) AddBatchToQueue(ctx context.Context, items []*ImportQu now := time.Now() for _, item := range items { - args := []any{item.NzbPath, item.RelativePath, item.Category, item.Priority, item.Status, + args := []any{item.DownloadID, item.NzbPath, item.RelativePath, item.Category, item.Priority, item.Status, item.RetryCount, item.MaxRetries, item.BatchID, item.Metadata, item.FileSize} if txRepo.dialect.IsPostgres() { @@ -651,14 +653,14 @@ func (r *QueueRepository) AddBatchToQueue(ctx context.Context, items []*ImportQu // GetQueueItem retrieves a specific queue item by ID func (r *QueueRepository) GetQueueItem(ctx context.Context, id int64) (*ImportQueueItem, error) { query := ` - SELECT id, nzb_path, relative_path, category, priority, status, created_at, updated_at, + SELECT id, download_id, nzb_path, relative_path, category, priority, status, created_at, updated_at, started_at, completed_at, retry_count, max_retries, error_message, batch_id, metadata, file_size, storage_path, target_path FROM import_queue WHERE id = ? ` var item ImportQueueItem err := r.db.QueryRowContext(ctx, query, id).Scan( - &item.ID, &item.NzbPath, &item.RelativePath, &item.Category, &item.Priority, &item.Status, + &item.ID, &item.DownloadID, &item.NzbPath, &item.RelativePath, &item.Category, &item.Priority, &item.Status, &item.CreatedAt, &item.UpdatedAt, &item.StartedAt, &item.CompletedAt, &item.RetryCount, &item.MaxRetries, &item.ErrorMessage, &item.BatchID, &item.Metadata, &item.FileSize, &item.StoragePath, &item.TargetPath, ) @@ -709,7 +711,7 @@ func (r *QueueRepository) DeleteFailedItemsOlderThan(ctx context.Context, olderT err := r.withQueueTransaction(ctx, func(txRepo *QueueRepository) error { // Select failed items older than the threshold - selectQuery := `SELECT id, nzb_path, relative_path, category, priority, status, created_at, updated_at, + selectQuery := `SELECT id, download_id, nzb_path, relative_path, category, priority, status, created_at, updated_at, started_at, completed_at, retry_count, max_retries, error_message, batch_id, metadata, file_size, storage_path, target_path FROM import_queue WHERE status = 'failed' AND updated_at < ?` @@ -722,7 +724,7 @@ func (r *QueueRepository) DeleteFailedItemsOlderThan(ctx context.Context, olderT for rows.Next() { var item ImportQueueItem if err := rows.Scan( - &item.ID, &item.NzbPath, &item.RelativePath, &item.Category, &item.Priority, &item.Status, + &item.ID, &item.DownloadID, &item.NzbPath, &item.RelativePath, &item.Category, &item.Priority, &item.Status, &item.CreatedAt, &item.UpdatedAt, &item.StartedAt, &item.CompletedAt, &item.RetryCount, &item.MaxRetries, &item.ErrorMessage, &item.BatchID, &item.Metadata, &item.FileSize, &item.StoragePath, &item.TargetPath, ); err != nil { diff --git a/internal/database/repository.go b/internal/database/repository.go index 33b606a0..98d17667 100644 --- a/internal/database/repository.go +++ b/internal/database/repository.go @@ -103,9 +103,10 @@ func (r *Repository) withTransactionMode(ctx context.Context, mode string, fn fu func (r *Repository) AddToQueue(ctx context.Context, item *ImportQueueItem) error { // Use UPSERT with immediate lock to prevent conflicts during concurrent inserts query := ` - INSERT INTO import_queue (nzb_path, relative_path, category, priority, status, retry_count, max_retries, batch_id, metadata, file_size, target_path, created_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now')) + INSERT INTO import_queue (download_id, nzb_path, relative_path, category, priority, status, retry_count, max_retries, batch_id, metadata, file_size, target_path, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now')) ON CONFLICT(nzb_path) DO UPDATE SET + download_id = COALESCE(excluded.download_id, import_queue.download_id), priority = CASE WHEN excluded.priority < priority THEN excluded.priority ELSE priority END, category = excluded.category, batch_id = excluded.batch_id, @@ -116,7 +117,7 @@ func (r *Repository) AddToQueue(ctx context.Context, item *ImportQueueItem) erro WHERE status NOT IN ('processing', 'completed') ` - args := []any{item.NzbPath, item.RelativePath, item.Category, item.Priority, item.Status, + args := []any{item.DownloadID, item.NzbPath, item.RelativePath, item.Category, item.Priority, item.Status, item.RetryCount, item.MaxRetries, item.BatchID, item.Metadata, item.FileSize, item.TargetPath} if r.dialect.IsPostgres() { @@ -146,7 +147,7 @@ func (r *Repository) GetNextQueueItems(ctx context.Context, limit int) ([]*Impor // Use a CTE to select items and immediately mark them as claimed to avoid race conditions query := fmt.Sprintf(` WITH selected_items AS ( - SELECT id, nzb_path, relative_path, category, priority, status, created_at, updated_at, + SELECT id, download_id, nzb_path, relative_path, category, priority, status, created_at, updated_at, started_at, completed_at, retry_count, max_retries, error_message, batch_id, metadata, file_size, target_path FROM import_queue WHERE status = 'pending' @@ -167,7 +168,7 @@ func (r *Repository) GetNextQueueItems(ctx context.Context, limit int) ([]*Impor for rows.Next() { var item ImportQueueItem err := rows.Scan( - &item.ID, &item.NzbPath, &item.RelativePath, &item.Category, &item.Priority, &item.Status, + &item.ID, &item.DownloadID, &item.NzbPath, &item.RelativePath, &item.Category, &item.Priority, &item.Status, &item.CreatedAt, &item.UpdatedAt, &item.StartedAt, &item.CompletedAt, &item.RetryCount, &item.MaxRetries, &item.ErrorMessage, &item.BatchID, &item.Metadata, &item.FileSize, &item.TargetPath, ) @@ -202,14 +203,14 @@ func (r *Repository) ClaimNextQueueItem(ctx context.Context) (*ImportQueueItem, ORDER BY priority ASC, created_at ASC LIMIT 1 ) AND status = 'pending' - RETURNING id, nzb_path, relative_path, category, priority, status, + RETURNING id, download_id, nzb_path, relative_path, category, priority, status, created_at, updated_at, started_at, completed_at, retry_count, max_retries, error_message, batch_id, metadata, file_size, target_path `, r.dialect.ColumnPlusMinutes("started_at", 10)) var item ImportQueueItem err := txRepo.db.QueryRowContext(ctx, updateQuery).Scan( - &item.ID, &item.NzbPath, &item.RelativePath, &item.Category, + &item.ID, &item.DownloadID, &item.NzbPath, &item.RelativePath, &item.Category, &item.Priority, &item.Status, &item.CreatedAt, &item.UpdatedAt, &item.StartedAt, &item.CompletedAt, &item.RetryCount, &item.MaxRetries, &item.ErrorMessage, &item.BatchID, @@ -244,9 +245,10 @@ func (r *Repository) AddBatchToQueue(ctx context.Context, items []*ImportQueueIt return r.WithImmediateTransaction(ctx, func(txRepo *Repository) error { // Prepare batch insert statement query := ` - INSERT INTO import_queue (nzb_path, relative_path, category, priority, status, retry_count, max_retries, batch_id, metadata, file_size, target_path, created_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now')) + INSERT INTO import_queue (download_id, nzb_path, relative_path, category, priority, status, retry_count, max_retries, batch_id, metadata, file_size, target_path, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now')) ON CONFLICT(nzb_path) DO UPDATE SET + download_id = COALESCE(excluded.download_id, import_queue.download_id), priority = CASE WHEN excluded.priority < priority THEN excluded.priority ELSE priority END, category = excluded.category, batch_id = excluded.batch_id, @@ -259,7 +261,7 @@ func (r *Repository) AddBatchToQueue(ctx context.Context, items []*ImportQueueIt now := time.Now() for _, item := range items { - args := []any{item.NzbPath, item.RelativePath, item.Category, item.Priority, item.Status, + args := []any{item.DownloadID, item.NzbPath, item.RelativePath, item.Category, item.Priority, item.Status, item.RetryCount, item.MaxRetries, item.BatchID, item.Metadata, item.FileSize, item.TargetPath} if txRepo.dialect.IsPostgres() { @@ -340,14 +342,14 @@ func (r *Repository) IncrementDailyStat(ctx context.Context, statType string) er // GetQueueItem retrieves a specific queue item by ID func (r *Repository) GetQueueItem(ctx context.Context, id int64) (*ImportQueueItem, error) { query := ` - SELECT id, nzb_path, relative_path, category, priority, status, created_at, updated_at, + SELECT id, download_id, nzb_path, relative_path, category, priority, status, created_at, updated_at, started_at, completed_at, retry_count, max_retries, error_message, batch_id, metadata, file_size, storage_path, target_path FROM import_queue WHERE id = ? ` var item ImportQueueItem err := r.db.QueryRowContext(ctx, query, id).Scan( - &item.ID, &item.NzbPath, &item.RelativePath, &item.Category, &item.Priority, &item.Status, + &item.ID, &item.DownloadID, &item.NzbPath, &item.RelativePath, &item.Category, &item.Priority, &item.Status, &item.CreatedAt, &item.UpdatedAt, &item.StartedAt, &item.CompletedAt, &item.RetryCount, &item.MaxRetries, &item.ErrorMessage, &item.BatchID, &item.Metadata, &item.FileSize, &item.StoragePath, &item.TargetPath, ) @@ -362,17 +364,17 @@ func (r *Repository) GetQueueItem(ctx context.Context, id int64) (*ImportQueueIt return &item, nil } -// GetQueueItemByPath retrieves a queue item by NZB path -func (r *Repository) GetQueueItemByPath(ctx context.Context, nzbPath string) (*ImportQueueItem, error) { +// GetQueueItemByDownloadID retrieves a queue item by its DownloadID +func (r *Repository) GetQueueItemByDownloadID(ctx context.Context, downloadID string) (*ImportQueueItem, error) { query := ` - SELECT id, nzb_path, relative_path, category, priority, status, created_at, updated_at, + SELECT id, download_id, nzb_path, relative_path, category, priority, status, created_at, updated_at, started_at, completed_at, retry_count, max_retries, error_message, batch_id, metadata, file_size, storage_path, target_path - FROM import_queue WHERE nzb_path = ? + FROM import_queue WHERE download_id = ? ` var item ImportQueueItem - err := r.db.QueryRowContext(ctx, query, nzbPath).Scan( - &item.ID, &item.NzbPath, &item.RelativePath, &item.Category, &item.Priority, &item.Status, + err := r.db.QueryRowContext(ctx, query, downloadID).Scan( + &item.ID, &item.DownloadID, &item.NzbPath, &item.RelativePath, &item.Category, &item.Priority, &item.Status, &item.CreatedAt, &item.UpdatedAt, &item.StartedAt, &item.CompletedAt, &item.RetryCount, &item.MaxRetries, &item.ErrorMessage, &item.BatchID, &item.Metadata, &item.FileSize, &item.StoragePath, &item.TargetPath, ) @@ -381,12 +383,33 @@ func (r *Repository) GetQueueItemByPath(ctx context.Context, nzbPath string) (*I if err == sql.ErrNoRows { return nil, nil } - return nil, fmt.Errorf("failed to get queue item by path: %w", err) + return nil, fmt.Errorf("failed to get queue item by download_id: %w", err) } return &item, nil } +// RemoveFromQueueByDownloadID removes an item from the queue by its DownloadID +func (r *Repository) RemoveFromQueueByDownloadID(ctx context.Context, downloadID string) error { + query := `DELETE FROM import_queue WHERE download_id = ?` + + result, err := r.db.ExecContext(ctx, query, downloadID) + if err != nil { + return fmt.Errorf("failed to remove from queue by download_id: %w", err) + } + + rowsAffected, err := result.RowsAffected() + if err != nil { + return fmt.Errorf("failed to get rows affected: %w", err) + } + + if rowsAffected == 0 { + return sql.ErrNoRows + } + + return nil +} + // RemoveFromQueue removes an item from the queue func (r *Repository) RemoveFromQueue(ctx context.Context, id int64) error { query := `DELETE FROM import_queue WHERE id = ?` @@ -408,6 +431,22 @@ func (r *Repository) RemoveFromQueue(ctx context.Context, id int64) error { return nil } +// RemoveFromHistoryByDownloadID removes a record from import_history by its DownloadID +func (r *Repository) RemoveFromHistoryByDownloadID(ctx context.Context, downloadID string) (int64, error) { + query := `DELETE FROM import_history WHERE download_id = ?` + result, err := r.db.ExecContext(ctx, query, downloadID) + if err != nil { + return 0, fmt.Errorf("failed to remove history record by download_id: %w", err) + } + + rowsAffected, err := result.RowsAffected() + if err != nil { + return 0, fmt.Errorf("failed to get rows affected: %w", err) + } + + return rowsAffected, nil +} + // RemoveFromHistory removes a record from import_history by its own ID func (r *Repository) RemoveFromHistory(ctx context.Context, id int64) (int64, error) { query := `DELETE FROM import_history WHERE id = ?` @@ -649,7 +688,7 @@ func (r *Repository) ListQueueItems(ctx context.Context, status *QueueStatus, se var query string var args []any - baseSelect := `SELECT id, nzb_path, relative_path, category, priority, status, created_at, updated_at, + baseSelect := `SELECT id, download_id, nzb_path, relative_path, category, priority, status, created_at, updated_at, started_at, completed_at, retry_count, max_retries, error_message, batch_id, metadata, file_size, storage_path, target_path FROM import_queue` @@ -711,7 +750,7 @@ func (r *Repository) ListQueueItems(ctx context.Context, status *QueueStatus, se for rows.Next() { var item ImportQueueItem err := rows.Scan( - &item.ID, &item.NzbPath, &item.RelativePath, &item.Category, &item.Priority, &item.Status, + &item.ID, &item.DownloadID, &item.NzbPath, &item.RelativePath, &item.Category, &item.Priority, &item.Status, &item.CreatedAt, &item.UpdatedAt, &item.StartedAt, &item.CompletedAt, &item.RetryCount, &item.MaxRetries, &item.ErrorMessage, &item.BatchID, &item.Metadata, &item.FileSize, &item.StoragePath, &item.TargetPath, ) @@ -729,7 +768,7 @@ func (r *Repository) ListActiveQueueItems(ctx context.Context, search string, ca var query string var args []any - baseSelect := `SELECT id, nzb_path, relative_path, category, priority, status, created_at, updated_at, + baseSelect := `SELECT id, download_id, nzb_path, relative_path, category, priority, status, created_at, updated_at, started_at, completed_at, retry_count, max_retries, error_message, batch_id, metadata, file_size, storage_path, target_path FROM import_queue` @@ -782,7 +821,7 @@ func (r *Repository) ListActiveQueueItems(ctx context.Context, search string, ca for rows.Next() { var item ImportQueueItem err := rows.Scan( - &item.ID, &item.NzbPath, &item.RelativePath, &item.Category, &item.Priority, &item.Status, + &item.ID, &item.DownloadID, &item.NzbPath, &item.RelativePath, &item.Category, &item.Priority, &item.Status, &item.CreatedAt, &item.UpdatedAt, &item.StartedAt, &item.CompletedAt, &item.RetryCount, &item.MaxRetries, &item.ErrorMessage, &item.BatchID, &item.Metadata, &item.FileSize, &item.StoragePath, &item.TargetPath, ) @@ -1018,11 +1057,11 @@ func (r *Repository) UpdateQueueItemPriority(ctx context.Context, id int64, prio // AddImportHistory records a successful file import in the persistent history table func (r *Repository) AddImportHistory(ctx context.Context, history *ImportHistory) error { query := ` - INSERT INTO import_history (nzb_id, nzb_name, file_name, file_size, virtual_path, category, completed_at) - VALUES (?, ?, ?, ?, ?, ?, datetime('now')) + INSERT INTO import_history (download_id, nzb_id, nzb_name, file_name, file_size, virtual_path, category, completed_at) + VALUES (?, ?, ?, ?, ?, ?, ?, datetime('now')) ` _, err := r.db.ExecContext(ctx, query, - history.NzbID, history.NzbName, history.FileName, history.FileSize, + history.DownloadID, history.NzbID, history.NzbName, history.FileName, history.FileSize, history.VirtualPath, history.Category) if err != nil { return fmt.Errorf("failed to add import history: %w", err) @@ -1030,10 +1069,32 @@ func (r *Repository) AddImportHistory(ctx context.Context, history *ImportHistor return nil } +// GetImportHistoryByDownloadID retrieves an import history item by its DownloadID +func (r *Repository) GetImportHistoryByDownloadID(ctx context.Context, downloadID string) (*ImportHistory, error) { + query := ` + SELECT h.id, h.download_id, h.nzb_id, h.nzb_name, h.file_name, h.file_size, h.virtual_path, f.library_path, h.category, h.completed_at + FROM import_history h + LEFT JOIN file_health f ON TRIM(h.virtual_path, '/') = TRIM(f.file_path, '/') + WHERE h.download_id = ? + LIMIT 1 + ` + + var h ImportHistory + err := r.db.QueryRowContext(ctx, query, downloadID).Scan(&h.ID, &h.DownloadID, &h.NzbID, &h.NzbName, &h.FileName, &h.FileSize, &h.VirtualPath, &h.LibraryPath, &h.Category, &h.CompletedAt) + if err != nil { + if err == sql.ErrNoRows { + return nil, nil + } + return nil, fmt.Errorf("failed to get import history by download_id: %w", err) + } + + return &h, nil +} + // GetImportHistoryByPath retrieves an import history item by its virtual path func (r *Repository) GetImportHistoryByPath(ctx context.Context, virtualPath string) (*ImportHistory, error) { query := ` - SELECT h.id, h.nzb_id, h.nzb_name, h.file_name, h.file_size, h.virtual_path, f.library_path, h.category, h.completed_at + SELECT h.id, h.download_id, h.nzb_id, h.nzb_name, h.file_name, h.file_size, h.virtual_path, f.library_path, h.category, h.completed_at FROM import_history h LEFT JOIN file_health f ON TRIM(h.virtual_path, '/') = TRIM(f.file_path, '/') WHERE TRIM(h.virtual_path, '/') = TRIM(?, '/') @@ -1041,7 +1102,7 @@ func (r *Repository) GetImportHistoryByPath(ctx context.Context, virtualPath str ` var h ImportHistory - err := r.db.QueryRowContext(ctx, query, virtualPath).Scan(&h.ID, &h.NzbID, &h.NzbName, &h.FileName, &h.FileSize, &h.VirtualPath, &h.LibraryPath, &h.Category, &h.CompletedAt) + err := r.db.QueryRowContext(ctx, query, virtualPath).Scan(&h.ID, &h.DownloadID, &h.NzbID, &h.NzbName, &h.FileName, &h.FileSize, &h.VirtualPath, &h.LibraryPath, &h.Category, &h.CompletedAt) if err != nil { if err == sql.ErrNoRows { return nil, nil @@ -1055,7 +1116,7 @@ func (r *Repository) GetImportHistoryByPath(ctx context.Context, virtualPath str // ListImportHistory retrieves import history items with optional filtering and pagination func (r *Repository) ListImportHistory(ctx context.Context, limit, offset int, search string, category string) ([]*ImportHistory, error) { query := ` - SELECT h.id, h.nzb_id, h.nzb_name, h.file_name, h.file_size, h.virtual_path, f.library_path, h.category, h.completed_at + SELECT h.id, h.download_id, h.nzb_id, h.nzb_name, h.file_name, h.file_size, h.virtual_path, f.library_path, h.category, h.completed_at FROM import_history h LEFT JOIN file_health f ON h.virtual_path = f.file_path WHERE (? = '' OR h.nzb_name LIKE ? OR h.file_name LIKE ? OR h.virtual_path LIKE ?) @@ -1074,7 +1135,7 @@ func (r *Repository) ListImportHistory(ctx context.Context, limit, offset int, s var history []*ImportHistory for rows.Next() { var h ImportHistory - err := rows.Scan(&h.ID, &h.NzbID, &h.NzbName, &h.FileName, &h.FileSize, &h.VirtualPath, &h.LibraryPath, &h.Category, &h.CompletedAt) + err := rows.Scan(&h.ID, &h.DownloadID, &h.NzbID, &h.NzbName, &h.FileName, &h.FileSize, &h.VirtualPath, &h.LibraryPath, &h.Category, &h.CompletedAt) if err != nil { return nil, fmt.Errorf("failed to scan import history: %w", err) } @@ -1094,7 +1155,7 @@ func (r *Repository) ListRecentImportHistory(ctx context.Context, minutes int, c } query := fmt.Sprintf(` - SELECT h.id, h.nzb_id, h.nzb_name, h.file_name, h.file_size, h.virtual_path, f.library_path, h.category, h.completed_at + SELECT h.id, h.download_id, h.nzb_id, h.nzb_name, h.file_name, h.file_size, h.virtual_path, f.library_path, h.category, h.completed_at FROM import_history h LEFT JOIN file_health f ON TRIM(h.virtual_path, '/') = TRIM(f.file_path, '/') WHERE h.completed_at >= %s @@ -1111,7 +1172,7 @@ func (r *Repository) ListRecentImportHistory(ctx context.Context, minutes int, c var history []*ImportHistory for rows.Next() { var h ImportHistory - err := rows.Scan(&h.ID, &h.NzbID, &h.NzbName, &h.FileName, &h.FileSize, &h.VirtualPath, &h.LibraryPath, &h.Category, &h.CompletedAt) + err := rows.Scan(&h.ID, &h.DownloadID, &h.NzbID, &h.NzbName, &h.FileName, &h.FileSize, &h.VirtualPath, &h.LibraryPath, &h.Category, &h.CompletedAt) if err != nil { return nil, fmt.Errorf("failed to scan import history: %w", err) } @@ -1251,14 +1312,14 @@ func (r *Repository) GetImportHistory(ctx context.Context, days int) ([]*ImportD // GetImportHistoryItem retrieves a specific import history item by ID func (r *Repository) GetImportHistoryItem(ctx context.Context, id int64) (*ImportHistory, error) { query := ` - SELECT h.id, h.nzb_id, h.nzb_name, h.file_name, h.file_size, h.virtual_path, f.library_path, h.category, h.completed_at + SELECT h.id, h.download_id, h.nzb_id, h.nzb_name, h.file_name, h.file_size, h.virtual_path, f.library_path, h.category, h.completed_at FROM import_history h LEFT JOIN file_health f ON h.virtual_path = f.file_path WHERE h.id = ? ` var h ImportHistory - err := r.db.QueryRowContext(ctx, query, id).Scan(&h.ID, &h.NzbID, &h.NzbName, &h.FileName, &h.FileSize, &h.VirtualPath, &h.LibraryPath, &h.Category, &h.CompletedAt) + err := r.db.QueryRowContext(ctx, query, id).Scan(&h.ID, &h.DownloadID, &h.NzbID, &h.NzbName, &h.FileName, &h.FileSize, &h.VirtualPath, &h.LibraryPath, &h.Category, &h.CompletedAt) if err != nil { if err == sql.ErrNoRows { return nil, nil @@ -1544,7 +1605,7 @@ func (r *Repository) GetExpiredStremioQueueItems(ctx context.Context, ttlHours i } query := fmt.Sprintf(` - SELECT id, nzb_path, relative_path, category, priority, status, created_at, updated_at, + SELECT id, download_id, nzb_path, relative_path, category, priority, status, created_at, updated_at, started_at, completed_at, retry_count, max_retries, error_message, batch_id, metadata, file_size, storage_path, target_path FROM import_queue WHERE status = 'completed' @@ -1564,7 +1625,7 @@ func (r *Repository) GetExpiredStremioQueueItems(ctx context.Context, ttlHours i for rows.Next() { var item ImportQueueItem err := rows.Scan( - &item.ID, &item.NzbPath, &item.RelativePath, &item.Category, &item.Priority, &item.Status, + &item.ID, &item.DownloadID, &item.NzbPath, &item.RelativePath, &item.Category, &item.Priority, &item.Status, &item.CreatedAt, &item.UpdatedAt, &item.StartedAt, &item.CompletedAt, &item.RetryCount, &item.MaxRetries, &item.ErrorMessage, &item.BatchID, &item.Metadata, &item.FileSize, &item.StoragePath, &item.TargetPath, ) diff --git a/internal/database/testing.go b/internal/database/testing.go index 137a45a3..3486db95 100644 --- a/internal/database/testing.go +++ b/internal/database/testing.go @@ -14,6 +14,7 @@ func setupQueueSchema(t *testing.T, db *sql.DB) { schema := ` CREATE TABLE import_queue ( id INTEGER PRIMARY KEY AUTOINCREMENT, + download_id TEXT DEFAULT NULL, nzb_path TEXT NOT NULL, relative_path TEXT DEFAULT NULL, storage_path TEXT DEFAULT NULL, @@ -34,6 +35,7 @@ func setupQueueSchema(t *testing.T, db *sql.DB) { UNIQUE(nzb_path) ); + CREATE INDEX idx_queue_download_id ON import_queue(download_id); CREATE INDEX idx_queue_status_priority ON import_queue(status, priority, created_at); CREATE INDEX idx_queue_batch_id ON import_queue(batch_id); CREATE INDEX idx_queue_status ON import_queue(status); diff --git a/internal/health/library_sync.go b/internal/health/library_sync.go index 6126a39a..00bc9cef 100644 --- a/internal/health/library_sync.go +++ b/internal/health/library_sync.go @@ -701,16 +701,27 @@ func (lsw *LibrarySyncWorker) SyncLibrary(ctx context.Context, dryRun bool) *Dry // Use the configured mount path to build an absolute expected path expectedPath := pathutil.JoinAbsPath(cfg.MountPath, path) if _, err := os.Stat(expectedPath); err == nil { - // Found it! Use this recovered path - libStr := expectedPath - libraryPath = &libStr - slog.InfoContext(ctx, "Recovered broken library path", - "path", path, "new_location", expectedPath) - - // Update DB immediately for this recovered path - if err := lsw.healthRepo.UpdateLibraryPath(ctx, path, expectedPath); err != nil { - slog.ErrorContext(ctx, "Failed to update recovered library path", - "path", path, "error", err) + // Found it! Use this recovered path ONLY if it is absolute and NOT equal to the local mount path + // (since repairs MUST use the library path Sonarr/Radarr expects). + if filepath.IsAbs(expectedPath) && (lp == nil || *lp != expectedPath) { + // Check if the expected path is actually a library path (must contain libraryDir or be different from mountPath) + isLibraryPath := false + if cfg.Health.LibraryDir != nil && strings.HasPrefix(expectedPath, *cfg.Health.LibraryDir) { + isLibraryPath = true + } + + if isLibraryPath { + libStr := expectedPath + libraryPath = &libStr + slog.InfoContext(ctx, "Recovered broken library path", + "path", path, "new_location", expectedPath) + + // Update DB immediately for this recovered path + if err := lsw.healthRepo.UpdateLibraryPath(ctx, path, expectedPath); err != nil { + slog.ErrorContext(ctx, "Failed to update recovered library path", + "path", path, "error", err) + } + } } } } diff --git a/internal/importer/interfaces.go b/internal/importer/interfaces.go index 797ddc82..a59cefb8 100644 --- a/internal/importer/interfaces.go +++ b/internal/importer/interfaces.go @@ -49,7 +49,7 @@ type NzbDavImporter interface { // QueueOperations provides queue manipulation operations type QueueOperations interface { // AddToQueue adds an item to the import queue - AddToQueue(ctx context.Context, filePath string, relativePath *string, category *string, priority *database.QueuePriority, metadata *string) (*database.ImportQueueItem, error) + AddToQueue(ctx context.Context, filePath string, relativePath *string, category *string, priority *database.QueuePriority, metadata *string, downloadID *string) (*database.ImportQueueItem, error) // GetQueueStats returns queue statistics GetQueueStats(ctx context.Context) (*database.QueueStats, error) } diff --git a/internal/importer/scanner/watcher.go b/internal/importer/scanner/watcher.go index 66dabcd6..06810c15 100644 --- a/internal/importer/scanner/watcher.go +++ b/internal/importer/scanner/watcher.go @@ -13,12 +13,13 @@ import ( "github.com/javi11/altmount/internal/database" ) -// WatchQueueAdder defines the interface for adding items to the queue with category support +// WatchQueueAdder interface for adding items to the import queue from directory watcher type WatchQueueAdder interface { - AddToQueue(ctx context.Context, filePath string, relativePath *string, category *string, priority *database.QueuePriority, metadata *string) (*database.ImportQueueItem, error) + AddToQueue(ctx context.Context, filePath string, relativePath *string, category *string, priority *database.QueuePriority, metadata *string, downloadID *string) (*database.ImportQueueItem, error) IsFileInQueue(ctx context.Context, filePath string) (bool, error) } + // Watcher handles monitoring a directory for new NZB files type Watcher struct { queueAdder WatchQueueAdder @@ -289,7 +290,7 @@ func (w *Watcher) processNzb(ctx context.Context, watchRoot, filePath string) er // Add to queue priority := database.QueuePriorityNormal - item, err := w.queueAdder.AddToQueue(ctx, filePath, relativePath, category, &priority, nil) + item, err := w.queueAdder.AddToQueue(ctx, filePath, relativePath, category, &priority, nil, nil) if err != nil { return fmt.Errorf("failed to add to queue: %w", err) } diff --git a/internal/importer/service.go b/internal/importer/service.go index c9b12080..ef5ed469 100644 --- a/internal/importer/service.go +++ b/internal/importer/service.go @@ -69,6 +69,7 @@ func (a *queueAdapterForScanner) AddToQueue(ctx context.Context, filePath string } item := &database.ImportQueueItem{ + DownloadID: nil, // Generated later in service if needed NzbPath: filePath, RelativePath: relativePath, Priority: database.QueuePriorityNormal, @@ -614,7 +615,7 @@ func sanitizeFilename(name string) string { } // AddToQueue adds a new NZB file to the import queue with optional category and priority -func (s *Service) AddToQueue(ctx context.Context, filePath string, relativePath *string, category *string, priority *database.QueuePriority, metadata *string) (*database.ImportQueueItem, error) { +func (s *Service) AddToQueue(ctx context.Context, filePath string, relativePath *string, category *string, priority *database.QueuePriority, metadata *string, downloadID *string) (*database.ImportQueueItem, error) { // Check context before proceeding select { case <-ctx.Done(): @@ -639,6 +640,7 @@ func (s *Service) AddToQueue(ctx context.Context, filePath string, relativePath } item := &database.ImportQueueItem{ + DownloadID: downloadID, NzbPath: filePath, RelativePath: relativePath, Category: category,