Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion internal/backend/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,13 @@ func (a *App) initializeProcessor() error {
a.webEventEmitter("job-error", eventData)
}
},
OnJobComplete: func() {
if !a.isWebMode {
runtime.EventsEmit(a.ctx, "queue-updated")
} else if a.webEventEmitter != nil {
a.webEventEmitter("queue-updated", nil)
}
},
})

// Start processor
Expand Down Expand Up @@ -136,7 +143,13 @@ func (a *App) initializePostCheckWorker() {
return
}

a.postCheckWorker = processor.NewPostCheckRetryWorker(a.ctx, a.queue, checkPool, postCheckCfg)
a.postCheckWorker = processor.NewPostCheckRetryWorker(a.ctx, a.queue, checkPool, postCheckCfg, func() {
if !a.isWebMode {
runtime.EventsEmit(a.ctx, "queue-updated")
} else if a.webEventEmitter != nil {
a.webEventEmitter("queue-updated", nil)
}
})
a.postCheckWorker.Start()
slog.Info("Post check retry worker initialized")
}
Expand Down
10 changes: 6 additions & 4 deletions internal/backend/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ type QueueItem struct {
ErrorMessage *string `json:"errorMessage"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
CompletedAt *time.Time `json:"completedAt"`
NzbPath *string `json:"nzbPath"`
CompletedAt *time.Time `json:"completedAt"`
NzbPath *string `json:"nzbPath"`
VerificationStatus *string `json:"verificationStatus"`
}

// QueueStats represents queue statistics
Expand Down Expand Up @@ -208,8 +209,9 @@ func (a *App) GetQueueItems(params PaginationParams) (*PaginatedQueueResult, err
ErrorMessage: queueItem.ErrorMessage,
CreatedAt: queueItem.CreatedAt,
UpdatedAt: queueItem.UpdatedAt,
CompletedAt: queueItem.CompletedAt,
NzbPath: queueItem.NzbPath,
CompletedAt: queueItem.CompletedAt,
NzbPath: queueItem.NzbPath,
VerificationStatus: queueItem.VerificationStatus,
}
items = append(items, item)
}
Expand Down
45 changes: 25 additions & 20 deletions internal/processor/postcheck_retry_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,17 @@ type postCheckQueue interface {
// stored in the database and this worker periodically rechecks them with
// exponential backoff.
type PostCheckRetryWorker struct {
queue postCheckQueue
checkPool pool.NNTPClient
cfg config.PostCheck
ctx context.Context
cancel context.CancelFunc
checkInterval time.Duration
initialDelay time.Duration
maxBackoff time.Duration
maxRetries int
batchSize int
queue postCheckQueue
checkPool pool.NNTPClient
cfg config.PostCheck
ctx context.Context
cancel context.CancelFunc
checkInterval time.Duration
initialDelay time.Duration
maxBackoff time.Duration
maxRetries int
batchSize int
onStatusChanged func() // notified when a completed item's verification status is updated
}

// NewPostCheckRetryWorker creates a new post check retry worker
Expand All @@ -45,6 +46,7 @@ func NewPostCheckRetryWorker(
q postCheckQueue,
checkPool pool.NNTPClient,
cfg config.PostCheck,
onStatusChanged func(),
) *PostCheckRetryWorker {
workerCtx, cancel := context.WithCancel(ctx)

Expand Down Expand Up @@ -74,16 +76,17 @@ func NewPostCheckRetryWorker(
}

return &PostCheckRetryWorker{
queue: q,
checkPool: checkPool,
cfg: cfg,
ctx: workerCtx,
cancel: cancel,
checkInterval: checkInterval,
initialDelay: initialDelay,
maxBackoff: maxBackoff,
maxRetries: maxRetries,
batchSize: batchSize,
queue: q,
checkPool: checkPool,
cfg: cfg,
ctx: workerCtx,
cancel: cancel,
checkInterval: checkInterval,
initialDelay: initialDelay,
maxBackoff: maxBackoff,
maxRetries: maxRetries,
batchSize: batchSize,
onStatusChanged: onStatusChanged,
}
}

Expand Down Expand Up @@ -266,6 +269,8 @@ func (w *PostCheckRetryWorker) updateCompletedItemStatus(ctx context.Context, co
if err := w.queue.UpdateCompletedItemVerificationStatus(ctx, completedItemID, status); err != nil {
slog.ErrorContext(ctx, "Failed to update completed item verification status",
"error", err, "completedItemID", completedItemID)
} else if w.onStatusChanged != nil {
w.onStatusChanged()
}
}

Expand Down
161 changes: 146 additions & 15 deletions internal/processor/postcheck_retry_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,21 @@ import (

// fakeQueue is a hand-rolled implementation of postCheckQueue for tests.
type fakeQueue struct {
articles []queue.PendingArticleCheck
verified []int64
failed []int64
retried []int64
getErr error
verifyErr error
failErr error
retryErr error
countTotal int
countPend int
countFailed int
countErr error
statusErr error
articles []queue.PendingArticleCheck
verified []int64
failed []int64
retried []int64
getErr error
verifyErr error
failErr error
retryErr error
countTotal int
countPend int
countFailed int
countErr error
statusErr error
statusSet string // last status passed to UpdateCompletedItemVerificationStatus
statusSetCount int // number of times UpdateCompletedItemVerificationStatus was called
}

func (f *fakeQueue) GetArticlesForCheck(_ context.Context, limit int) ([]queue.PendingArticleCheck, error) {
Expand Down Expand Up @@ -59,7 +61,9 @@ func (f *fakeQueue) GetPendingCheckCountForItem(_ context.Context, _ string) (to
return f.countTotal, f.countPend, f.countFailed, f.countErr
}

func (f *fakeQueue) UpdateCompletedItemVerificationStatus(_ context.Context, _ string, _ string) error {
func (f *fakeQueue) UpdateCompletedItemVerificationStatus(_ context.Context, _ string, status string) error {
f.statusSet = status
f.statusSetCount++
return f.statusErr
}

Expand Down Expand Up @@ -93,7 +97,7 @@ func newWorker(ctx context.Context, q postCheckQueue, pool *mocks.MockNNTPClient
DeferredMaxRetries: maxRetries,
DeferredBatchSize: batchSize,
}
w := NewPostCheckRetryWorker(ctx, q, pool, cfg)
w := NewPostCheckRetryWorker(ctx, q, pool, cfg, nil)
return w
}

Expand Down Expand Up @@ -240,6 +244,133 @@ func TestProcessRetries(t *testing.T) {
}
})

t.Run("onStatusChanged called when all articles verified", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

articles := makeArticles(1, 0)
q := &fakeQueue{articles: articles, countTotal: 1, countPend: 0, countFailed: 0}
mockPool := mocks.NewMockNNTPClient(ctrl)
mockPool.EXPECT().Stat(gomock.Any(), articles[0].MessageID).Return(nil, nil).Times(1)

called := false
enabled := makeEnabled(true)
cfg := config.PostCheck{
Enabled: enabled,
DeferredCheckInterval: config.Duration("1m"),
DeferredCheckDelay: config.Duration("5m"),
DeferredMaxBackoff: config.Duration("1h"),
DeferredMaxRetries: 5,
DeferredBatchSize: 10,
}
w := NewPostCheckRetryWorker(context.Background(), q, mockPool, cfg, func() { called = true })

w.processRetries()

if !called {
t.Error("expected onStatusChanged to be called after all articles verified, but it was not")
}
if q.statusSet != "verified" {
t.Errorf("expected status 'verified', got %q", q.statusSet)
}
})

t.Run("onStatusChanged called when verification fails", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

// retryCount=4, maxRetries=5 → newRetryCount=5 >= 5 → mark failed
articles := makeArticles(1, 4)
q := &fakeQueue{articles: articles, countTotal: 1, countPend: 0, countFailed: 1}
mockPool := mocks.NewMockNNTPClient(ctrl)
mockPool.EXPECT().Stat(gomock.Any(), articles[0].MessageID).Return(nil, errors.New("not found")).Times(1)

called := false
enabled := makeEnabled(true)
cfg := config.PostCheck{
Enabled: enabled,
DeferredCheckInterval: config.Duration("1m"),
DeferredCheckDelay: config.Duration("5m"),
DeferredMaxBackoff: config.Duration("1h"),
DeferredMaxRetries: 5,
DeferredBatchSize: 10,
}
w := NewPostCheckRetryWorker(context.Background(), q, mockPool, cfg, func() { called = true })

w.processRetries()

if !called {
t.Error("expected onStatusChanged to be called after verification_failed, but it was not")
}
if q.statusSet != "verification_failed" {
t.Errorf("expected status 'verification_failed', got %q", q.statusSet)
}
})

t.Run("onStatusChanged not called when articles still pending", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

articles := makeArticles(1, 0) // retryCount=0, maxRetries=5 → schedules retry
q := &fakeQueue{articles: articles, countTotal: 1, countPend: 1} // still pending
mockPool := mocks.NewMockNNTPClient(ctrl)
mockPool.EXPECT().Stat(gomock.Any(), articles[0].MessageID).Return(nil, errors.New("not found")).Times(1)

called := false
w := newWorker(context.Background(), q, mockPool, 10, 5)
w.onStatusChanged = func() { called = true }

w.processRetries()

if called {
t.Error("expected onStatusChanged NOT to be called when articles are still pending, but it was")
}
if q.statusSetCount != 0 {
t.Errorf("expected UpdateCompletedItemVerificationStatus not to be called, got %d calls", q.statusSetCount)
}
})

t.Run("onStatusChanged not called when status update fails", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

articles := makeArticles(1, 0)
q := &fakeQueue{
articles: articles,
countTotal: 1,
countPend: 0,
countFailed: 0,
statusErr: errors.New("db error"),
}
mockPool := mocks.NewMockNNTPClient(ctrl)
mockPool.EXPECT().Stat(gomock.Any(), articles[0].MessageID).Return(nil, nil).Times(1)

called := false
w := newWorker(context.Background(), q, mockPool, 10, 5)
w.onStatusChanged = func() { called = true }

w.processRetries()

if called {
t.Error("expected onStatusChanged NOT to be called when status update fails, but it was")
}
})

t.Run("nil onStatusChanged does not panic", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

articles := makeArticles(1, 0)
q := &fakeQueue{articles: articles, countTotal: 1, countPend: 0}
mockPool := mocks.NewMockNNTPClient(ctrl)
mockPool.EXPECT().Stat(gomock.Any(), articles[0].MessageID).Return(nil, nil).Times(1)

// nil callback — should not panic
w := newWorker(context.Background(), q, mockPool, 10, 5)

w.processRetries() // no panic expected
})

t.Run("bad groups JSON marks failed", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down
12 changes: 12 additions & 0 deletions internal/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ type Processor struct {
canProcessNextItem func() bool
// Callback when job fails permanently
onJobError func(fileName, errorMessage string)
// Callback when any job finishes (success or deferred)
onJobComplete func()
// Shutdown flag to prevent new operations during close
isShuttingDown bool
}
Expand All @@ -79,6 +81,7 @@ type ProcessorOptions struct {
FollowSymlinks bool // Control whether to follow symlinks when collecting folder files
CanProcessNextItem func() bool // Callback to check if processor can start new items
OnJobError func(fileName, errorMessage string) // Callback when job fails permanently
OnJobComplete func() // Callback when any job finishes (success or deferred)
}
type RunningJobDetails struct {
ID string `json:"id"`
Expand Down Expand Up @@ -121,6 +124,7 @@ func New(opts ProcessorOptions) *Processor {
providerCheckCancel: providerCancel,
canProcessNextItem: opts.CanProcessNextItem,
onJobError: opts.OnJobError,
onJobComplete: opts.OnJobComplete,
}

// Start provider availability monitoring if we have a pool manager
Expand Down Expand Up @@ -300,6 +304,10 @@ func (p *Processor) processNextItem(ctx context.Context) error {
slog.ErrorContext(ctx, "Post upload script execution failed", "error", scriptErr, "nzbPath", actualNzbPath)
}

if p.onJobComplete != nil {
p.onJobComplete()
}

return nil
}

Expand Down Expand Up @@ -340,6 +348,10 @@ func (p *Processor) processNextItem(ctx context.Context) error {
slog.ErrorContext(ctx, "Post upload script execution failed", "error", scriptErr, "nzbPath", actualNzbPath)
}

if p.onJobComplete != nil {
p.onJobComplete()
}

return nil
}

Expand Down
Loading