From 7793451d67f9275305a581b748be3150e786adb0 Mon Sep 17 00:00:00 2001 From: javi11 Date: Sun, 5 Apr 2026 19:54:44 +0200 Subject: [PATCH] fix: emit queue-updated events and expose verification status after deferred check When uploads complete with deferred article verification (DeferredCheckError), the UI had no feedback because: - `VerificationStatus` was missing from the backend `QueueItem` struct sent to the frontend, so pending/verified/failed states were invisible. - No `queue-updated` event was emitted on job completion (normal or deferred path), leaving the queue list stale until a user action triggered a refresh. - The `PostCheckRetryWorker` also emitted no event after resolving deferred articles, so `verified`/`verification_failed` transitions were silent. Fix: - Add `VerificationStatus` to `backend.QueueItem` and populate it in conversion. - Add `OnJobComplete` callback to `Processor`; call it on both completion paths in `processNextItem`; wire it to emit `queue-updated` in the backend. - Add `onStatusChanged` callback to `PostCheckRetryWorker`; call it after a successful `UpdateCompletedItemVerificationStatus`; wire it to emit `queue-updated` in the backend. - Add 5 targeted tests for the new callback behaviour in `postcheck_retry_worker_test.go`. --- internal/backend/processor.go | 15 +- internal/backend/queue.go | 10 +- internal/processor/postcheck_retry_worker.go | 45 ++--- .../processor/postcheck_retry_worker_test.go | 161 ++++++++++++++++-- internal/processor/processor.go | 12 ++ 5 files changed, 203 insertions(+), 40 deletions(-) diff --git a/internal/backend/processor.go b/internal/backend/processor.go index df9a4a2..f00cc68 100644 --- a/internal/backend/processor.go +++ b/internal/backend/processor.go @@ -99,6 +99,13 @@ func (a *App) initializeProcessor() error { a.webEventEmitter("job-error", eventData) } }, + OnJobComplete: func() { + if !a.isWebMode { + runtime.EventsEmit(a.ctx, "queue-updated") + } else if a.webEventEmitter != nil { + a.webEventEmitter("queue-updated", nil) + } + }, }) // Start processor @@ -136,7 +143,13 @@ func (a *App) initializePostCheckWorker() { return } - a.postCheckWorker = processor.NewPostCheckRetryWorker(a.ctx, a.queue, checkPool, postCheckCfg) + a.postCheckWorker = processor.NewPostCheckRetryWorker(a.ctx, a.queue, checkPool, postCheckCfg, func() { + if !a.isWebMode { + runtime.EventsEmit(a.ctx, "queue-updated") + } else if a.webEventEmitter != nil { + a.webEventEmitter("queue-updated", nil) + } + }) a.postCheckWorker.Start() slog.Info("Post check retry worker initialized") } diff --git a/internal/backend/queue.go b/internal/backend/queue.go index 9003f9f..e1144e0 100644 --- a/internal/backend/queue.go +++ b/internal/backend/queue.go @@ -25,8 +25,9 @@ type QueueItem struct { ErrorMessage *string `json:"errorMessage"` CreatedAt time.Time `json:"createdAt"` UpdatedAt time.Time `json:"updatedAt"` - CompletedAt *time.Time `json:"completedAt"` - NzbPath *string `json:"nzbPath"` + CompletedAt *time.Time `json:"completedAt"` + NzbPath *string `json:"nzbPath"` + VerificationStatus *string `json:"verificationStatus"` } // QueueStats represents queue statistics @@ -208,8 +209,9 @@ func (a *App) GetQueueItems(params PaginationParams) (*PaginatedQueueResult, err ErrorMessage: queueItem.ErrorMessage, CreatedAt: queueItem.CreatedAt, UpdatedAt: queueItem.UpdatedAt, - CompletedAt: queueItem.CompletedAt, - NzbPath: queueItem.NzbPath, + CompletedAt: queueItem.CompletedAt, + NzbPath: queueItem.NzbPath, + VerificationStatus: queueItem.VerificationStatus, } items = append(items, item) } diff --git a/internal/processor/postcheck_retry_worker.go b/internal/processor/postcheck_retry_worker.go index fefb929..dec96ec 100644 --- a/internal/processor/postcheck_retry_worker.go +++ b/internal/processor/postcheck_retry_worker.go @@ -27,16 +27,17 @@ type postCheckQueue interface { // stored in the database and this worker periodically rechecks them with // exponential backoff. type PostCheckRetryWorker struct { - queue postCheckQueue - checkPool pool.NNTPClient - cfg config.PostCheck - ctx context.Context - cancel context.CancelFunc - checkInterval time.Duration - initialDelay time.Duration - maxBackoff time.Duration - maxRetries int - batchSize int + queue postCheckQueue + checkPool pool.NNTPClient + cfg config.PostCheck + ctx context.Context + cancel context.CancelFunc + checkInterval time.Duration + initialDelay time.Duration + maxBackoff time.Duration + maxRetries int + batchSize int + onStatusChanged func() // notified when a completed item's verification status is updated } // NewPostCheckRetryWorker creates a new post check retry worker @@ -45,6 +46,7 @@ func NewPostCheckRetryWorker( q postCheckQueue, checkPool pool.NNTPClient, cfg config.PostCheck, + onStatusChanged func(), ) *PostCheckRetryWorker { workerCtx, cancel := context.WithCancel(ctx) @@ -74,16 +76,17 @@ func NewPostCheckRetryWorker( } return &PostCheckRetryWorker{ - queue: q, - checkPool: checkPool, - cfg: cfg, - ctx: workerCtx, - cancel: cancel, - checkInterval: checkInterval, - initialDelay: initialDelay, - maxBackoff: maxBackoff, - maxRetries: maxRetries, - batchSize: batchSize, + queue: q, + checkPool: checkPool, + cfg: cfg, + ctx: workerCtx, + cancel: cancel, + checkInterval: checkInterval, + initialDelay: initialDelay, + maxBackoff: maxBackoff, + maxRetries: maxRetries, + batchSize: batchSize, + onStatusChanged: onStatusChanged, } } @@ -266,6 +269,8 @@ func (w *PostCheckRetryWorker) updateCompletedItemStatus(ctx context.Context, co if err := w.queue.UpdateCompletedItemVerificationStatus(ctx, completedItemID, status); err != nil { slog.ErrorContext(ctx, "Failed to update completed item verification status", "error", err, "completedItemID", completedItemID) + } else if w.onStatusChanged != nil { + w.onStatusChanged() } } diff --git a/internal/processor/postcheck_retry_worker_test.go b/internal/processor/postcheck_retry_worker_test.go index 3089403..fef5b56 100644 --- a/internal/processor/postcheck_retry_worker_test.go +++ b/internal/processor/postcheck_retry_worker_test.go @@ -15,19 +15,21 @@ import ( // fakeQueue is a hand-rolled implementation of postCheckQueue for tests. type fakeQueue struct { - articles []queue.PendingArticleCheck - verified []int64 - failed []int64 - retried []int64 - getErr error - verifyErr error - failErr error - retryErr error - countTotal int - countPend int - countFailed int - countErr error - statusErr error + articles []queue.PendingArticleCheck + verified []int64 + failed []int64 + retried []int64 + getErr error + verifyErr error + failErr error + retryErr error + countTotal int + countPend int + countFailed int + countErr error + statusErr error + statusSet string // last status passed to UpdateCompletedItemVerificationStatus + statusSetCount int // number of times UpdateCompletedItemVerificationStatus was called } func (f *fakeQueue) GetArticlesForCheck(_ context.Context, limit int) ([]queue.PendingArticleCheck, error) { @@ -59,7 +61,9 @@ func (f *fakeQueue) GetPendingCheckCountForItem(_ context.Context, _ string) (to return f.countTotal, f.countPend, f.countFailed, f.countErr } -func (f *fakeQueue) UpdateCompletedItemVerificationStatus(_ context.Context, _ string, _ string) error { +func (f *fakeQueue) UpdateCompletedItemVerificationStatus(_ context.Context, _ string, status string) error { + f.statusSet = status + f.statusSetCount++ return f.statusErr } @@ -93,7 +97,7 @@ func newWorker(ctx context.Context, q postCheckQueue, pool *mocks.MockNNTPClient DeferredMaxRetries: maxRetries, DeferredBatchSize: batchSize, } - w := NewPostCheckRetryWorker(ctx, q, pool, cfg) + w := NewPostCheckRetryWorker(ctx, q, pool, cfg, nil) return w } @@ -240,6 +244,133 @@ func TestProcessRetries(t *testing.T) { } }) + t.Run("onStatusChanged called when all articles verified", func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + articles := makeArticles(1, 0) + q := &fakeQueue{articles: articles, countTotal: 1, countPend: 0, countFailed: 0} + mockPool := mocks.NewMockNNTPClient(ctrl) + mockPool.EXPECT().Stat(gomock.Any(), articles[0].MessageID).Return(nil, nil).Times(1) + + called := false + enabled := makeEnabled(true) + cfg := config.PostCheck{ + Enabled: enabled, + DeferredCheckInterval: config.Duration("1m"), + DeferredCheckDelay: config.Duration("5m"), + DeferredMaxBackoff: config.Duration("1h"), + DeferredMaxRetries: 5, + DeferredBatchSize: 10, + } + w := NewPostCheckRetryWorker(context.Background(), q, mockPool, cfg, func() { called = true }) + + w.processRetries() + + if !called { + t.Error("expected onStatusChanged to be called after all articles verified, but it was not") + } + if q.statusSet != "verified" { + t.Errorf("expected status 'verified', got %q", q.statusSet) + } + }) + + t.Run("onStatusChanged called when verification fails", func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + // retryCount=4, maxRetries=5 → newRetryCount=5 >= 5 → mark failed + articles := makeArticles(1, 4) + q := &fakeQueue{articles: articles, countTotal: 1, countPend: 0, countFailed: 1} + mockPool := mocks.NewMockNNTPClient(ctrl) + mockPool.EXPECT().Stat(gomock.Any(), articles[0].MessageID).Return(nil, errors.New("not found")).Times(1) + + called := false + enabled := makeEnabled(true) + cfg := config.PostCheck{ + Enabled: enabled, + DeferredCheckInterval: config.Duration("1m"), + DeferredCheckDelay: config.Duration("5m"), + DeferredMaxBackoff: config.Duration("1h"), + DeferredMaxRetries: 5, + DeferredBatchSize: 10, + } + w := NewPostCheckRetryWorker(context.Background(), q, mockPool, cfg, func() { called = true }) + + w.processRetries() + + if !called { + t.Error("expected onStatusChanged to be called after verification_failed, but it was not") + } + if q.statusSet != "verification_failed" { + t.Errorf("expected status 'verification_failed', got %q", q.statusSet) + } + }) + + t.Run("onStatusChanged not called when articles still pending", func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + articles := makeArticles(1, 0) // retryCount=0, maxRetries=5 → schedules retry + q := &fakeQueue{articles: articles, countTotal: 1, countPend: 1} // still pending + mockPool := mocks.NewMockNNTPClient(ctrl) + mockPool.EXPECT().Stat(gomock.Any(), articles[0].MessageID).Return(nil, errors.New("not found")).Times(1) + + called := false + w := newWorker(context.Background(), q, mockPool, 10, 5) + w.onStatusChanged = func() { called = true } + + w.processRetries() + + if called { + t.Error("expected onStatusChanged NOT to be called when articles are still pending, but it was") + } + if q.statusSetCount != 0 { + t.Errorf("expected UpdateCompletedItemVerificationStatus not to be called, got %d calls", q.statusSetCount) + } + }) + + t.Run("onStatusChanged not called when status update fails", func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + articles := makeArticles(1, 0) + q := &fakeQueue{ + articles: articles, + countTotal: 1, + countPend: 0, + countFailed: 0, + statusErr: errors.New("db error"), + } + mockPool := mocks.NewMockNNTPClient(ctrl) + mockPool.EXPECT().Stat(gomock.Any(), articles[0].MessageID).Return(nil, nil).Times(1) + + called := false + w := newWorker(context.Background(), q, mockPool, 10, 5) + w.onStatusChanged = func() { called = true } + + w.processRetries() + + if called { + t.Error("expected onStatusChanged NOT to be called when status update fails, but it was") + } + }) + + t.Run("nil onStatusChanged does not panic", func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + articles := makeArticles(1, 0) + q := &fakeQueue{articles: articles, countTotal: 1, countPend: 0} + mockPool := mocks.NewMockNNTPClient(ctrl) + mockPool.EXPECT().Stat(gomock.Any(), articles[0].MessageID).Return(nil, nil).Times(1) + + // nil callback — should not panic + w := newWorker(context.Background(), q, mockPool, 10, 5) + + w.processRetries() // no panic expected + }) + t.Run("bad groups JSON marks failed", func(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() diff --git a/internal/processor/processor.go b/internal/processor/processor.go index 8b049af..4080547 100644 --- a/internal/processor/processor.go +++ b/internal/processor/processor.go @@ -62,6 +62,8 @@ type Processor struct { canProcessNextItem func() bool // Callback when job fails permanently onJobError func(fileName, errorMessage string) + // Callback when any job finishes (success or deferred) + onJobComplete func() // Shutdown flag to prevent new operations during close isShuttingDown bool } @@ -79,6 +81,7 @@ type ProcessorOptions struct { FollowSymlinks bool // Control whether to follow symlinks when collecting folder files CanProcessNextItem func() bool // Callback to check if processor can start new items OnJobError func(fileName, errorMessage string) // Callback when job fails permanently + OnJobComplete func() // Callback when any job finishes (success or deferred) } type RunningJobDetails struct { ID string `json:"id"` @@ -121,6 +124,7 @@ func New(opts ProcessorOptions) *Processor { providerCheckCancel: providerCancel, canProcessNextItem: opts.CanProcessNextItem, onJobError: opts.OnJobError, + onJobComplete: opts.OnJobComplete, } // Start provider availability monitoring if we have a pool manager @@ -300,6 +304,10 @@ func (p *Processor) processNextItem(ctx context.Context) error { slog.ErrorContext(ctx, "Post upload script execution failed", "error", scriptErr, "nzbPath", actualNzbPath) } + if p.onJobComplete != nil { + p.onJobComplete() + } + return nil } @@ -340,6 +348,10 @@ func (p *Processor) processNextItem(ctx context.Context) error { slog.ErrorContext(ctx, "Post upload script execution failed", "error", scriptErr, "nzbPath", actualNzbPath) } + if p.onJobComplete != nil { + p.onJobComplete() + } + return nil }