Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions internal/cache/s3.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cache

import (
"bufio"
"context"
"crypto/tls"
"encoding/json"
Expand Down Expand Up @@ -350,8 +351,14 @@ func (s *S3) Create(ctx context.Context, key Key, headers http.Header, ttl time.
errCh: make(chan error, 1),
}

// Start upload in background goroutine
go writer.upload(pr)
// Start upload in background goroutine. NOTE(review): bufio.Reader is
// pull-based — it fills its buffer only when the uploader calls Read. It
// does NOT drain the pipe while the uploader is blocked on a full jobs
// channel or a slow S3 part upload, so the archive goroutine still stalls
// in that window (the io.Pipe is unbuffered and needs an active consumer).
// What the 8 MiB reader does provide is batching: many small pipe writes
// are coalesced into large reads for the part-upload chunking loop. If the
// goal is truly to absorb archive output during upload stalls, an active
// pump is required (e.g. a goroutine io.Copy-ing the pipe into a bounded
// buffer) — confirm intent before relying on this comment's original claim.
br := bufio.NewReaderSize(pr, 8<<20)
go writer.upload(pr, br)

return writer, nil
}
Expand Down Expand Up @@ -422,7 +429,7 @@ func (w *s3Writer) Close() error {
return nil
}

func (w *s3Writer) upload(pr *io.PipeReader) {
func (w *s3Writer) upload(pr *io.PipeReader, r io.Reader) {
var uploadErr error
defer func() {
// Use CloseWithError to propagate any error to the writer side
Expand Down Expand Up @@ -471,7 +478,7 @@ func (w *s3Writer) upload(pr *io.PipeReader) {
w.ctx,
w.s3.config.Bucket,
objectName,
pr,
r,
-1,
opts,
)
Expand Down