Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 73 additions & 39 deletions internal/poster/poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ type Post struct {
mu sync.Mutex
filesize int64
wg *sync.WaitGroup
failed *int
failed *atomic.Int64
progress progress.Progress
}

Expand Down Expand Up @@ -214,20 +214,24 @@ func (p *poster) PostWithRelativePaths(
}

wg := sync.WaitGroup{}
var failedPosts int
var failedPosts atomic.Int64

// Create a context that can be canceled
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Create error channel to collect errors
errChan := make(chan error, 1)
// Create error channel to collect errors. It is buffered so that the three
// senders (postLoop, checkLoop, and the deferred-check send at the end of
// checkLoop) do not block one another when more than one error is reported
// before the main goroutine drains the channel.
errChan := make(chan error, 4)

// Create channels for post and check queues
postQueue := make(chan *Post, 100)
checkQueue := make(chan *Post, 100)

// Track posts in flight (initial + retries) to know when all processing is complete
// Track posts in flight (initial + retries) so we close postQueue only once
// every initial post AND every retry is fully accounted for. Closing earlier
// races with checkLoop's retry sends and panics on closed-channel send.
var postsInFlight sync.WaitGroup

// Start a goroutine to process posts
Expand Down Expand Up @@ -261,9 +265,13 @@ func (p *poster) PostWithRelativePaths(
}
}

// Close postQueue after all initial posts have been added
// The checkLoop can still add retries back to the queue if needed
close(postQueue)
// Close postQueue only when no posts are in-flight (initial + any retries
// queued by checkLoop). This avoids the closed-channel panic on retry sends
// and lets postLoop/checkLoop drain naturally.
go func() {
postsInFlight.Wait()
close(postQueue)
}()

// Wait for all posts to complete or an error to occur
done := make(chan struct{})
Expand Down Expand Up @@ -309,8 +317,8 @@ func (p *poster) PostWithRelativePaths(
default:
}

