From 8662246b64ef19fd394a065a0587951c9dc85da9 Mon Sep 17 00:00:00 2001
From: Josh Friend
Date: Wed, 25 Mar 2026 16:57:26 -0400
Subject: [PATCH] perf: parallel range-GET S3 downloads for large objects

Replace the single GetObject stream in S3.Open with parallel range-GET
requests for objects larger than 32 MiB. Workers download chunks
concurrently via errgroup and reassemble them in order via io.Pipe.

All chunk requests are pinned to the ETag from the initial stat to
prevent corruption if the key is overwritten mid-read. An errgroup with
a derived context ensures all workers are cancelled promptly on any
error or early consumer close.
---
 go.mod                            |   2 +-
 internal/cache/s3.go              |   8 +-
 internal/cache/s3_parallel_get.go | 149 ++++++++++++++++++++++++++++++
 3 files changed, 154 insertions(+), 5 deletions(-)
 create mode 100644 internal/cache/s3_parallel_get.go

diff --git a/go.mod b/go.mod
index 082a180e..c5c76b72 100644
--- a/go.mod
+++ b/go.mod
@@ -20,6 +20,7 @@ require (
 	go.opentelemetry.io/otel/sdk v1.41.0
 	go.opentelemetry.io/otel/sdk/metric v1.41.0
 	golang.org/x/mod v0.33.0
+	golang.org/x/sync v0.20.0
 )
 
 require (
@@ -76,7 +77,6 @@
 	go.yaml.in/yaml/v3 v3.0.4 // indirect
 	golang.org/x/crypto v0.48.0 // indirect
 	golang.org/x/net v0.51.0 // indirect
-	golang.org/x/sync v0.20.0 // indirect
 	golang.org/x/sys v0.41.0 // indirect
 	golang.org/x/text v0.34.0 // indirect
 	google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57 // indirect
diff --git a/internal/cache/s3.go b/internal/cache/s3.go
index 6b5ae913..7c9a4308 100644
--- a/internal/cache/s3.go
+++ b/internal/cache/s3.go
@@ -263,13 +263,13 @@ func (s *S3) Open(ctx context.Context, key Key) (io.ReadCloser, http.Header, err) {
 		}
 	}
 
-	// Get object
-	obj, err := s.client.GetObject(ctx, s.config.Bucket, objectName, minio.GetObjectOptions{})
+	// Download object using parallel range-GET for large objects.
+	reader, err := s.parallelGetReader(ctx, s.config.Bucket, objectName, objInfo.Size, objInfo.ETag)
 	if err != nil {
-		return nil, nil, errors.Errorf("failed to get object: %w", err)
+		return nil, nil, err
 	}
 
-	return &s3Reader{obj: obj}, headers, nil
+	return reader, headers, nil
 }
 
 // refreshExpiration updates the Expires-At metadata on an S3 object using
diff --git a/internal/cache/s3_parallel_get.go b/internal/cache/s3_parallel_get.go
new file mode 100644
index 00000000..6c2b9e3b
--- /dev/null
+++ b/internal/cache/s3_parallel_get.go
@@ -0,0 +1,149 @@
+package cache
+
+import (
+	"context"
+	"io"
+
+	"github.com/alecthomas/errors"
+	"github.com/minio/minio-go/v7"
+	"golang.org/x/sync/errgroup"
+)
+
+const (
+	// s3DownloadChunkSize is the size of each parallel range-GET request.
+	// 32 MiB matches the gradle-cache-tool's benchmarked default.
+	s3DownloadChunkSize = 32 << 20
+	// s3DownloadWorkers is the number of concurrent range-GET requests.
+	// 8 workers should be enough to saturate the host's network connection.
+	s3DownloadWorkers = 8
+)
+
+// parallelGetReader returns an io.ReadCloser that downloads the S3 object
+// using parallel range-GET requests and reassembles chunks in order.
+// For objects smaller than one chunk, it falls back to a single GetObject.
+// The etag pins all chunk requests to one object revision, preventing
+// corruption if the key is overwritten during a large read.
+func (s *S3) parallelGetReader(ctx context.Context, bucket, objectName string, size int64, etag string) (io.ReadCloser, error) {
+	if size <= s3DownloadChunkSize {
+		// Small object: single stream.
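+		// One streaming GET serves a single object revision, so the ETag
+		// pinning used for the chunked path below is unnecessary here.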
+		obj, err := s.client.GetObject(ctx, bucket, objectName, minio.GetObjectOptions{})
+		if err != nil {
+			return nil, errors.Errorf("failed to get object: %w", err)
+		}
+		return &s3Reader{obj: obj}, nil
+	}
+
+	// Large object: parallel range requests reassembled in order via io.Pipe.
+	// Use a cancellable context so workers stop promptly if the consumer
+	// disconnects or a write error occurs.
+	dlCtx, cancel := context.WithCancel(ctx)
+	pr, pw := io.Pipe()
+	go func() {
+		err := s.parallelGet(dlCtx, bucket, objectName, size, etag, pw)
+		cancel()
+		pw.CloseWithError(err)
+	}()
+	return &cancelReadCloser{ReadCloser: pr, cancel: cancel}, nil
+}
+
+// cancelReadCloser wraps an io.ReadCloser and cancels a context on Close,
+// ensuring background goroutines are cleaned up when the consumer is done.
+type cancelReadCloser struct {
+	io.ReadCloser
+	cancel context.CancelFunc
+}
+
+func (c *cancelReadCloser) Close() error {
+	c.cancel()
+	return errors.Wrap(c.ReadCloser.Close(), "close parallel get reader")
+}
+
+// parallelGet downloads an S3 object in parallel chunks and writes them in
+// order to w. Each worker downloads its chunk fully into memory so its TCP
+// connection stays busy at full speed. Workers may run ahead of a slow
+// consumer, so buffered chunks can grow toward the full object size in the
+// worst case. All chunk requests are pinned to the given etag to ensure
+// consistency, and an errgroup cancels every worker on the first error.
+func (s *S3) parallelGet(ctx context.Context, bucket, objectName string, size int64, etag string, w io.Writer) error {
+	numChunks := int((size + s3DownloadChunkSize - 1) / s3DownloadChunkSize)
+	numWorkers := min(s3DownloadWorkers, numChunks)
+
+	// One buffered channel per chunk so workers never block after sending.
+	results := make([]chan []byte, numChunks)
+	for i := range results {
+		results[i] = make(chan []byte, 1)
+	}
+
+	// Work queue of chunk indices.
+	work := make(chan int, numChunks)
+	for i := range numChunks {
+		work <- i
+	}
+	close(work)
+
+	eg, egCtx := errgroup.WithContext(ctx)
+
+	// Download workers: fetch chunks concurrently and send data on success,
+	// or return an error which cancels all other workers via egCtx.
+	for range numWorkers {
+		eg.Go(func() error {
+			for seq := range work {
+				if egCtx.Err() != nil {
+					return egCtx.Err()
+				}
+
+				start := int64(seq) * s3DownloadChunkSize
+				end := min(start+s3DownloadChunkSize-1, size-1)
+
+				opts := minio.GetObjectOptions{}
+				if err := opts.SetRange(start, end); err != nil {
+					return errors.Errorf("set range %d-%d: %w", start, end, err)
+				}
+				// Pin to the object revision from the initial stat to prevent
+				// reading a mix of old and new data if the key is overwritten.
+				if err := opts.SetMatchETag(etag); err != nil {
+					return errors.Errorf("set etag %s: %w", etag, err)
+				}
+
+				obj, err := s.client.GetObject(egCtx, bucket, objectName, opts)
+				if err != nil {
+					return errors.Errorf("get range %d-%d: %w", start, end, err)
+				}
+
+				// Drain the body immediately so the TCP connection stays at
+				// full speed. All workers do this concurrently, saturating
+				// the available S3 bandwidth.
+				data, readErr := io.ReadAll(obj)
+				obj.Close() //nolint:errcheck,gosec
+				if readErr != nil {
+					return errors.Wrap(readErr, "read chunk")
+				}
+				results[seq] <- data
+			}
+			return nil
+		})
+	}
+
+	// Write chunks in order. Runs in the errgroup so that a write error
+	// cancels egCtx, which stops download workers promptly.
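+	// The select below keeps this loop deadlock-free: if the worker for the
+	// next chunk fails, its result channel never receives, but egCtx.Done()
+	// fires and the writer returns instead of blocking forever.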
+	eg.Go(func() error {
+		for _, ch := range results {
+			select {
+			case data := <-ch:
+				if _, err := w.Write(data); err != nil {
+					return errors.Wrap(err, "write chunk")
+				}
+			case <-egCtx.Done():
+				return egCtx.Err()
+			}
+		}
+		return nil
+	})
+
+	return errors.Wrap(eg.Wait(), "parallel get")
+}