Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 57 additions & 12 deletions pkg/storage/chunk/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"path"
"time"

"github.com/klauspost/compress/zstd"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"

Expand All @@ -27,11 +28,25 @@ const (
chunkPutLockTTL = 5 * time.Minute
)

// s3ReadCloser couples a streaming zstd decoder with the underlying S3
// object body so that a single Close releases both resources. The embedded
// *zstd.Decoder provides the Read method for callers.
type s3ReadCloser struct {
	*zstd.Decoder
	body io.ReadCloser
}

// Close releases the zstd decoder first (its Close never returns an error)
// and then closes the wrapped S3 object body, propagating that error.
func (rc *s3ReadCloser) Close() error {
	rc.Decoder.Close()

	return rc.body.Close()
}

// s3Store implements Store for S3 storage.
//
// NOTE(review): the pasted diff retained both the pre-change and post-change
// field lists, declaring client/locker/bucket twice; only the post-change
// fields (including the shared zstd encoder/decoder) are kept here.
type s3Store struct {
	client  *minio.Client // S3-compatible client used for all bucket operations
	locker  lock.Locker   // distributed lock guarding concurrent chunk puts
	bucket  string        // target bucket name
	encoder *zstd.Encoder // shared stateless encoder (EncodeAll) for chunk uploads
	decoder *zstd.Decoder // shared decoder; per-object decoders are created for streaming reads
}

// NewS3Store returns a new S3 chunk store.
Expand Down Expand Up @@ -73,10 +88,24 @@ func NewS3Store(ctx context.Context, cfg s3.Config, locker lock.Locker) (Store,
return nil, fmt.Errorf("%w: %s", ErrBucketNotFound, cfg.Bucket)
}

encoder, err := zstd.NewWriter(nil)
if err != nil {
return nil, fmt.Errorf("failed to create zstd encoder: %w", err)
}

decoder, err := zstd.NewReader(nil)
if err != nil {
encoder.Close()

return nil, fmt.Errorf("failed to create zstd decoder: %w", err)
}

return &s3Store{
client: client,
locker: locker,
bucket: cfg.Bucket,
client: client,
locker: locker,
bucket: cfg.Bucket,
encoder: encoder,
decoder: decoder,
}, nil
}

Expand Down Expand Up @@ -118,7 +147,15 @@ func (s *s3Store) GetChunk(ctx context.Context, hash string) (io.ReadCloser, err
return nil, err
}

return obj, nil
// Create a new decoder for this specific object
decoder, err := zstd.NewReader(obj)
if err != nil {
obj.Close()

return nil, fmt.Errorf("failed to create zstd decoder: %w", err)
}

return &s3ReadCloser{decoder, obj}, nil
}

func (s *s3Store) PutChunk(ctx context.Context, hash string, data []byte) (bool, int64, error) {
Expand All @@ -135,24 +172,32 @@ func (s *s3Store) PutChunk(ctx context.Context, hash string, data []byte) (bool,
_ = s.locker.Unlock(ctx, lockKey)
}()

// Compress data with zstd
compressed := s.encoder.EncodeAll(data, nil)

// Check if exists.
exists, err := s.HasChunk(ctx, hash)
if err != nil {
return false, 0, err
}

if exists {
return false, int64(len(data)), nil
return false, int64(len(compressed)), nil
}

_, err = s.client.PutObject(ctx, s.bucket, key, bytes.NewReader(data), int64(len(data)), minio.PutObjectOptions{
ContentType: "application/octet-stream",
})
_, err = s.client.PutObject(
ctx,
s.bucket,
key,
bytes.NewReader(compressed),
int64(len(compressed)),
minio.PutObjectOptions{ContentType: "application/octet-stream"},
)
if err != nil {
return false, 0, fmt.Errorf("error putting chunk to S3: %w", err)
}

return true, int64(len(data)), nil
return true, int64(len(compressed)), nil
}

