From eb568cf8f7de6a34a4c21d3de0992a2ff055f118 Mon Sep 17 00:00:00 2001 From: Wael Nasreddine Date: Wed, 11 Feb 2026 20:59:49 -0800 Subject: [PATCH] feat: add zstd compression to s3 chunk store Chunks stored in S3 can be quite large, and many of them are highly compressible. Adding compression reduces storage costs and improves network performance by transferring fewer bytes. This change: - Implements zstd compression in PutChunk for S3 storage. - Returns the compressed size from PutChunk to allow tracking actual storage usage. - Automatically decompresses chunks in GetChunk using a wrapped ReadCloser. - Adds integration tests to verify both compression effectiveness and round-trip correctness. --- pkg/storage/chunk/s3.go | 69 +++++++++++++++++++++++++++++------- pkg/storage/chunk/s3_test.go | 64 ++++++++++++++++++++++++++------- 2 files changed, 108 insertions(+), 25 deletions(-) diff --git a/pkg/storage/chunk/s3.go b/pkg/storage/chunk/s3.go index 6515087a..9871dc39 100644 --- a/pkg/storage/chunk/s3.go +++ b/pkg/storage/chunk/s3.go @@ -10,6 +10,7 @@ import ( "path" "time" + "github.com/klauspost/compress/zstd" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" @@ -27,11 +28,25 @@ const ( chunkPutLockTTL = 5 * time.Minute ) +// s3ReadCloser pairs a zstd decoder with the underlying S3 object body; Close closes the decoder and then the body. +type s3ReadCloser struct { + *zstd.Decoder + body io.ReadCloser +} + +func (r *s3ReadCloser) Close() error { + r.Decoder.Close() + + return r.body.Close() +} + // s3Store implements Store for S3 storage. type s3Store struct { - client *minio.Client - locker lock.Locker - bucket string + client *minio.Client + locker lock.Locker + bucket string + encoder *zstd.Encoder + decoder *zstd.Decoder } // NewS3Store returns a new S3 chunk store. 
@@ -73,10 +88,24 @@ func NewS3Store(ctx context.Context, cfg s3.Config, locker lock.Locker) (Store, return nil, fmt.Errorf("%w: %s", ErrBucketNotFound, cfg.Bucket) } + encoder, err := zstd.NewWriter(nil) + if err != nil { + return nil, fmt.Errorf("failed to create zstd encoder: %w", err) + } + + decoder, err := zstd.NewReader(nil) + if err != nil { + encoder.Close() + + return nil, fmt.Errorf("failed to create zstd decoder: %w", err) + } + return &s3Store{ - client: client, - locker: locker, - bucket: cfg.Bucket, + client: client, + locker: locker, + bucket: cfg.Bucket, + encoder: encoder, + decoder: decoder, }, nil } @@ -118,7 +147,15 @@ func (s *s3Store) GetChunk(ctx context.Context, hash string) (io.ReadCloser, err return nil, err } - return obj, nil + // Create a new decoder for this specific object + decoder, err := zstd.NewReader(obj) + if err != nil { + obj.Close() + + return nil, fmt.Errorf("failed to create zstd decoder: %w", err) + } + + return &s3ReadCloser{decoder, obj}, nil } func (s *s3Store) PutChunk(ctx context.Context, hash string, data []byte) (bool, int64, error) { @@ -135,6 +172,9 @@ func (s *s3Store) PutChunk(ctx context.Context, hash string, data []byte) (bool, _ = s.locker.Unlock(ctx, lockKey) }() + // Compress data with zstd + compressed := s.encoder.EncodeAll(data, nil) + // Check if exists. 
exists, err := s.HasChunk(ctx, hash) if err != nil { @@ -142,17 +182,22 @@ func (s *s3Store) PutChunk(ctx context.Context, hash string, data []byte) (bool, } if exists { - return false, int64(len(data)), nil + return false, int64(len(compressed)), nil } - _, err = s.client.PutObject(ctx, s.bucket, key, bytes.NewReader(data), int64(len(data)), minio.PutObjectOptions{ - ContentType: "application/octet-stream", - }) + _, err = s.client.PutObject( + ctx, + s.bucket, + key, + bytes.NewReader(compressed), + int64(len(compressed)), + minio.PutObjectOptions{ContentType: "application/octet-stream"}, + ) if err != nil { return false, 0, fmt.Errorf("error putting chunk to S3: %w", err) } - return true, int64(len(data)), nil + return true, int64(len(compressed)), nil } func (s *s3Store) DeleteChunk(ctx context.Context, hash string) error { diff --git a/pkg/storage/chunk/s3_test.go b/pkg/storage/chunk/s3_test.go index a715fb9f..615dfc93 100644 --- a/pkg/storage/chunk/s3_test.go +++ b/pkg/storage/chunk/s3_test.go @@ -1,9 +1,11 @@ package chunk_test import ( + "bytes" "context" "errors" "io" + "strings" "testing" "time" @@ -34,12 +36,12 @@ func TestS3Store_Integration(t *testing.T) { t.Parallel() hash := "test-hash-s3-1" - content := "s3 chunk content" + content := strings.Repeat("s3 chunk content", 1024) created, size, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) assert.True(t, created) - assert.Equal(t, int64(len(content)), size) + assert.Greater(t, int64(len(content)), size) defer func() { err := store.DeleteChunk(ctx, hash) @@ -64,12 +66,12 @@ func TestS3Store_Integration(t *testing.T) { t.Parallel() hash := "test-hash-s3-2" - content := "s3 chunk content" + content := strings.Repeat("s3 chunk content", 1024) created1, size1, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) assert.True(t, created1) - assert.Equal(t, int64(len(content)), size1) + assert.Greater(t, int64(len(content)), size1) defer func() { err := 
store.DeleteChunk(ctx, hash) @@ -79,7 +81,7 @@ func TestS3Store_Integration(t *testing.T) { created2, size2, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) assert.False(t, created2) - assert.Equal(t, int64(len(content)), size2) + assert.Greater(t, int64(len(content)), size2) }) t.Run("get non-existent chunk", func(t *testing.T) { @@ -94,7 +96,7 @@ func TestS3Store_Integration(t *testing.T) { t.Parallel() hash := "test-hash-s3-idempotency" - content := "s3 chunk content idempotency" + content := strings.Repeat("s3 chunk content idempotency", 1024) // Delete non-existent chunk should not return error err := store.DeleteChunk(ctx, hash) @@ -103,7 +105,7 @@ func TestS3Store_Integration(t *testing.T) { created, size, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) assert.True(t, created) - assert.Equal(t, int64(len(content)), size) + assert.Greater(t, int64(len(content)), size) err = store.DeleteChunk(ctx, hash) require.NoError(t, err) @@ -111,6 +113,42 @@ func TestS3Store_Integration(t *testing.T) { err = store.DeleteChunk(ctx, hash) require.NoError(t, err) }) + + t.Run("stored chunk is zstd-compressed in S3", func(t *testing.T) { + t.Parallel() + + data := bytes.Repeat([]byte("compressible"), 1024) + isNew, compressedSize, err := store.PutChunk(ctx, "test-hash-s3-compress", data) + require.NoError(t, err) + assert.True(t, isNew) + assert.Greater(t, int64(len(data)), compressedSize, "compressed size should be less than original") + assert.Positive(t, compressedSize) + + defer func() { + _ = store.DeleteChunk(ctx, "test-hash-s3-compress") + }() + }) + + t.Run("compressed chunk round-trips correctly via S3", func(t *testing.T) { + t.Parallel() + + data := []byte("hello from S3 compressed chunk! 
hello from S3 compressed chunk!") + _, _, err := store.PutChunk(ctx, "test-hash-s3-roundtrip", data) + require.NoError(t, err) + + defer func() { + _ = store.DeleteChunk(ctx, "test-hash-s3-roundtrip") + }() + + rc, err := store.GetChunk(ctx, "test-hash-s3-roundtrip") + require.NoError(t, err) + + defer rc.Close() + + got, err := io.ReadAll(rc) + require.NoError(t, err) + assert.Equal(t, data, got) + }) } func TestS3Store_PutChunk_RaceCondition(t *testing.T) { @@ -129,7 +167,7 @@ func TestS3Store_PutChunk_RaceCondition(t *testing.T) { require.NoError(t, err) hash := "test-hash-race" - content := []byte("race condition content") + content := []byte(strings.Repeat("race condition content", 1024)) defer func() { _ = store.DeleteChunk(ctx, hash) @@ -140,12 +178,12 @@ func TestS3Store_PutChunk_RaceCondition(t *testing.T) { results := make(chan bool, numGoRoutines) errors := make(chan error, numGoRoutines) - for i := 0; i < numGoRoutines; i++ { + for range numGoRoutines { go func() { created, size, err := store.PutChunk(ctx, hash, content) results <- created - assert.Equal(t, int64(len(content)), size) + assert.Greater(t, int64(len(content)), size) errors <- err }() @@ -153,7 +191,7 @@ func TestS3Store_PutChunk_RaceCondition(t *testing.T) { createdCount := 0 - for i := 0; i < numGoRoutines; i++ { + for range numGoRoutines { err := <-errors require.NoError(t, err) @@ -234,12 +272,12 @@ func TestS3Store_ChunkPath(t *testing.T) { t.Run("short hash", func(t *testing.T) { hash := "a" - content := "short hash content" + content := strings.Repeat("short hash content", 1024) created, size, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) assert.True(t, created) - assert.Equal(t, int64(len(content)), size) + assert.Greater(t, int64(len(content)), size) defer func() { _ = store.DeleteChunk(ctx, hash)