diff --git a/internal/poster/poster.go b/internal/poster/poster.go index 1261923..2c44788 100644 --- a/internal/poster/poster.go +++ b/internal/poster/poster.go @@ -79,7 +79,7 @@ type Post struct { mu sync.Mutex filesize int64 wg *sync.WaitGroup - failed *int + failed *atomic.Int64 progress progress.Progress } @@ -214,20 +214,24 @@ func (p *poster) PostWithRelativePaths( } wg := sync.WaitGroup{} - var failedPosts int + var failedPosts atomic.Int64 // Create a context that can be canceled ctx, cancel := context.WithCancel(ctx) defer cancel() - // Create error channel to collect errors - errChan := make(chan error, 1) + // Create error channel to collect errors. Buffered enough that the deferred + // writers (postLoop, checkLoop, deferred-check sender) cannot block each + // other if more than one error fires before the main goroutine drains. + errChan := make(chan error, 4) // Create channels for post and check queues postQueue := make(chan *Post, 100) checkQueue := make(chan *Post, 100) - // Track posts in flight (initial + retries) to know when all processing is complete + // Track posts in flight (initial + retries) so we close postQueue only once + // every initial post AND every retry is fully accounted for. Closing earlier + // races with checkLoop's retry sends and panics on closed-channel send. var postsInFlight sync.WaitGroup // Start a goroutine to process posts @@ -261,9 +265,13 @@ func (p *poster) PostWithRelativePaths( } } - // Close postQueue after all initial posts have been added - // The checkLoop can still add retries back to the queue if needed - close(postQueue) + // Close postQueue only when no posts are in-flight (initial + any retries + // queued by checkLoop). This avoids the closed-channel panic on retry sends + // and lets postLoop/checkLoop drain naturally. + go func() { + postsInFlight.Wait() + close(postQueue) + }() // Wait for all posts to complete or an error to occur done := make(chan struct{}) @@ -309,8 +317,8 @@ func (p *poster) PostWithRelativePaths( default: } - if failedPosts > 0 { - return fmt.Errorf("failed to post %d files", failedPosts) + if n := failedPosts.Load(); n > 0 { + return fmt.Errorf("failed to post %d files", n) } // Return deferred error if present (non-fatal - caller should handle) @@ -349,6 +357,11 @@ func (p *poster) postLoop(ctx context.Context, postQueue chan *Post, checkQueue post.Status = PostStatusPosting post.mu.Unlock() + // Per-post context so the read-ahead goroutine always terminates + // when this post's block exits, even if the parent ctx is still + // alive (e.g. on the deferred-check non-fatal-error path). + postCtx, postCancel := context.WithCancel(ctx) + // Create read-ahead channel (buffer 50 articles ahead to overlap I/O with network) readAheadChan := make(chan articleWithBody, 50) @@ -357,7 +370,7 @@ func (p *poster) postLoop(ctx context.Context, postQueue chan *Post, checkQueue defer close(readAheadChan) for _, art := range post.Articles { select { - case <-ctx.Done(): + case <-postCtx.Done(): return default: // Get buffer from pool, resize if needed @@ -377,7 +390,7 @@ func (p *poster) postLoop(ctx context.Context, postQueue chan *Post, checkQueue select { case readAheadChan <- articleWithBody{article: art, body: body, poolBuf: poolBuf}: - case <-ctx.Done(): + case <-postCtx.Done(): // Return buffer to pool if context cancelled bodyBufferPool.Put(poolBuf[:cap(poolBuf)]) //nolint:staticcheck // SA6002: slices have pointer semantics, no wrapper needed return @@ -437,6 +450,10 @@ func (p *poster) postLoop(ctx context.Context, postQueue chan *Post, checkQueue // Wait for all workers to complete and collect errors errs := pool.Wait() + // Read-ahead goroutine has finished (readAheadChan was drained above) + // but cancel the per-post ctx so any straggler observes Done. + postCancel() + // Batch add completed articles to NZB generator (reduces lock contention) for _, art := range completedArticles { nzbGen.AddArticle(art) @@ -458,6 +475,15 @@ func (p *poster) postLoop(ctx context.Context, postQueue chan *Post, checkQueue // Mark this post as done in the queue tracking postsInFlight.Done() + // Close the underlying file so the descriptor isn't leaked on + // failure. Long-running daemons that hit intermittent NNTP + // errors otherwise exhaust the process fd ulimit and stall. + if post.file != nil { + if cerr := post.file.Close(); cerr != nil { + slog.WarnContext(ctx, "Error closing file handle on post failure", "error", cerr, "file", post.FilePath) + } + } + if !errors.Is(errs, context.Canceled) { errChan <- fmt.Errorf("failed to post file %s after %d retries: %v", post.FilePath, p.cfg.MaxRetries, errs) } @@ -646,34 +672,25 @@ func (p *poster) checkLoop(ctx context.Context, checkQueue chan *Post, postQueue // Track this retry in the queue before sending postsInFlight.Add(1) - // Try to send retry to postQueue - // Use select to handle closed channel gracefully + // Send retry. postQueue is kept open by the postsInFlight + // gate (the Add above runs before this send), so a closed- + // channel panic here is no longer reachable. Only ctx + // cancellation can interrupt the send. + // + // Bookkeeping: addPost added +1 for the original post; the + // Add above added +1 for the retry. Once the retry is + // queued, the original post is no longer "in flight" — the + // retry represents it from here on — so Done the original. select { case postQueue <- failedPost: - // Retry sent successfully, continue to next post + postsInFlight.Done() continue case <-ctx.Done(): - // Context canceled, stop processing - // Decrement since we added above but didn't actually send + // Decrement both: the retry that won't be sent and the + // original post we are abandoning. postsInFlight.Done() - slog.WarnContext(ctx, "Context canceled while trying to send retry", "file", post.FilePath) - return - default: - // Channel is closed or full, cannot retry - // Decrement since we added above but didn't actually send postsInFlight.Done() - // Treat this as a failure - slog.WarnContext(ctx, "Cannot send retry - postQueue unavailable", "file", post.FilePath) - post.mu.Lock() - post.Status = PostStatusFailed - post.Error = fmt.Errorf("failed to queue retry - postQueue closed") - post.mu.Unlock() - - if post.failed != nil { - *post.failed++ - } - - errChan <- fmt.Errorf("failed to queue retry for file %s", post.FilePath) + slog.WarnContext(ctx, "Context canceled while trying to send retry", "file", post.FilePath) return } } @@ -725,7 +742,13 @@ func (p *poster) checkLoop(ctx context.Context, checkQueue chan *Post, postQueue postsInFlight.Done() if post.failed != nil { - *post.failed++ + post.failed.Add(1) + } + + if post.file != nil { + if cerr := post.file.Close(); cerr != nil { + slog.WarnContext(ctx, "Error closing file handle on verify failure", "error", cerr, "file", post.FilePath) + } } errChan <- fmt.Errorf("failed to verify file %s after %d retries", post.FilePath, p.checkCfg.MaxRePost) @@ -741,7 +764,13 @@ func (p *poster) checkLoop(ctx context.Context, checkQueue chan *Post, postQueue postsInFlight.Done() if post.failed != nil { - *post.failed++ + post.failed.Add(1) + } + + if post.file != nil { + if cerr := post.file.Close(); cerr != nil { + slog.WarnContext(ctx, "Error closing file handle on verify failure", "error", cerr, "file", post.FilePath) + } } errChan <- fmt.Errorf("unexpected error verifying file %s: %v", post.FilePath, errors) @@ -770,14 +799,19 @@ func (p *poster) checkLoop(ctx context.Context, checkQueue chan *Post, postQueue } // After processing all posts, if there are deferred articles, send a DeferredCheckError - // This is a non-fatal error that signals the caller to store these for later verification + // This is a non-fatal error that signals the caller to store these for later verification. + // Guard the send with ctx so a leaked checkLoop cannot block forever if the main + // goroutine has already returned. if len(allDeferredArticles) > 0 { slog.InfoContext(ctx, "Sending deferred check error", "deferred_articles", len(allDeferredArticles), "total_articles", totalArticlesProcessed) - errChan <- &DeferredCheckError{ + select { + case errChan <- &DeferredCheckError{ FailedArticles: allDeferredArticles, TotalArticles: totalArticlesProcessed, + }: + case <-ctx.Done(): } } } @@ -785,7 +819,7 @@ func (p *poster) checkLoop(ctx context.Context, checkQueue chan *Post, postQueue // addPost adds a file to the posting queue // displayName is the name to use in the subject (e.g., "Folder/subfolder/file.mp4") // If displayName is empty, the filename is used -func (p *poster) addPost(ctx context.Context, filePath string, displayName string, fileNumber int, totalFiles int, wg *sync.WaitGroup, failedPosts *int, postQueue chan<- *Post, nzbGen nzb.NZBGenerator, postsInFlight *sync.WaitGroup) error { +func (p *poster) addPost(ctx context.Context, filePath string, displayName string, fileNumber int, totalFiles int, wg *sync.WaitGroup, failedPosts *atomic.Int64, postQueue chan<- *Post, nzbGen nzb.NZBGenerator, postsInFlight *sync.WaitGroup) error { file, err := os.Open(filePath) if err != nil { return fmt.Errorf("error opening file: %w", err) diff --git a/internal/poster/poster_test.go b/internal/poster/poster_test.go index 8b0ceec..e184388 100644 --- a/internal/poster/poster_test.go +++ b/internal/poster/poster_test.go @@ -8,6 +8,7 @@ import ( "path/filepath" "strings" "sync" + "sync/atomic" "syscall" "testing" "time" @@ -825,7 +826,7 @@ func TestAddPost(t *testing.T) { var wg sync.WaitGroup var postsInFlight sync.WaitGroup - failedPosts := 0 + var failedPosts atomic.Int64 postQueue := make(chan *Post, 10) nzbGen := mocks.NewMockNZBGenerator(ctrl) @@ -863,7 +864,7 @@ func TestAddPost(t *testing.T) { var wg sync.WaitGroup var postsInFlight sync.WaitGroup - failedPosts := 0 + var failedPosts atomic.Int64 postQueue := make(chan *Post, 10) nzbGen := mocks.NewMockNZBGenerator(ctrl)