Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ require (
go.opentelemetry.io/otel/sdk v1.41.0
go.opentelemetry.io/otel/sdk/metric v1.41.0
golang.org/x/mod v0.33.0
golang.org/x/sync v0.20.0
)

require (
Expand Down Expand Up @@ -76,7 +77,6 @@ require (
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/crypto v0.48.0 // indirect
golang.org/x/net v0.51.0 // indirect
golang.org/x/sync v0.20.0 // indirect
golang.org/x/sys v0.41.0 // indirect
golang.org/x/text v0.34.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57 // indirect
Expand Down
8 changes: 4 additions & 4 deletions internal/cache/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,13 +263,13 @@ func (s *S3) Open(ctx context.Context, key Key) (io.ReadCloser, http.Header, err
}
}

// Get object
obj, err := s.client.GetObject(ctx, s.config.Bucket, objectName, minio.GetObjectOptions{})
// Download object using parallel range-GET for large objects.
reader, err := s.parallelGetReader(ctx, s.config.Bucket, objectName, objInfo.Size, objInfo.ETag)
if err != nil {
return nil, nil, errors.Errorf("failed to get object: %w", err)
return nil, nil, err
}

return &s3Reader{obj: obj}, headers, nil
return reader, headers, nil
}

// refreshExpiration updates the Expires-At metadata on an S3 object using
Expand Down
143 changes: 143 additions & 0 deletions internal/cache/s3_parallel_get.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package cache

import (
"context"
"io"

"github.com/alecthomas/errors"
"github.com/minio/minio-go/v7"
"golang.org/x/sync/errgroup"
)

// Tuning knobs for parallel S3 downloads (see parallelGet). Total in-flight
// buffer memory for one download is bounded by workers × chunk size.
const (
	// s3DownloadChunkSize is the size of each parallel range-GET request.
	// 32 MiB matches the gradle-cache-tool's benchmarked default.
	s3DownloadChunkSize = 32 << 20
	// s3DownloadWorkers is the number of concurrent range-GET requests.
	// 8 workers should be enough to saturate the host's network connection.
	s3DownloadWorkers = 8
)

// parallelGetReader returns an io.ReadCloser for the object at
// bucket/objectName. Objects larger than one chunk are fetched via
// concurrent range-GET requests and stitched back together in order;
// anything that fits in a single chunk is streamed with one GetObject call.
// The etag pins every chunk request to a single object revision, so a
// concurrent overwrite of the key cannot corrupt a large read.
func (s *S3) parallelGetReader(ctx context.Context, bucket, objectName string, size int64, etag string) (io.ReadCloser, error) {
	if size <= s3DownloadChunkSize {
		// Fits in one chunk: a plain streaming GET is all we need.
		obj, err := s.client.GetObject(ctx, bucket, objectName, minio.GetObjectOptions{})
		if err != nil {
			return nil, errors.Errorf("failed to get object: %w", err)
		}
		return &s3Reader{obj: obj}, nil
	}

	// Large object: fan out range requests and reassemble them in order
	// through an io.Pipe. The derived context tears the background download
	// down promptly if the consumer disconnects or a pipe write fails.
	dlCtx, cancel := context.WithCancel(ctx)
	pr, pw := io.Pipe()
	go func() {
		err := s.parallelGet(dlCtx, bucket, objectName, size, etag, pw)
		cancel()
		// A nil err closes the pipe with io.EOF for the reader.
		pw.CloseWithError(err)
	}()
	return &cancelReadCloser{ReadCloser: pr, cancel: cancel}, nil
}

// cancelReadCloser couples an io.ReadCloser with a context.CancelFunc so
// that closing the reader also tears down whatever background goroutines
// are feeding it.
type cancelReadCloser struct {
	io.ReadCloser
	cancel context.CancelFunc
}

// Close cancels the associated context before closing the underlying
// reader, guaranteeing the producing goroutines are signalled to stop.
func (c *cancelReadCloser) Close() error {
	c.cancel()
	err := c.ReadCloser.Close()
	return errors.Wrap(err, "close parallel get reader")
}

