diff --git a/pkg/storage/chunk/s3.go b/pkg/storage/chunk/s3.go index 6515087a..9871dc39 100644 --- a/pkg/storage/chunk/s3.go +++ b/pkg/storage/chunk/s3.go @@ -10,6 +10,7 @@ import ( "path" "time" + "github.com/klauspost/compress/zstd" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" @@ -27,11 +28,25 @@ const ( chunkPutLockTTL = 5 * time.Minute ) +// s3ReadCloser wraps a zstd decoder and io.ReadCloser to properly close both. +type s3ReadCloser struct { + *zstd.Decoder + body io.ReadCloser +} + +func (r *s3ReadCloser) Close() error { + r.Decoder.Close() + + return r.body.Close() +} + // s3Store implements Store for S3 storage. type s3Store struct { - client *minio.Client - locker lock.Locker - bucket string + client *minio.Client + locker lock.Locker + bucket string + encoder *zstd.Encoder + decoder *zstd.Decoder } // NewS3Store returns a new S3 chunk store. @@ -73,10 +88,24 @@ func NewS3Store(ctx context.Context, cfg s3.Config, locker lock.Locker) (Store, return nil, fmt.Errorf("%w: %s", ErrBucketNotFound, cfg.Bucket) } + encoder, err := zstd.NewWriter(nil) + if err != nil { + return nil, fmt.Errorf("failed to create zstd encoder: %w", err) + } + + decoder, err := zstd.NewReader(nil) + if err != nil { + encoder.Close() + + return nil, fmt.Errorf("failed to create zstd decoder: %w", err) + } + return &s3Store{ - client: client, - locker: locker, - bucket: cfg.Bucket, + client: client, + locker: locker, + bucket: cfg.Bucket, + encoder: encoder, + decoder: decoder, }, nil } @@ -118,7 +147,15 @@ func (s *s3Store) GetChunk(ctx context.Context, hash string) (io.ReadCloser, err return nil, err } - return obj, nil + // Create a new decoder for this specific object + decoder, err := zstd.NewReader(obj) + if err != nil { + obj.Close() + + return nil, fmt.Errorf("failed to create zstd decoder: %w", err) + } + + return &s3ReadCloser{decoder, obj}, nil } func (s *s3Store) PutChunk(ctx context.Context, hash string, data []byte) (bool, int64, error) { @@ -135,6 +172,9 @@ func (s *s3Store) PutChunk(ctx context.Context, hash string, data []byte) (bool, _ = s.locker.Unlock(ctx, lockKey) }() + // Compress data with zstd + compressed := s.encoder.EncodeAll(data, nil) + // Check if exists. exists, err := s.HasChunk(ctx, hash) if err != nil { @@ -142,17 +182,22 @@ func (s *s3Store) PutChunk(ctx context.Context, hash string, data []byte) (bool, } if exists { - return false, int64(len(data)), nil + return false, int64(len(compressed)), nil } - _, err = s.client.PutObject(ctx, s.bucket, key, bytes.NewReader(data), int64(len(data)), minio.PutObjectOptions{ - ContentType: "application/octet-stream", - }) + _, err = s.client.PutObject( + ctx, + s.bucket, + key, + bytes.NewReader(compressed), + int64(len(compressed)), + minio.PutObjectOptions{ContentType: "application/octet-stream"}, + ) if err != nil { return false, 0, fmt.Errorf("error putting chunk to S3: %w", err) } - return true, int64(len(data)), nil + return true, int64(len(compressed)), nil } func (s *s3Store) DeleteChunk(ctx context.Context, hash string) error { diff --git a/pkg/storage/chunk/s3_test.go b/pkg/storage/chunk/s3_test.go index a715fb9f..615dfc93 100644 --- a/pkg/storage/chunk/s3_test.go +++ b/pkg/storage/chunk/s3_test.go @@ -1,9 +1,11 @@ package chunk_test import ( + "bytes" "context" "errors" "io" + "strings" "testing" "time" @@ -34,12 +36,12 @@ func TestS3Store_Integration(t *testing.T) { t.Parallel() hash := "test-hash-s3-1" - content := "s3 chunk content" + content := strings.Repeat("s3 chunk content", 1024) created, size, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) assert.True(t, created) - assert.Equal(t, int64(len(content)), size) + assert.Greater(t, int64(len(content)), size) defer func() { err := store.DeleteChunk(ctx, hash) @@ -64,12 +66,12 @@ func TestS3Store_Integration(t *testing.T) { t.Parallel() hash := "test-hash-s3-2" - content := "s3 chunk content" + content := strings.Repeat("s3 chunk content", 1024) created1, size1, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) assert.True(t, created1) - assert.Equal(t, int64(len(content)), size1) + assert.Greater(t, int64(len(content)), size1) defer func() { err := store.DeleteChunk(ctx, hash) @@ -79,7 +81,7 @@ func TestS3Store_Integration(t *testing.T) { created2, size2, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) assert.False(t, created2) - assert.Equal(t, int64(len(content)), size2) + assert.Greater(t, int64(len(content)), size2) }) t.Run("get non-existent chunk", func(t *testing.T) { @@ -94,7 +96,7 @@ func TestS3Store_Integration(t *testing.T) { t.Parallel() hash := "test-hash-s3-idempotency" - content := "s3 chunk content idempotency" + content := strings.Repeat("s3 chunk content idempotency", 1024) // Delete non-existent chunk should not return error err := store.DeleteChunk(ctx, hash) @@ -103,7 +105,7 @@ func TestS3Store_Integration(t *testing.T) { created, size, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) assert.True(t, created) - assert.Equal(t, int64(len(content)), size) + assert.Greater(t, int64(len(content)), size) err = store.DeleteChunk(ctx, hash) require.NoError(t, err) @@ -111,6 +113,42 @@ func TestS3Store_Integration(t *testing.T) { err = store.DeleteChunk(ctx, hash) require.NoError(t, err) }) + + t.Run("stored chunk is zstd-compressed in S3", func(t *testing.T) { + t.Parallel() + + data := bytes.Repeat([]byte("compressible"), 1024) + isNew, compressedSize, err := store.PutChunk(ctx, "test-hash-s3-compress", data) + require.NoError(t, err) + assert.True(t, isNew) + assert.Greater(t, int64(len(data)), compressedSize, "compressed size should be less than original") + assert.Positive(t, compressedSize) + + defer func() { + _ = store.DeleteChunk(ctx, "test-hash-s3-compress") + }() + }) + + t.Run("compressed chunk round-trips correctly via S3", func(t *testing.T) { + t.Parallel() + + data := []byte("hello from S3 compressed chunk! hello from S3 compressed chunk!") + _, _, err := store.PutChunk(ctx, "test-hash-s3-roundtrip", data) + require.NoError(t, err) + + defer func() { + _ = store.DeleteChunk(ctx, "test-hash-s3-roundtrip") + }() + + rc, err := store.GetChunk(ctx, "test-hash-s3-roundtrip") + require.NoError(t, err) + + defer rc.Close() + + got, err := io.ReadAll(rc) + require.NoError(t, err) + assert.Equal(t, data, got) + }) } func TestS3Store_PutChunk_RaceCondition(t *testing.T) { @@ -129,7 +167,7 @@ func TestS3Store_PutChunk_RaceCondition(t *testing.T) { require.NoError(t, err) hash := "test-hash-race" - content := []byte("race condition content") + content := []byte(strings.Repeat("race condition content", 1024)) defer func() { _ = store.DeleteChunk(ctx, hash) @@ -140,12 +178,12 @@ func TestS3Store_PutChunk_RaceCondition(t *testing.T) { results := make(chan bool, numGoRoutines) errors := make(chan error, numGoRoutines) - for i := 0; i < numGoRoutines; i++ { + for range numGoRoutines { go func() { created, size, err := store.PutChunk(ctx, hash, content) results <- created - assert.Equal(t, int64(len(content)), size) + assert.Greater(t, int64(len(content)), size) errors <- err }() @@ -153,7 +191,7 @@ func TestS3Store_PutChunk_RaceCondition(t *testing.T) { createdCount := 0 - for i := 0; i < numGoRoutines; i++ { + for range numGoRoutines { err := <-errors require.NoError(t, err) @@ -234,12 +272,12 @@ func TestS3Store_ChunkPath(t *testing.T) { t.Run("short hash", func(t *testing.T) { hash := "a" - content := "short hash content" + content := strings.Repeat("short hash content", 1024) created, size, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) assert.True(t, created) - assert.Equal(t, int64(len(content)), size) + assert.Greater(t, int64(len(content)), size) defer func() { _ = store.DeleteChunk(ctx, hash)