if failedPosts > 0 {
return fmt.Errorf("failed to post %d files", failedPosts)
if n := failedPosts.Load(); n > 0 {
return fmt.Errorf("failed to post %d files", n)
}

// Return deferred error if present (non-fatal - caller should handle)
Expand Down Expand Up @@ -349,6 +357,11 @@ func (p *poster) postLoop(ctx context.Context, postQueue chan *Post, checkQueue
post.Status = PostStatusPosting
post.mu.Unlock()

// Per-post context so the read-ahead goroutine always terminates
// when this post's block exits, even if the parent ctx is still
// alive (e.g. on the deferred-check non-fatal-error path).
postCtx, postCancel := context.WithCancel(ctx)

// Create read-ahead channel (buffer 50 articles ahead to overlap I/O with network)
readAheadChan := make(chan articleWithBody, 50)

Expand All @@ -357,7 +370,7 @@ func (p *poster) postLoop(ctx context.Context, postQueue chan *Post, checkQueue
defer close(readAheadChan)
for _, art := range post.Articles {
select {
case <-ctx.Done():
case <-postCtx.Done():
return
default:
// Get buffer from pool, resize if needed
Expand All @@ -377,7 +390,7 @@ func (p *poster) postLoop(ctx context.Context, postQueue chan *Post, checkQueue

select {
case readAheadChan <- articleWithBody{article: art, body: body, poolBuf: poolBuf}:
case <-ctx.Done():
case <-postCtx.Done():
// Return buffer to pool if context cancelled
bodyBufferPool.Put(poolBuf[:cap(poolBuf)]) //nolint:staticcheck // SA6002: slices have pointer semantics, no wrapper needed
return
Expand Down Expand Up @@ -437,6 +450,10 @@ func (p *poster) postLoop(ctx context.Context, postQueue chan *Post, checkQueue
// Wait for all workers to complete and collect errors
errs := pool.Wait()

// All workers for this post have returned. Cancel the per-post ctx so the
// read-ahead goroutine exits even if it is still blocked sending on
// readAheadChan.
postCancel()

// Batch add completed articles to NZB generator (reduces lock contention)
for _, art := range completedArticles {
nzbGen.AddArticle(art)
Expand All @@ -458,6 +475,15 @@ func (p *poster) postLoop(ctx context.Context, postQueue chan *Post, checkQueue
// Mark this post as done in the queue tracking
postsInFlight.Done()

// Close the underlying file so the descriptor isn't leaked on
// failure. Long-running daemons that hit intermittent NNTP
// errors otherwise exhaust the process fd ulimit and stall.
if post.file != nil {
if cerr := post.file.Close(); cerr != nil {
slog.WarnContext(ctx, "Error closing file handle on post failure", "error", cerr, "file", post.FilePath)
}
}

if !errors.Is(errs, context.Canceled) {
errChan <- fmt.Errorf("failed to post file %s after %d retries: %v", post.FilePath, p.cfg.MaxRetries, errs)
}
Expand Down Expand Up @@ -646,34 +672,25 @@ func (p *poster) checkLoop(ctx context.Context, checkQueue chan *Post, postQueue
// Track this retry in the queue before sending
postsInFlight.Add(1)

// Try to send retry to postQueue
// Use select to handle closed channel gracefully
// Send retry. postQueue is kept open by the postsInFlight
// gate (the Add above runs before this send), so a closed-
// channel panic here is no longer reachable. Only ctx
// cancellation can interrupt the send.
//
// Bookkeeping: addPost added +1 for the original post; the
// Add above added +1 for the retry. Once the retry is
// queued, the original post is no longer "in flight" — the
// retry represents it from here on — so Done the original.
select {
case postQueue <- failedPost:
// Retry sent successfully, continue to next post
postsInFlight.Done()
continue
case <-ctx.Done():
// Context canceled, stop processing
// Decrement since we added above but didn't actually send
// Decrement both: the retry that won't be sent and the
// original post we are abandoning.
postsInFlight.Done()
slog.WarnContext(ctx, "Context canceled while trying to send retry", "file", post.FilePath)
return
default:
// Channel is closed or full, cannot retry
// Decrement since we added above but didn't actually send
postsInFlight.Done()
// Treat this as a failure
slog.WarnContext(ctx, "Cannot send retry - postQueue unavailable", "file", post.FilePath)
post.mu.Lock()
post.Status = PostStatusFailed
post.Error = fmt.Errorf("failed to queue retry - postQueue closed")
post.mu.Unlock()

if post.failed != nil {
*post.failed++
}

errChan <- fmt.Errorf("failed to queue retry for file %s", post.FilePath)
slog.WarnContext(ctx, "Context canceled while trying to send retry", "file", post.FilePath)
return
}
}
Expand Down Expand Up @@ -725,7 +742,13 @@ func (p *poster) checkLoop(ctx context.Context, checkQueue chan *Post, postQueue
postsInFlight.Done()

if post.failed != nil {
*post.failed++
post.failed.Add(1)
}

if post.file != nil {
if cerr := post.file.Close(); cerr != nil {
slog.WarnContext(ctx, "Error closing file handle on verify failure", "error", cerr, "file", post.FilePath)
}
}

errChan <- fmt.Errorf("failed to verify file %s after %d retries", post.FilePath, p.checkCfg.MaxRePost)
Expand All @@ -741,7 +764,13 @@ func (p *poster) checkLoop(ctx context.Context, checkQueue chan *Post, postQueue
postsInFlight.Done()

if post.failed != nil {
*post.failed++
post.failed.Add(1)
}

if post.file != nil {
if cerr := post.file.Close(); cerr != nil {
slog.WarnContext(ctx, "Error closing file handle on verify failure", "error", cerr, "file", post.FilePath)
}
}

errChan <- fmt.Errorf("unexpected error verifying file %s: %v", post.FilePath, errors)
Expand Down Expand Up @@ -770,22 +799,27 @@ func (p *poster) checkLoop(ctx context.Context, checkQueue chan *Post, postQueue
}

// After processing all posts, if there are deferred articles, send a DeferredCheckError
// This is a non-fatal error that signals the caller to store these for later verification
// This is a non-fatal error that signals the caller to store these for later verification.
// Guard the send with ctx so a leaked checkLoop cannot block forever if the main
// goroutine has already returned.
if len(allDeferredArticles) > 0 {
slog.InfoContext(ctx, "Sending deferred check error",
"deferred_articles", len(allDeferredArticles),
"total_articles", totalArticlesProcessed)
errChan <- &DeferredCheckError{
select {
case errChan <- &DeferredCheckError{
FailedArticles: allDeferredArticles,
TotalArticles: totalArticlesProcessed,
}:
case <-ctx.Done():
}
}
}

// addPost adds a file to the posting queue
// displayName is the name to use in the subject (e.g., "Folder/subfolder/file.mp4")
// If displayName is empty, the filename is used
func (p *poster) addPost(ctx context.Context, filePath string, displayName string, fileNumber int, totalFiles int, wg *sync.WaitGroup, failedPosts *int, postQueue chan<- *Post, nzbGen nzb.NZBGenerator, postsInFlight *sync.WaitGroup) error {
func (p *poster) addPost(ctx context.Context, filePath string, displayName string, fileNumber int, totalFiles int, wg *sync.WaitGroup, failedPosts *atomic.Int64, postQueue chan<- *Post, nzbGen nzb.NZBGenerator, postsInFlight *sync.WaitGroup) error {
file, err := os.Open(filePath)
if err != nil {
return fmt.Errorf("error opening file: %w", err)
Expand Down
5 changes: 3 additions & 2 deletions internal/poster/poster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"path/filepath"
"strings"
"sync"
"sync/atomic"
"syscall"
"testing"
"time"
Expand Down Expand Up @@ -825,7 +826,7 @@ func TestAddPost(t *testing.T) {

var wg sync.WaitGroup
var postsInFlight sync.WaitGroup
failedPosts := 0
var failedPosts atomic.Int64
postQueue := make(chan *Post, 10)
nzbGen := mocks.NewMockNZBGenerator(ctrl)

Expand Down Expand Up @@ -863,7 +864,7 @@ func TestAddPost(t *testing.T) {

var wg sync.WaitGroup
var postsInFlight sync.WaitGroup
failedPosts := 0
var failedPosts atomic.Int64
postQueue := make(chan *Post, 10)
nzbGen := mocks.NewMockNZBGenerator(ctrl)

Expand Down
Loading