diff --git a/internal/api/import_handlers.go b/internal/api/import_handlers.go index 01511d89c..c9a5df3a6 100644 --- a/internal/api/import_handlers.go +++ b/internal/api/import_handlers.go @@ -264,14 +264,15 @@ func (s *Server) handleManualImportFile(c *fiber.Ctx) error { // Add the file to the processing queue item := &database.ImportQueueItem{ - NzbPath: req.FilePath, - Priority: database.QueuePriorityNormal, - Status: database.QueueStatusPending, - RetryCount: 0, - MaxRetries: 3, - CreatedAt: time.Now(), - RelativePath: req.RelativePath, - TargetPath: targetPath, + NzbPath: req.FilePath, + Priority: database.QueuePriorityNormal, + Status: database.QueueStatusPending, + RetryCount: 0, + MaxRetries: 3, + CreatedAt: time.Now(), + RelativePath: req.RelativePath, + TargetPath: targetPath, + SkipArrNotification: req.SkipArrNotification, } slog.DebugContext(c.Context(), "Adding file to queue", "file", req.FilePath, "relative_path", req.RelativePath, "target_path", targetPath) diff --git a/internal/api/import_handlers_skip_arr_test.go b/internal/api/import_handlers_skip_arr_test.go new file mode 100644 index 000000000..5f8c275f2 --- /dev/null +++ b/internal/api/import_handlers_skip_arr_test.go @@ -0,0 +1,17 @@ +package api + +import "testing" + +func TestManualImportRequest_SkipArrNotification_True(t *testing.T) { + req := ManualImportRequest{SkipArrNotification: true} + if !req.SkipArrNotification { + t.Error("expected SkipArrNotification to be true") + } +} + +func TestManualImportRequest_SkipArrNotification_FalseByDefault(t *testing.T) { + req := ManualImportRequest{} + if req.SkipArrNotification { + t.Error("expected SkipArrNotification to be false by default") + } +} diff --git a/internal/api/sabnzbd_handlers.go b/internal/api/sabnzbd_handlers.go index ba6cf6713..c92afd1cc 100644 --- a/internal/api/sabnzbd_handlers.go +++ b/internal/api/sabnzbd_handlers.go @@ -606,7 +606,7 @@ func (s *Server) handleSABnzbdQueue(c *fiber.Ctx) error { var totalMbLeft float64 for i, item := range items { - if item.Status == database.QueueStatusFallback { + if item.Status == database.QueueStatusFallback || item.SkipArrNotification { continue } @@ -762,6 +762,9 @@ func (s *Server) handleSABnzbdHistory(c *fiber.Ctx) error { finalItems := make([]*database.ImportQueueItem, 0) for _, item := range completedQueueItems { + if item.SkipArrNotification { + continue + } name := filepath.Base(item.NzbPath) // Filter by nzo_ids if requested (check both integer ID and DownloadID) if len(nzoIDs) > 0 { @@ -821,6 +824,9 @@ func (s *Server) handleSABnzbdHistory(c *fiber.Ctx) error { // Combine failed items for noofslots calculation for _, item := range failed { + if item.SkipArrNotification { + continue + } name := filepath.Base(item.NzbPath) // Filter by nzo_ids if requested if len(nzoIDs) > 0 { diff --git a/internal/api/types.go b/internal/api/types.go index 705fb779a..3aace5c07 100644 --- a/internal/api/types.go +++ b/internal/api/types.go @@ -899,8 +899,9 @@ type ScanStatusResponse struct { // ManualImportRequest represents a request to manually import a file by path type ManualImportRequest struct { - FilePath string `json:"file_path"` - RelativePath *string `json:"relative_path,omitempty"` + FilePath string `json:"file_path"` + RelativePath *string `json:"relative_path,omitempty"` + SkipArrNotification bool `json:"skip_arr_notification,omitempty"` } // ManualImportResponse represents the response from manually importing a file diff --git a/internal/database/migrations/postgres/023_add_skip_arr_notification.sql b/internal/database/migrations/postgres/023_add_skip_arr_notification.sql new file mode 100644 index 000000000..90b86f3e7 --- /dev/null +++ b/internal/database/migrations/postgres/023_add_skip_arr_notification.sql @@ -0,0 +1,9 @@ +-- +goose Up +-- +goose StatementBegin +ALTER TABLE import_queue ADD COLUMN skip_arr_notification BOOLEAN NOT NULL DEFAULT FALSE; +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +ALTER TABLE import_queue DROP COLUMN IF EXISTS skip_arr_notification; +-- +goose StatementEnd diff --git a/internal/database/migrations/sqlite/023_add_skip_arr_notification.sql b/internal/database/migrations/sqlite/023_add_skip_arr_notification.sql new file mode 100644 index 000000000..8defadd66 --- /dev/null +++ b/internal/database/migrations/sqlite/023_add_skip_arr_notification.sql @@ -0,0 +1,9 @@ +-- +goose Up +-- +goose StatementBegin +ALTER TABLE import_queue ADD COLUMN skip_arr_notification BOOLEAN NOT NULL DEFAULT FALSE; +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +-- SQLite does not support DROP COLUMN in older versions; intentional no-op +-- +goose StatementEnd diff --git a/internal/database/models.go b/internal/database/models.go index be5e56ba9..3171fc44e 100644 --- a/internal/database/models.go +++ b/internal/database/models.go @@ -44,8 +44,9 @@ type ImportQueueItem struct { ErrorMessage *string `db:"error_message"` BatchID *string `db:"batch_id"` Metadata *string `db:"metadata"` // JSON metadata - FileSize *int64 `db:"file_size"` // Total size in bytes calculated from segments - TargetPath *string `db:"target_path"` // Optional forced symlink destination path + FileSize *int64 `db:"file_size"` // Total size in bytes calculated from segments + TargetPath *string `db:"target_path"` // Optional forced symlink destination path + SkipArrNotification bool `db:"skip_arr_notification"` } // BulkOperationResult represents the result of a bulk queue operation @@ -167,3 +168,4 @@ type ImportHistory struct { Metadata *string `db:"metadata"` CompletedAt time.Time `db:"completed_at"` } + diff --git a/internal/database/queue_repository.go b/internal/database/queue_repository.go index 3172377fd..f827ce581 100644 --- a/internal/database/queue_repository.go +++ b/internal/database/queue_repository.go @@ -110,8 +110,8 @@ func (r *QueueRepository) RestartQueueItemsBulk(ctx context.Context, ids []int64 // AddToQueue adds a new NZB file to the import queue func (r *QueueRepository) AddToQueue(ctx context.Context, item *ImportQueueItem) error { query := ` - INSERT INTO import_queue (download_id, nzb_path, relative_path, category, priority, status, retry_count, max_retries, batch_id, metadata, file_size, target_path, created_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now')) + INSERT INTO import_queue (download_id, nzb_path, relative_path, category, priority, status, retry_count, max_retries, batch_id, metadata, file_size, target_path, skip_arr_notification, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now')) ON CONFLICT(nzb_path) DO UPDATE SET download_id = COALESCE(excluded.download_id, import_queue.download_id), priority = CASE WHEN excluded.priority < priority THEN excluded.priority ELSE priority END, @@ -129,7 +129,7 @@ func (r *QueueRepository) AddToQueue(ctx context.Context, item *ImportQueueItem) ` args := []any{item.DownloadID, item.NzbPath, item.RelativePath, item.Category, item.Priority, item.Status, - item.RetryCount, item.MaxRetries, item.BatchID, item.Metadata, item.FileSize, item.TargetPath} + item.RetryCount, item.MaxRetries, item.BatchID, item.Metadata, item.FileSize, item.TargetPath, item.SkipArrNotification} if r.dialect.IsPostgres() { err := r.db.QueryRowContext(ctx, query+" RETURNING id", args...).Scan(&item.ID) @@ -232,7 +232,7 @@ func (r *QueueRepository) ClaimNextQueueItem(ctx context.Context) (*ImportQueueI // Get the complete claimed item data getQuery := ` SELECT id, download_id, nzb_path, relative_path, category, priority, status, created_at, updated_at, - started_at, completed_at, retry_count, max_retries, error_message, batch_id, metadata, file_size, target_path + started_at, completed_at, retry_count, max_retries, error_message, batch_id, metadata, file_size, target_path, skip_arr_notification FROM import_queue WHERE id = ? ` @@ -241,7 +241,7 @@ func (r *QueueRepository) ClaimNextQueueItem(ctx context.Context) (*ImportQueueI err = txRepo.db.QueryRowContext(ctx, getQuery, itemID).Scan( &item.ID, &item.DownloadID, &item.NzbPath, &item.RelativePath, &item.Category, &item.Priority, &item.Status, &item.CreatedAt, &item.UpdatedAt, &item.StartedAt, &item.CompletedAt, - &item.RetryCount, &item.MaxRetries, &item.ErrorMessage, &item.BatchID, &item.Metadata, &item.FileSize, &item.TargetPath, + &item.RetryCount, &item.MaxRetries, &item.ErrorMessage, &item.BatchID, &item.Metadata, &item.FileSize, &item.TargetPath, &item.SkipArrNotification, ) if err != nil { return fmt.Errorf("failed to get claimed item: %w", err) @@ -546,7 +546,7 @@ func (r *QueueRepository) UpdateQueueItemNzbPath(ctx context.Context, id int64, func (r *QueueRepository) GetQueueItemByNzbPath(ctx context.Context, nzbPath string) (*ImportQueueItem, error) { query := ` SELECT id, download_id, nzb_path, relative_path, category, priority, status, created_at, updated_at, - started_at, completed_at, retry_count, max_retries, error_message, batch_id, metadata, file_size, storage_path, target_path + started_at, completed_at, retry_count, max_retries, error_message, batch_id, metadata, file_size, storage_path, target_path, skip_arr_notification FROM import_queue WHERE nzb_path = ? LIMIT 1 ` @@ -554,7 +554,7 @@ func (r *QueueRepository) GetQueueItemByNzbPath(ctx context.Context, nzbPath str err := r.db.QueryRowContext(ctx, query, nzbPath).Scan( &item.ID, &item.DownloadID, &item.NzbPath, &item.RelativePath, &item.Category, &item.Priority, &item.Status, &item.CreatedAt, &item.UpdatedAt, &item.StartedAt, &item.CompletedAt, - &item.RetryCount, &item.MaxRetries, &item.ErrorMessage, &item.BatchID, &item.Metadata, &item.FileSize, &item.StoragePath, &item.TargetPath, + &item.RetryCount, &item.MaxRetries, &item.ErrorMessage, &item.BatchID, &item.Metadata, &item.FileSize, &item.StoragePath, &item.TargetPath, &item.SkipArrNotification, ) if errors.Is(err, sql.ErrNoRows) { return nil, nil @@ -633,8 +633,8 @@ func (r *QueueRepository) AddBatchToQueue(ctx context.Context, items []*ImportQu return r.withQueueTransaction(ctx, func(txRepo *QueueRepository) error { // Prepare batch insert statement query := ` - INSERT INTO import_queue (download_id, nzb_path, relative_path, category, priority, status, retry_count, max_retries, batch_id, metadata, file_size, created_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now')) + INSERT INTO import_queue (download_id, nzb_path, relative_path, category, priority, status, retry_count, max_retries, batch_id, metadata, file_size, skip_arr_notification, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now')) ON CONFLICT(nzb_path) DO UPDATE SET download_id = COALESCE(excluded.download_id, import_queue.download_id), priority = CASE WHEN excluded.priority < priority THEN excluded.priority ELSE priority END, @@ -649,7 +649,7 @@ func (r *QueueRepository) AddBatchToQueue(ctx context.Context, items []*ImportQu now := time.Now() for _, item := range items { args := []any{item.DownloadID, item.NzbPath, item.RelativePath, item.Category, item.Priority, item.Status, - item.RetryCount, item.MaxRetries, item.BatchID, item.Metadata, item.FileSize} + item.RetryCount, item.MaxRetries, item.BatchID, item.Metadata, item.FileSize, item.SkipArrNotification} if txRepo.dialect.IsPostgres() { err := txRepo.db.QueryRowContext(ctx, query+" RETURNING id", args...).Scan(&item.ID) @@ -678,7 +678,7 @@ func (r *QueueRepository) AddBatchToQueue(ctx context.Context, items []*ImportQu func (r *QueueRepository) GetQueueItem(ctx context.Context, id int64) (*ImportQueueItem, error) { query := ` SELECT id, download_id, nzb_path, relative_path, category, priority, status, created_at, updated_at, - started_at, completed_at, retry_count, max_retries, error_message, batch_id, metadata, file_size, storage_path, target_path + started_at, completed_at, retry_count, max_retries, error_message, batch_id, metadata, file_size, storage_path, target_path, skip_arr_notification FROM import_queue WHERE id = ? ` @@ -686,7 +686,7 @@ func (r *QueueRepository) GetQueueItem(ctx context.Context, id int64) (*ImportQu err := r.db.QueryRowContext(ctx, query, id).Scan( &item.ID, &item.DownloadID, &item.NzbPath, &item.RelativePath, &item.Category, &item.Priority, &item.Status, &item.CreatedAt, &item.UpdatedAt, &item.StartedAt, &item.CompletedAt, - &item.RetryCount, &item.MaxRetries, &item.ErrorMessage, &item.BatchID, &item.Metadata, &item.FileSize, &item.StoragePath, &item.TargetPath, + &item.RetryCount, &item.MaxRetries, &item.ErrorMessage, &item.BatchID, &item.Metadata, &item.FileSize, &item.StoragePath, &item.TargetPath, &item.SkipArrNotification, ) if err != nil { if err == sql.ErrNoRows { @@ -736,7 +736,7 @@ func (r *QueueRepository) DeleteFailedItemsOlderThan(ctx context.Context, olderT err := r.withQueueTransaction(ctx, func(txRepo *QueueRepository) error { // Select failed items older than the threshold selectQuery := `SELECT id, download_id, nzb_path, relative_path, category, priority, status, created_at, updated_at, - started_at, completed_at, retry_count, max_retries, error_message, batch_id, metadata, file_size, storage_path, target_path + started_at, completed_at, retry_count, max_retries, error_message, batch_id, metadata, file_size, storage_path, target_path, skip_arr_notification FROM import_queue WHERE status = 'failed' AND updated_at < ?` rows, err := txRepo.db.QueryContext(ctx, selectQuery, olderThan) @@ -750,7 +750,7 @@ func (r *QueueRepository) DeleteFailedItemsOlderThan(ctx context.Context, olderT if err := rows.Scan( &item.ID, &item.DownloadID, &item.NzbPath, &item.RelativePath, &item.Category, &item.Priority, &item.Status, &item.CreatedAt, &item.UpdatedAt, &item.StartedAt, &item.CompletedAt, - &item.RetryCount, &item.MaxRetries, &item.ErrorMessage, &item.BatchID, &item.Metadata, &item.FileSize, &item.StoragePath, &item.TargetPath, + &item.RetryCount, &item.MaxRetries, &item.ErrorMessage, &item.BatchID, &item.Metadata, &item.FileSize, &item.StoragePath, &item.TargetPath, &item.SkipArrNotification, ); err != nil { return fmt.Errorf("failed to scan failed queue item: %w", err) } diff --git a/internal/database/testing.go b/internal/database/testing.go index 3486db95f..7ee2726be 100644 --- a/internal/database/testing.go +++ b/internal/database/testing.go @@ -32,6 +32,7 @@ func setupQueueSchema(t *testing.T, db *sql.DB) { category TEXT DEFAULT NULL, file_size BIGINT DEFAULT NULL, target_path TEXT DEFAULT NULL, + skip_arr_notification BOOLEAN NOT NULL DEFAULT FALSE, UNIQUE(nzb_path) ); diff --git a/internal/importer/postprocessor/coordinator.go b/internal/importer/postprocessor/coordinator.go index 62f213a7e..fb01afe30 100644 --- a/internal/importer/postprocessor/coordinator.go +++ b/internal/importer/postprocessor/coordinator.go @@ -134,7 +134,11 @@ func (c *Coordinator) HandleSuccess(ctx context.Context, item *database.ImportQu } // 6. Notify ARR applications - if err := c.notifyARRWith(ctx, arrsService, item, resultingPath); err != nil { + if shouldSkipARRNotification(item) { + c.log.DebugContext(ctx, "ARR notification skipped (requested by caller)", + "queue_id", item.ID, + "path", resultingPath) + } else if err := c.notifyARRWith(ctx, arrsService, item, resultingPath); err != nil { c.log.DebugContext(ctx, "ARR notification not sent", "path", resultingPath, "error", err) @@ -157,3 +161,9 @@ func (c *Coordinator) HandleFailure(ctx context.Context, item *database.ImportQu return errors.ErrFallbackNotConfigured } + +// shouldSkipARRNotification returns true when the caller explicitly requested +// that ARR notifications be suppressed. +func shouldSkipARRNotification(item *database.ImportQueueItem) bool { + return item.SkipArrNotification +} diff --git a/internal/importer/postprocessor/coordinator_skip_arr_test.go b/internal/importer/postprocessor/coordinator_skip_arr_test.go new file mode 100644 index 000000000..2491571bc --- /dev/null +++ b/internal/importer/postprocessor/coordinator_skip_arr_test.go @@ -0,0 +1,53 @@ +package postprocessor + +import ( + "context" + "testing" + + "github.com/javi11/altmount/internal/config" + "github.com/javi11/altmount/internal/database" + "github.com/javi11/altmount/internal/metadata" +) + +func TestSkipARRNotificationFromField(t *testing.T) { + tests := []struct { + name string + flag bool + want bool + }{ + {"false → do not skip", false, false}, + {"true → skip", true, true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + item := &database.ImportQueueItem{SkipArrNotification: tt.flag} + if got := shouldSkipARRNotification(item); got != tt.want { + t.Errorf("shouldSkipARRNotification() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestCoordinator_HandleSuccess_SkipsARRNotification(t *testing.T) { + item := &database.ImportQueueItem{ + ID: 1, + SkipArrNotification: true, + } + + cfg := &config.Config{MountType: config.MountTypeNone} + configGetter := func() *config.Config { return cfg } + metaSvc := metadata.NewMetadataService(t.TempDir()) + + coord := NewCoordinator(Config{ + ConfigGetter: configGetter, + MetadataService: metaSvc, + }) + + result, err := coord.HandleSuccess(context.Background(), item, "/some/path") + if err != nil { + t.Fatalf("HandleSuccess returned unexpected error: %v", err) + } + if result.ARRNotified { + t.Error("expected ARRNotified to be false when SkipArrNotification is set, got true") + } +}