Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -1346,7 +1346,7 @@ func (c *Cache) storeNarWithCDC(ctx context.Context, tempPath string, narURL *na
}

// Store in chunkStore if new
_, err = chunkStore.PutChunk(ctx, chunkMetadata.Hash, chunkMetadata.Data)
_, _, err = chunkStore.PutChunk(ctx, chunkMetadata.Hash, chunkMetadata.Data)
if err != nil {
chunkMetadata.Free()

Expand Down
14 changes: 7 additions & 7 deletions pkg/storage/chunk/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,19 +62,19 @@ func (s *localStore) GetChunk(_ context.Context, hash string) (io.ReadCloser, er
return f, nil
}

func (s *localStore) PutChunk(_ context.Context, hash string, data []byte) (bool, error) {
func (s *localStore) PutChunk(_ context.Context, hash string, data []byte) (bool, int64, error) {
path := s.chunkPath(hash)

// Create parent directory
dir := filepath.Dir(path)
if err := os.MkdirAll(dir, 0o755); err != nil {
return false, err
return false, 0, err
}

// Write to temporary file first to ensure atomicity
tmpFile, err := os.CreateTemp(dir, "chunk-*")
if err != nil {
return false, err
return false, 0, err
}
defer os.Remove(tmpFile.Name()) // Ensure temp file is cleaned up

Expand All @@ -87,19 +87,19 @@ func (s *localStore) PutChunk(_ context.Context, hash string, data []byte) (bool
}

if err != nil {
return false, err
return false, 0, err
}

if err := os.Link(tmpFile.Name(), path); err != nil {
if os.IsExist(err) {
// Chunk already exists, which is fine. We didn't create it.
return false, nil
return false, int64(len(data)), nil
}

return false, err // Some other error
return false, 0, err // Some other error
}

return true, nil
return true, int64(len(data)), nil
}

func (s *localStore) DeleteChunk(_ context.Context, hash string) error {
Expand Down
13 changes: 8 additions & 5 deletions pkg/storage/chunk/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ func TestLocalStore(t *testing.T) {
hash := "test-hash-1"
content := "chunk content"

created, err := store.PutChunk(ctx, hash, []byte(content))
created, size, err := store.PutChunk(ctx, hash, []byte(content))
require.NoError(t, err)
assert.True(t, created)
assert.Equal(t, int64(len(content)), size)

has, err := store.HasChunk(ctx, hash)
require.NoError(t, err)
Expand All @@ -55,13 +56,15 @@ func TestLocalStore(t *testing.T) {
hash := "test-hash-2"
content := "chunk content"

created1, err := store.PutChunk(ctx, hash, []byte(content))
created1, size1, err := store.PutChunk(ctx, hash, []byte(content))
require.NoError(t, err)
assert.True(t, created1)
assert.Equal(t, int64(len(content)), size1)

created2, err := store.PutChunk(ctx, hash, []byte(content))
created2, size2, err := store.PutChunk(ctx, hash, []byte(content))
require.NoError(t, err)
assert.False(t, created2)
assert.Equal(t, int64(len(content)), size2)
})

t.Run("get non-existent chunk", func(t *testing.T) {
Expand All @@ -78,7 +81,7 @@ func TestLocalStore(t *testing.T) {
hash := "abcdef123456"
content := "cleanup test"

_, err := store.PutChunk(ctx, hash, []byte(content))
_, _, err := store.PutChunk(ctx, hash, []byte(content))
require.NoError(t, err)

path := filepath.Join(dir, "chunks", hash[:2], hash)
Expand Down Expand Up @@ -122,7 +125,7 @@ func TestLocalStore(t *testing.T) {
go func() {
defer wg.Done()

created, err := store.PutChunk(ctx, hash, []byte(content))
created, _, err := store.PutChunk(ctx, hash, []byte(content))

createds <- created

Expand Down
12 changes: 6 additions & 6 deletions pkg/storage/chunk/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,14 @@ func (s *s3Store) GetChunk(ctx context.Context, hash string) (io.ReadCloser, err
return obj, nil
}

func (s *s3Store) PutChunk(ctx context.Context, hash string, data []byte) (bool, error) {
func (s *s3Store) PutChunk(ctx context.Context, hash string, data []byte) (bool, int64, error) {
key := s.chunkPath(hash)

// Acquire a lock to prevent race conditions during check-then-act.
// We use a prefix to avoid collisions with other locks.
lockKey := fmt.Sprintf("chunk-put:%s", hash)
if err := s.locker.Lock(ctx, lockKey, chunkPutLockTTL); err != nil {
return false, fmt.Errorf("error acquiring lock for chunk put: %w", err)
return false, 0, fmt.Errorf("error acquiring lock for chunk put: %w", err)
}

defer func() {
Expand All @@ -138,21 +138,21 @@ func (s *s3Store) PutChunk(ctx context.Context, hash string, data []byte) (bool,
// Check if exists.
exists, err := s.HasChunk(ctx, hash)
if err != nil {
return false, err
return false, 0, err
}

if exists {
return false, nil
return false, int64(len(data)), nil
}

_, err = s.client.PutObject(ctx, s.bucket, key, bytes.NewReader(data), int64(len(data)), minio.PutObjectOptions{
ContentType: "application/octet-stream",
})
if err != nil {
return false, fmt.Errorf("error putting chunk to S3: %w", err)
return false, 0, fmt.Errorf("error putting chunk to S3: %w", err)
}

return true, nil
return true, int64(len(data)), nil
}

func (s *s3Store) DeleteChunk(ctx context.Context, hash string) error {
Expand Down
23 changes: 15 additions & 8 deletions pkg/storage/chunk/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ func TestS3Store_Integration(t *testing.T) {
hash := "test-hash-s3-1"
content := "s3 chunk content"

created, err := store.PutChunk(ctx, hash, []byte(content))
created, size, err := store.PutChunk(ctx, hash, []byte(content))
require.NoError(t, err)
assert.True(t, created)
assert.Equal(t, int64(len(content)), size)

defer func() {
err := store.DeleteChunk(ctx, hash)
Expand All @@ -65,18 +66,20 @@ func TestS3Store_Integration(t *testing.T) {
hash := "test-hash-s3-2"
content := "s3 chunk content"

created1, err := store.PutChunk(ctx, hash, []byte(content))
created1, size1, err := store.PutChunk(ctx, hash, []byte(content))
require.NoError(t, err)
assert.True(t, created1)
assert.Equal(t, int64(len(content)), size1)

defer func() {
err := store.DeleteChunk(ctx, hash)
assert.NoError(t, err)
}()

created2, err := store.PutChunk(ctx, hash, []byte(content))
created2, size2, err := store.PutChunk(ctx, hash, []byte(content))
require.NoError(t, err)
assert.False(t, created2)
assert.Equal(t, int64(len(content)), size2)
})

t.Run("get non-existent chunk", func(t *testing.T) {
Expand All @@ -97,10 +100,10 @@ func TestS3Store_Integration(t *testing.T) {
err := store.DeleteChunk(ctx, hash)
require.NoError(t, err)

// Put and then delete twice
created, err := store.PutChunk(ctx, hash, []byte(content))
created, size, err := store.PutChunk(ctx, hash, []byte(content))
require.NoError(t, err)
assert.True(t, created)
assert.Equal(t, int64(len(content)), size)

err = store.DeleteChunk(ctx, hash)
require.NoError(t, err)
Expand Down Expand Up @@ -139,9 +142,11 @@ func TestS3Store_PutChunk_RaceCondition(t *testing.T) {

for i := 0; i < numGoRoutines; i++ {
go func() {
created, err := store.PutChunk(ctx, hash, content)
created, size, err := store.PutChunk(ctx, hash, content)
results <- created

assert.Equal(t, int64(len(content)), size)

errors <- err
}()
}
Expand Down Expand Up @@ -231,9 +236,10 @@ func TestS3Store_ChunkPath(t *testing.T) {
hash := "a"
content := "short hash content"

created, err := store.PutChunk(ctx, hash, []byte(content))
created, size, err := store.PutChunk(ctx, hash, []byte(content))
require.NoError(t, err)
assert.True(t, created)
assert.Equal(t, int64(len(content)), size)

defer func() {
_ = store.DeleteChunk(ctx, hash)
Expand Down Expand Up @@ -283,7 +289,8 @@ func TestS3Store_PutChunk_LockFailure(t *testing.T) {
expectedErr := errors.New("lock failure")
s.SetLocker(&mockLocker{lockErr: expectedErr})

_, err = store.PutChunk(ctx, "test-hash", []byte("content"))
_, size, err := store.PutChunk(ctx, "test-hash", []byte("content"))
assert.Equal(t, int64(0), size)
require.ErrorIs(t, err, expectedErr)
assert.Contains(t, err.Error(), "error acquiring lock for chunk put")
}
4 changes: 2 additions & 2 deletions pkg/storage/chunk/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ type Store interface {
// NOTE: The caller must close the returned io.ReadCloser!
GetChunk(ctx context.Context, hash string) (io.ReadCloser, error)

// PutChunk stores a chunk. Returns true if chunk was new.
PutChunk(ctx context.Context, hash string, data []byte) (bool, error)
// PutChunk stores a chunk. Returns true if chunk was new, and the compressed size.
PutChunk(ctx context.Context, hash string, data []byte) (bool, int64, error)

// DeleteChunk removes a chunk.
DeleteChunk(ctx context.Context, hash string) error
Expand Down