Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions internal/api/import_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,14 +264,15 @@ func (s *Server) handleManualImportFile(c *fiber.Ctx) error {

// Add the file to the processing queue
item := &database.ImportQueueItem{
NzbPath: req.FilePath,
Priority: database.QueuePriorityNormal,
Status: database.QueueStatusPending,
RetryCount: 0,
MaxRetries: 3,
CreatedAt: time.Now(),
RelativePath: req.RelativePath,
TargetPath: targetPath,
NzbPath: req.FilePath,
Priority: database.QueuePriorityNormal,
Status: database.QueueStatusPending,
RetryCount: 0,
MaxRetries: 3,
CreatedAt: time.Now(),
RelativePath: req.RelativePath,
TargetPath: targetPath,
SkipArrNotification: req.SkipArrNotification,
}

slog.DebugContext(c.Context(), "Adding file to queue", "file", req.FilePath, "relative_path", req.RelativePath, "target_path", targetPath)
Expand Down
17 changes: 17 additions & 0 deletions internal/api/import_handlers_skip_arr_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package api

import "testing"

func TestManualImportRequest_SkipArrNotification_True(t *testing.T) {
req := ManualImportRequest{SkipArrNotification: true}
if !req.SkipArrNotification {
t.Error("expected SkipArrNotification to be true")
}
}

func TestManualImportRequest_SkipArrNotification_FalseByDefault(t *testing.T) {
req := ManualImportRequest{}
if req.SkipArrNotification {
t.Error("expected SkipArrNotification to be false by default")
}
}
8 changes: 7 additions & 1 deletion internal/api/sabnzbd_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ func (s *Server) handleSABnzbdQueue(c *fiber.Ctx) error {
var totalMbLeft float64

for i, item := range items {
if item.Status == database.QueueStatusFallback {
if item.Status == database.QueueStatusFallback || item.SkipArrNotification {
continue
}

Expand Down Expand Up @@ -762,6 +762,9 @@ func (s *Server) handleSABnzbdHistory(c *fiber.Ctx) error {
finalItems := make([]*database.ImportQueueItem, 0)

for _, item := range completedQueueItems {
if item.SkipArrNotification {
continue
}
name := filepath.Base(item.NzbPath)
// Filter by nzo_ids if requested (check both integer ID and DownloadID)
if len(nzoIDs) > 0 {
Expand Down Expand Up @@ -821,6 +824,9 @@ func (s *Server) handleSABnzbdHistory(c *fiber.Ctx) error {

// Combine failed items for noofslots calculation
for _, item := range failed {
if item.SkipArrNotification {
continue
}
name := filepath.Base(item.NzbPath)
// Filter by nzo_ids if requested
if len(nzoIDs) > 0 {
Expand Down
5 changes: 3 additions & 2 deletions internal/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -899,8 +899,9 @@ type ScanStatusResponse struct {

// ManualImportRequest represents a request to manually import a file by path
type ManualImportRequest struct {
FilePath string `json:"file_path"`
RelativePath *string `json:"relative_path,omitempty"`
FilePath string `json:"file_path"`
RelativePath *string `json:"relative_path,omitempty"`
SkipArrNotification bool `json:"skip_arr_notification,omitempty"`
}

// ManualImportResponse represents the response from manually importing a file
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- +goose Up
-- +goose StatementBegin
ALTER TABLE import_queue ADD COLUMN skip_arr_notification BOOLEAN NOT NULL DEFAULT FALSE;
-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
ALTER TABLE import_queue DROP COLUMN IF EXISTS skip_arr_notification;
-- +goose StatementEnd
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- +goose Up
-- +goose StatementBegin
ALTER TABLE import_queue ADD COLUMN skip_arr_notification BOOLEAN NOT NULL DEFAULT FALSE;
-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
-- SQLite does not support DROP COLUMN in older versions; intentional no-op
-- +goose StatementEnd
6 changes: 4 additions & 2 deletions internal/database/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ type ImportQueueItem struct {
ErrorMessage *string `db:"error_message"`
BatchID *string `db:"batch_id"`
Metadata *string `db:"metadata"` // JSON metadata
FileSize *int64 `db:"file_size"` // Total size in bytes calculated from segments
TargetPath *string `db:"target_path"` // Optional forced symlink destination path
FileSize *int64 `db:"file_size"` // Total size in bytes calculated from segments
TargetPath *string `db:"target_path"` // Optional forced symlink destination path
SkipArrNotification bool `db:"skip_arr_notification"`
}

// BulkOperationResult represents the result of a bulk queue operation
Expand Down Expand Up @@ -167,3 +168,4 @@ type ImportHistory struct {
Metadata *string `db:"metadata"`
CompletedAt time.Time `db:"completed_at"`
}

28 changes: 14 additions & 14 deletions internal/database/queue_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ func (r *QueueRepository) RestartQueueItemsBulk(ctx context.Context, ids []int64
// AddToQueue adds a new NZB file to the import queue
func (r *QueueRepository) AddToQueue(ctx context.Context, item *ImportQueueItem) error {
query := `
INSERT INTO import_queue (download_id, nzb_path, relative_path, category, priority, status, retry_count, max_retries, batch_id, metadata, file_size, target_path, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now'))
INSERT INTO import_queue (download_id, nzb_path, relative_path, category, priority, status, retry_count, max_retries, batch_id, metadata, file_size, target_path, skip_arr_notification, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now'))
ON CONFLICT(nzb_path) DO UPDATE SET
download_id = COALESCE(excluded.download_id, import_queue.download_id),
priority = CASE WHEN excluded.priority < priority THEN excluded.priority ELSE priority END,
Expand All @@ -129,7 +129,7 @@ func (r *QueueRepository) AddToQueue(ctx context.Context, item *ImportQueueItem)
`

args := []any{item.DownloadID, item.NzbPath, item.RelativePath, item.Category, item.Priority, item.Status,
item.RetryCount, item.MaxRetries, item.BatchID, item.Metadata, item.FileSize, item.TargetPath}
item.RetryCount, item.MaxRetries, item.BatchID, item.Metadata, item.FileSize, item.TargetPath, item.SkipArrNotification}

if r.dialect.IsPostgres() {
err := r.db.QueryRowContext(ctx, query+" RETURNING id", args...).Scan(&item.ID)
Expand Down Expand Up @@ -232,7 +232,7 @@ func (r *QueueRepository) ClaimNextQueueItem(ctx context.Context) (*ImportQueueI
// Get the complete claimed item data
getQuery := `
SELECT id, download_id, nzb_path, relative_path, category, priority, status, created_at, updated_at,
started_at, completed_at, retry_count, max_retries, error_message, batch_id, metadata, file_size, target_path
started_at, completed_at, retry_count, max_retries, error_message, batch_id, metadata, file_size, target_path, skip_arr_notification
FROM import_queue
WHERE id = ?
`
Expand All @@ -241,7 +241,7 @@ func (r *QueueRepository) ClaimNextQueueItem(ctx context.Context) (*ImportQueueI
err = txRepo.db.QueryRowContext(ctx, getQuery, itemID).Scan(
&item.ID, &item.DownloadID, &item.NzbPath, &item.RelativePath, &item.Category, &item.Priority, &item.Status,
&item.CreatedAt, &item.UpdatedAt, &item.StartedAt, &item.CompletedAt,
&item.RetryCount, &item.MaxRetries, &item.ErrorMessage, &item.BatchID, &item.Metadata, &item.FileSize, &item.TargetPath,
&item.RetryCount, &item.MaxRetries, &item.ErrorMessage, &item.BatchID, &item.Metadata, &item.FileSize, &item.TargetPath, &item.SkipArrNotification,
)
if err != nil {
return fmt.Errorf("failed to get claimed item: %w", err)
Expand Down Expand Up @@ -546,15 +546,15 @@ func (r *QueueRepository) UpdateQueueItemNzbPath(ctx context.Context, id int64,
func (r *QueueRepository) GetQueueItemByNzbPath(ctx context.Context, nzbPath string) (*ImportQueueItem, error) {
query := `
SELECT id, download_id, nzb_path, relative_path, category, priority, status, created_at, updated_at,
started_at, completed_at, retry_count, max_retries, error_message, batch_id, metadata, file_size, storage_path, target_path
started_at, completed_at, retry_count, max_retries, error_message, batch_id, metadata, file_size, storage_path, target_path, skip_arr_notification
FROM import_queue WHERE nzb_path = ? LIMIT 1
`

var item ImportQueueItem
err := r.db.QueryRowContext(ctx, query, nzbPath).Scan(
&item.ID, &item.DownloadID, &item.NzbPath, &item.RelativePath, &item.Category, &item.Priority, &item.Status,
&item.CreatedAt, &item.UpdatedAt, &item.StartedAt, &item.CompletedAt,
&item.RetryCount, &item.MaxRetries, &item.ErrorMessage, &item.BatchID, &item.Metadata, &item.FileSize, &item.StoragePath, &item.TargetPath,
&item.RetryCount, &item.MaxRetries, &item.ErrorMessage, &item.BatchID, &item.Metadata, &item.FileSize, &item.StoragePath, &item.TargetPath, &item.SkipArrNotification,
)
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
Expand Down Expand Up @@ -633,8 +633,8 @@ func (r *QueueRepository) AddBatchToQueue(ctx context.Context, items []*ImportQu
return r.withQueueTransaction(ctx, func(txRepo *QueueRepository) error {
// Prepare batch insert statement
query := `
INSERT INTO import_queue (download_id, nzb_path, relative_path, category, priority, status, retry_count, max_retries, batch_id, metadata, file_size, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now'))
INSERT INTO import_queue (download_id, nzb_path, relative_path, category, priority, status, retry_count, max_retries, batch_id, metadata, file_size, skip_arr_notification, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now'), datetime('now'))
ON CONFLICT(nzb_path) DO UPDATE SET
download_id = COALESCE(excluded.download_id, import_queue.download_id),
priority = CASE WHEN excluded.priority < priority THEN excluded.priority ELSE priority END,
Expand All @@ -649,7 +649,7 @@ func (r *QueueRepository) AddBatchToQueue(ctx context.Context, items []*ImportQu
now := time.Now()
for _, item := range items {
args := []any{item.DownloadID, item.NzbPath, item.RelativePath, item.Category, item.Priority, item.Status,
item.RetryCount, item.MaxRetries, item.BatchID, item.Metadata, item.FileSize}
item.RetryCount, item.MaxRetries, item.BatchID, item.Metadata, item.FileSize, item.SkipArrNotification}

if txRepo.dialect.IsPostgres() {
err := txRepo.db.QueryRowContext(ctx, query+" RETURNING id", args...).Scan(&item.ID)
Expand Down Expand Up @@ -678,15 +678,15 @@ func (r *QueueRepository) AddBatchToQueue(ctx context.Context, items []*ImportQu
func (r *QueueRepository) GetQueueItem(ctx context.Context, id int64) (*ImportQueueItem, error) {
query := `
SELECT id, download_id, nzb_path, relative_path, category, priority, status, created_at, updated_at,
started_at, completed_at, retry_count, max_retries, error_message, batch_id, metadata, file_size, storage_path, target_path
started_at, completed_at, retry_count, max_retries, error_message, batch_id, metadata, file_size, storage_path, target_path, skip_arr_notification
FROM import_queue WHERE id = ?
`

var item ImportQueueItem
err := r.db.QueryRowContext(ctx, query, id).Scan(
&item.ID, &item.DownloadID, &item.NzbPath, &item.RelativePath, &item.Category, &item.Priority, &item.Status,
&item.CreatedAt, &item.UpdatedAt, &item.StartedAt, &item.CompletedAt,
&item.RetryCount, &item.MaxRetries, &item.ErrorMessage, &item.BatchID, &item.Metadata, &item.FileSize, &item.StoragePath, &item.TargetPath,
&item.RetryCount, &item.MaxRetries, &item.ErrorMessage, &item.BatchID, &item.Metadata, &item.FileSize, &item.StoragePath, &item.TargetPath, &item.SkipArrNotification,
)
if err != nil {
if err == sql.ErrNoRows {
Expand Down Expand Up @@ -736,7 +736,7 @@ func (r *QueueRepository) DeleteFailedItemsOlderThan(ctx context.Context, olderT
err := r.withQueueTransaction(ctx, func(txRepo *QueueRepository) error {
// Select failed items older than the threshold
selectQuery := `SELECT id, download_id, nzb_path, relative_path, category, priority, status, created_at, updated_at,
started_at, completed_at, retry_count, max_retries, error_message, batch_id, metadata, file_size, storage_path, target_path
started_at, completed_at, retry_count, max_retries, error_message, batch_id, metadata, file_size, storage_path, target_path, skip_arr_notification
FROM import_queue WHERE status = 'failed' AND updated_at < ?`

rows, err := txRepo.db.QueryContext(ctx, selectQuery, olderThan)
Expand All @@ -750,7 +750,7 @@ func (r *QueueRepository) DeleteFailedItemsOlderThan(ctx context.Context, olderT
if err := rows.Scan(
&item.ID, &item.DownloadID, &item.NzbPath, &item.RelativePath, &item.Category, &item.Priority, &item.Status,
&item.CreatedAt, &item.UpdatedAt, &item.StartedAt, &item.CompletedAt,
&item.RetryCount, &item.MaxRetries, &item.ErrorMessage, &item.BatchID, &item.Metadata, &item.FileSize, &item.StoragePath, &item.TargetPath,
&item.RetryCount, &item.MaxRetries, &item.ErrorMessage, &item.BatchID, &item.Metadata, &item.FileSize, &item.StoragePath, &item.TargetPath, &item.SkipArrNotification,
); err != nil {
return fmt.Errorf("failed to scan failed queue item: %w", err)
}
Expand Down
1 change: 1 addition & 0 deletions internal/database/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func setupQueueSchema(t *testing.T, db *sql.DB) {
category TEXT DEFAULT NULL,
file_size BIGINT DEFAULT NULL,
target_path TEXT DEFAULT NULL,
skip_arr_notification BOOLEAN NOT NULL DEFAULT FALSE,
UNIQUE(nzb_path)
);

Expand Down
12 changes: 11 additions & 1 deletion internal/importer/postprocessor/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,11 @@ func (c *Coordinator) HandleSuccess(ctx context.Context, item *database.ImportQu
}

// 6. Notify ARR applications
if err := c.notifyARRWith(ctx, arrsService, item, resultingPath); err != nil {
if shouldSkipARRNotification(item) {
c.log.DebugContext(ctx, "ARR notification skipped (requested by caller)",
"queue_id", item.ID,
"path", resultingPath)
} else if err := c.notifyARRWith(ctx, arrsService, item, resultingPath); err != nil {
c.log.DebugContext(ctx, "ARR notification not sent",
"path", resultingPath,
"error", err)
Expand All @@ -157,3 +161,9 @@ func (c *Coordinator) HandleFailure(ctx context.Context, item *database.ImportQu

return errors.ErrFallbackNotConfigured
}

// shouldSkipARRNotification returns true when the caller explicitly requested
// that ARR notifications be suppressed.
func shouldSkipARRNotification(item *database.ImportQueueItem) bool {
return item.SkipArrNotification
}
53 changes: 53 additions & 0 deletions internal/importer/postprocessor/coordinator_skip_arr_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package postprocessor

import (
"context"
"testing"

"github.com/javi11/altmount/internal/config"
"github.com/javi11/altmount/internal/database"
"github.com/javi11/altmount/internal/metadata"
)

func TestSkipARRNotificationFromField(t *testing.T) {
tests := []struct {
name string
flag bool
want bool
}{
{"false → do not skip", false, false},
{"true → skip", true, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
item := &database.ImportQueueItem{SkipArrNotification: tt.flag}
if got := shouldSkipARRNotification(item); got != tt.want {
t.Errorf("shouldSkipARRNotification() = %v, want %v", got, tt.want)
}
})
}
}

func TestCoordinator_HandleSuccess_SkipsARRNotification(t *testing.T) {
item := &database.ImportQueueItem{
ID: 1,
SkipArrNotification: true,
}

cfg := &config.Config{MountType: config.MountTypeNone}
configGetter := func() *config.Config { return cfg }
metaSvc := metadata.NewMetadataService(t.TempDir())

coord := NewCoordinator(Config{
ConfigGetter: configGetter,
MetadataService: metaSvc,
})

result, err := coord.HandleSuccess(context.Background(), item, "/some/path")
if err != nil {
t.Fatalf("HandleSuccess returned unexpected error: %v", err)
}
if result.ARRNotified {
t.Error("expected ARRNotified to be false when SkipArrNotification is set, got true")
}
}
Loading