func (s *s3Store) DeleteChunk(ctx context.Context, hash string) error {
Expand Down
64 changes: 51 additions & 13 deletions pkg/storage/chunk/s3_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package chunk_test

import (
"bytes"
"context"
"errors"
"io"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -34,12 +36,12 @@ func TestS3Store_Integration(t *testing.T) {
t.Parallel()

hash := "test-hash-s3-1"
content := "s3 chunk content"
content := strings.Repeat("s3 chunk content", 1024)

created, size, err := store.PutChunk(ctx, hash, []byte(content))
require.NoError(t, err)
assert.True(t, created)
assert.Equal(t, int64(len(content)), size)
assert.Greater(t, int64(len(content)), size)

defer func() {
err := store.DeleteChunk(ctx, hash)
Expand All @@ -64,12 +66,12 @@ func TestS3Store_Integration(t *testing.T) {
t.Parallel()

hash := "test-hash-s3-2"
content := "s3 chunk content"
content := strings.Repeat("s3 chunk content", 1024)

created1, size1, err := store.PutChunk(ctx, hash, []byte(content))
require.NoError(t, err)
assert.True(t, created1)
assert.Equal(t, int64(len(content)), size1)
assert.Greater(t, int64(len(content)), size1)

defer func() {
err := store.DeleteChunk(ctx, hash)
Expand All @@ -79,7 +81,7 @@ func TestS3Store_Integration(t *testing.T) {
created2, size2, err := store.PutChunk(ctx, hash, []byte(content))
require.NoError(t, err)
assert.False(t, created2)
assert.Equal(t, int64(len(content)), size2)
assert.Greater(t, int64(len(content)), size2)
})

t.Run("get non-existent chunk", func(t *testing.T) {
Expand All @@ -94,7 +96,7 @@ func TestS3Store_Integration(t *testing.T) {
t.Parallel()

hash := "test-hash-s3-idempotency"
content := "s3 chunk content idempotency"
content := strings.Repeat("s3 chunk content idempotency", 1024)

// Delete non-existent chunk should not return error
err := store.DeleteChunk(ctx, hash)
Expand All @@ -103,14 +105,50 @@ func TestS3Store_Integration(t *testing.T) {
created, size, err := store.PutChunk(ctx, hash, []byte(content))
require.NoError(t, err)
assert.True(t, created)
assert.Equal(t, int64(len(content)), size)
assert.Greater(t, int64(len(content)), size)

err = store.DeleteChunk(ctx, hash)
require.NoError(t, err)

err = store.DeleteChunk(ctx, hash)
require.NoError(t, err)
})

t.Run("stored chunk is zstd-compressed in S3", func(t *testing.T) {
t.Parallel()

data := bytes.Repeat([]byte("compressible"), 1024)
isNew, compressedSize, err := store.PutChunk(ctx, "test-hash-s3-compress", data)
require.NoError(t, err)
assert.True(t, isNew)
assert.Greater(t, int64(len(data)), compressedSize, "compressed size should be less than original")
assert.Positive(t, compressedSize)

defer func() {
_ = store.DeleteChunk(ctx, "test-hash-s3-compress")
}()
})

t.Run("compressed chunk round-trips correctly via S3", func(t *testing.T) {
t.Parallel()

data := []byte("hello from S3 compressed chunk! hello from S3 compressed chunk!")
_, _, err := store.PutChunk(ctx, "test-hash-s3-roundtrip", data)
require.NoError(t, err)

defer func() {
_ = store.DeleteChunk(ctx, "test-hash-s3-roundtrip")
}()

rc, err := store.GetChunk(ctx, "test-hash-s3-roundtrip")
require.NoError(t, err)

defer rc.Close()

got, err := io.ReadAll(rc)
require.NoError(t, err)
assert.Equal(t, data, got)
})
}

func TestS3Store_PutChunk_RaceCondition(t *testing.T) {
Expand All @@ -129,7 +167,7 @@ func TestS3Store_PutChunk_RaceCondition(t *testing.T) {
require.NoError(t, err)

hash := "test-hash-race"
content := []byte("race condition content")
content := []byte(strings.Repeat("race condition content", 1024))

defer func() {
_ = store.DeleteChunk(ctx, hash)
Expand All @@ -140,20 +178,20 @@ func TestS3Store_PutChunk_RaceCondition(t *testing.T) {
results := make(chan bool, numGoRoutines)
errors := make(chan error, numGoRoutines)

for i := 0; i < numGoRoutines; i++ {
for range numGoRoutines {
go func() {
created, size, err := store.PutChunk(ctx, hash, content)
results <- created

assert.Equal(t, int64(len(content)), size)
assert.Greater(t, int64(len(content)), size)

errors <- err
}()
}

createdCount := 0

for i := 0; i < numGoRoutines; i++ {
for range numGoRoutines {
err := <-errors
require.NoError(t, err)

Expand Down Expand Up @@ -234,12 +272,12 @@ func TestS3Store_ChunkPath(t *testing.T) {

t.Run("short hash", func(t *testing.T) {
hash := "a"
content := "short hash content"
content := strings.Repeat("short hash content", 1024)

created, size, err := store.PutChunk(ctx, hash, []byte(content))
require.NoError(t, err)
assert.True(t, created)
assert.Equal(t, int64(len(content)), size)
assert.Greater(t, int64(len(content)), size)

defer func() {
_ = store.DeleteChunk(ctx, hash)
Expand Down