// parallelGet downloads an S3 object in parallel chunks and writes them in
// order to w. Each worker downloads its chunk into memory so the TCP
// connection stays active at full speed. A slot semaphore caps the number
// of chunks resident in memory (downloading, or downloaded but not yet
// written) at numWorkers, so peak memory is numWorkers × chunkSize even
// when w is slower than the downloads. Without the cap, completed chunks
// would pile up in the buffered result channels and peak memory could grow
// to the full object size.
// All chunk requests are pinned to the given etag to ensure consistency.
// An errgroup cancels all workers on the first error from any goroutine.
func (s *S3) parallelGet(ctx context.Context, bucket, objectName string, size int64, etag string, w io.Writer) error {
	numChunks := int((size + s3DownloadChunkSize - 1) / s3DownloadChunkSize)
	numWorkers := min(s3DownloadWorkers, numChunks)

	// One buffered channel per chunk so workers never block after sending.
	results := make([]chan []byte, numChunks)
	for i := range results {
		results[i] = make(chan []byte, 1)
	}

	// Work queue of chunk indices, consumed strictly in ascending order.
	work := make(chan int, numChunks)
	for i := range numChunks {
		work <- i
	}
	close(work)

	// slots bounds the chunks held in memory. A worker acquires a slot
	// BEFORE dequeuing a chunk index; the writer releases one after each
	// chunk is written. Because indices are dequeued in order, the set of
	// un-written chunks is always a contiguous run starting at the writer's
	// next position, so the writer can always make progress and the scheme
	// cannot deadlock. (Acquiring after the dequeue would allow all slots
	// to be held by later chunks while the writer waits on an earlier one.)
	slots := make(chan struct{}, numWorkers)

	eg, egCtx := errgroup.WithContext(ctx)

	// Download workers: fetch chunks concurrently and send data on success,
	// or return an error which cancels all other workers via egCtx.
	for range numWorkers {
		eg.Go(func() error {
			for {
				select {
				case slots <- struct{}{}:
				case <-egCtx.Done():
					return egCtx.Err()
				}
				seq, ok := <-work
				if !ok {
					// No work left. The slot just acquired is deliberately
					// not returned: every outstanding chunk already holds
					// its own slot and no further acquisitions are needed.
					return nil
				}

				start := int64(seq) * s3DownloadChunkSize
				end := min(start+s3DownloadChunkSize-1, size-1)

				opts := minio.GetObjectOptions{}
				if err := opts.SetRange(start, end); err != nil {
					return errors.Errorf("set range %d-%d: %w", start, end, err)
				}
				// Pin to the object revision from the initial stat to prevent
				// reading a mix of old and new data if the key is overwritten.
				if err := opts.SetMatchETag(etag); err != nil {
					return errors.Errorf("set etag %s: %w", etag, err)
				}

				obj, err := s.client.GetObject(egCtx, bucket, objectName, opts)
				if err != nil {
					return errors.Errorf("get range %d-%d: %w", start, end, err)
				}

				// Drain the body immediately so the TCP connection stays at
				// full speed. All workers do this concurrently, saturating
				// the available S3 bandwidth.
				data, readErr := io.ReadAll(obj)
				obj.Close() //nolint:errcheck,gosec
				if readErr != nil {
					return errors.Wrap(readErr, "read chunk")
				}
				results[seq] <- data
			}
		})
	}

	// Write chunks in order. Runs in the errgroup so that a write error
	// cancels egCtx, which stops download workers promptly.
	eg.Go(func() error {
		for _, ch := range results {
			select {
			case data := <-ch:
				if _, err := w.Write(data); err != nil {
					return errors.Wrap(err, "write chunk")
				}
				// Chunk delivered: release its memory slot so a worker can
				// begin downloading the next chunk. Never blocks — the slot
				// for this chunk was acquired before it was dequeued.
				<-slots
			case <-egCtx.Done():
				return egCtx.Err()
			}
		}
		return nil
	})

	return errors.Wrap(eg.Wait(), "parallel get")
}