From 49755eced5d70add50fe88e0424d0b3d7cb70153 Mon Sep 17 00:00:00 2001 From: Wael Nasreddine Date: Wed, 11 Feb 2026 20:51:30 -0800 Subject: [PATCH 1/2] refactor: update PutChunk interface to return compressed_size Updated the PutChunk interface in the storage layer to return the compressed size of the chunk. This metric is necessary for the cache implementation to accurately manage storage limits and track disk usage. Changes: - Modified PutChunk signature in Store interface to return (bool, int64, error). - Updated filesystem storage implementation to return 0 as a placeholder (the actual data length is returned in PATCH 2/2). - Updated S3 storage implementation to return 0 as a placeholder (the actual data length is returned in PATCH 2/2). - Updated all callers in tests and distributed cache to handle the new return value. --- pkg/cache/cache.go | 2 +- pkg/storage/chunk/local.go | 14 +++++++------- pkg/storage/chunk/local_test.go | 10 +++++----- pkg/storage/chunk/s3.go | 12 ++++++------ pkg/storage/chunk/s3_test.go | 14 +++++++------- pkg/storage/chunk/store.go | 4 ++-- 6 files changed, 28 insertions(+), 28 deletions(-) diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 999424cd..c6da7209 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -1346,7 +1346,7 @@ func (c *Cache) storeNarWithCDC(ctx context.Context, tempPath string, narURL *na } // Store in chunkStore if new - _, err = chunkStore.PutChunk(ctx, chunkMetadata.Hash, chunkMetadata.Data) + _, _, err = chunkStore.PutChunk(ctx, chunkMetadata.Hash, chunkMetadata.Data) if err != nil { chunkMetadata.Free() diff --git a/pkg/storage/chunk/local.go b/pkg/storage/chunk/local.go index 4a073f2f..8173ad5f 100644 --- a/pkg/storage/chunk/local.go +++ b/pkg/storage/chunk/local.go @@ -62,19 +62,19 @@ func (s *localStore) GetChunk(_ context.Context, hash string) (io.ReadCloser, er return f, nil } -func (s *localStore) PutChunk(_ context.Context, hash string, data []byte) (bool, error) { +func (s *localStore) PutChunk(_ context.Context, hash string, data []byte) (bool, int64, 
error) { path := s.chunkPath(hash) // Create parent directory dir := filepath.Dir(path) if err := os.MkdirAll(dir, 0o755); err != nil { - return false, err + return false, 0, err } // Write to temporary file first to ensure atomicity tmpFile, err := os.CreateTemp(dir, "chunk-*") if err != nil { - return false, err + return false, 0, err } defer os.Remove(tmpFile.Name()) // Ensure temp file is cleaned up @@ -87,19 +87,19 @@ func (s *localStore) PutChunk(_ context.Context, hash string, data []byte) (bool } if err != nil { - return false, err + return false, 0, err } if err := os.Link(tmpFile.Name(), path); err != nil { if os.IsExist(err) { // Chunk already exists, which is fine. We didn't create it. - return false, nil + return false, 0, nil } - return false, err // Some other error + return false, 0, err // Some other error } - return true, nil + return true, 0, nil } func (s *localStore) DeleteChunk(_ context.Context, hash string) error { diff --git a/pkg/storage/chunk/local_test.go b/pkg/storage/chunk/local_test.go index 9d71169d..cc09b09a 100644 --- a/pkg/storage/chunk/local_test.go +++ b/pkg/storage/chunk/local_test.go @@ -31,7 +31,7 @@ func TestLocalStore(t *testing.T) { hash := "test-hash-1" content := "chunk content" - created, err := store.PutChunk(ctx, hash, []byte(content)) + created, _, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) assert.True(t, created) @@ -55,11 +55,11 @@ func TestLocalStore(t *testing.T) { hash := "test-hash-2" content := "chunk content" - created1, err := store.PutChunk(ctx, hash, []byte(content)) + created1, _, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) assert.True(t, created1) - created2, err := store.PutChunk(ctx, hash, []byte(content)) + created2, _, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) assert.False(t, created2) }) @@ -78,7 +78,7 @@ func TestLocalStore(t *testing.T) { hash := "abcdef123456" content := "cleanup test" - _, err := 
store.PutChunk(ctx, hash, []byte(content)) + _, _, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) path := filepath.Join(dir, "chunks", hash[:2], hash) @@ -122,7 +122,7 @@ func TestLocalStore(t *testing.T) { go func() { defer wg.Done() - created, err := store.PutChunk(ctx, hash, []byte(content)) + created, _, err := store.PutChunk(ctx, hash, []byte(content)) createds <- created diff --git a/pkg/storage/chunk/s3.go b/pkg/storage/chunk/s3.go index a893c2ac..8017fdd5 100644 --- a/pkg/storage/chunk/s3.go +++ b/pkg/storage/chunk/s3.go @@ -121,14 +121,14 @@ func (s *s3Store) GetChunk(ctx context.Context, hash string) (io.ReadCloser, err return obj, nil } -func (s *s3Store) PutChunk(ctx context.Context, hash string, data []byte) (bool, error) { +func (s *s3Store) PutChunk(ctx context.Context, hash string, data []byte) (bool, int64, error) { key := s.chunkPath(hash) // Acquire a lock to prevent race conditions during check-then-act. // We use a prefix to avoid collisions with other locks. lockKey := fmt.Sprintf("chunk-put:%s", hash) if err := s.locker.Lock(ctx, lockKey, chunkPutLockTTL); err != nil { - return false, fmt.Errorf("error acquiring lock for chunk put: %w", err) + return false, 0, fmt.Errorf("error acquiring lock for chunk put: %w", err) } defer func() { @@ -138,21 +138,21 @@ func (s *s3Store) PutChunk(ctx context.Context, hash string, data []byte) (bool, // Check if exists. 
exists, err := s.HasChunk(ctx, hash) if err != nil { - return false, err + return false, 0, err } if exists { - return false, nil + return false, 0, nil } _, err = s.client.PutObject(ctx, s.bucket, key, bytes.NewReader(data), int64(len(data)), minio.PutObjectOptions{ ContentType: "application/octet-stream", }) if err != nil { - return false, fmt.Errorf("error putting chunk to S3: %w", err) + return false, 0, fmt.Errorf("error putting chunk to S3: %w", err) } - return true, nil + return true, 0, nil } func (s *s3Store) DeleteChunk(ctx context.Context, hash string) error { diff --git a/pkg/storage/chunk/s3_test.go b/pkg/storage/chunk/s3_test.go index 8872c435..cd70981d 100644 --- a/pkg/storage/chunk/s3_test.go +++ b/pkg/storage/chunk/s3_test.go @@ -36,7 +36,7 @@ func TestS3Store_Integration(t *testing.T) { hash := "test-hash-s3-1" content := "s3 chunk content" - created, err := store.PutChunk(ctx, hash, []byte(content)) + created, _, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) assert.True(t, created) @@ -65,7 +65,7 @@ func TestS3Store_Integration(t *testing.T) { hash := "test-hash-s3-2" content := "s3 chunk content" - created1, err := store.PutChunk(ctx, hash, []byte(content)) + created1, _, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) assert.True(t, created1) @@ -74,7 +74,7 @@ func TestS3Store_Integration(t *testing.T) { assert.NoError(t, err) }() - created2, err := store.PutChunk(ctx, hash, []byte(content)) + created2, _, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) assert.False(t, created2) }) @@ -98,7 +98,7 @@ func TestS3Store_Integration(t *testing.T) { require.NoError(t, err) // Put and then delete twice - created, err := store.PutChunk(ctx, hash, []byte(content)) + created, _, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) assert.True(t, created) @@ -139,7 +139,7 @@ func TestS3Store_PutChunk_RaceCondition(t *testing.T) { for i := 0; i < 
numGoRoutines; i++ { go func() { - created, err := store.PutChunk(ctx, hash, content) + created, _, err := store.PutChunk(ctx, hash, content) results <- created errors <- err @@ -231,7 +231,7 @@ func TestS3Store_ChunkPath(t *testing.T) { hash := "a" content := "short hash content" - created, err := store.PutChunk(ctx, hash, []byte(content)) + created, _, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) assert.True(t, created) @@ -283,7 +283,7 @@ func TestS3Store_PutChunk_LockFailure(t *testing.T) { expectedErr := errors.New("lock failure") s.SetLocker(&mockLocker{lockErr: expectedErr}) - _, err = store.PutChunk(ctx, "test-hash", []byte("content")) + _, _, err = store.PutChunk(ctx, "test-hash", []byte("content")) require.ErrorIs(t, err, expectedErr) assert.Contains(t, err.Error(), "error acquiring lock for chunk put") } diff --git a/pkg/storage/chunk/store.go b/pkg/storage/chunk/store.go index a0a62107..51ef511d 100644 --- a/pkg/storage/chunk/store.go +++ b/pkg/storage/chunk/store.go @@ -19,8 +19,8 @@ type Store interface { // NOTE: The caller must close the returned io.ReadCloser! GetChunk(ctx context.Context, hash string) (io.ReadCloser, error) - // PutChunk stores a chunk. Returns true if chunk was new. - PutChunk(ctx context.Context, hash string, data []byte) (bool, error) + // PutChunk stores a chunk. Returns true if the chunk was new, and the size in bytes of the chunk data (0 on error). + PutChunk(ctx context.Context, hash string, data []byte) (bool, int64, error) // DeleteChunk removes a chunk. DeleteChunk(ctx context.Context, hash string) error From 264177be7fb9dfc3ff27f689f9a867c763b6a860 Mon Sep 17 00:00:00 2001 From: Wael Nasreddine Date: Wed, 11 Feb 2026 22:32:20 -0800 Subject: [PATCH 2/2] fix: implementation of correct behavior for PutChunk size return value - Implement correct behavior for PutChunk in localStore and s3Store by returning the actual data length instead of a placeholder 0. 
- Update tests to assert that PutChunk returns the correct size, following TDD. - Address PR comments regarding placeholder return values. --- pkg/storage/chunk/local.go | 4 ++-- pkg/storage/chunk/local_test.go | 9 ++++++--- pkg/storage/chunk/s3.go | 4 ++-- pkg/storage/chunk/s3_test.go | 23 +++++++++++++++-------- 4 files changed, 25 insertions(+), 15 deletions(-) diff --git a/pkg/storage/chunk/local.go b/pkg/storage/chunk/local.go index 8173ad5f..bd11f225 100644 --- a/pkg/storage/chunk/local.go +++ b/pkg/storage/chunk/local.go @@ -93,13 +93,13 @@ func (s *localStore) PutChunk(_ context.Context, hash string, data []byte) (bool if err := os.Link(tmpFile.Name(), path); err != nil { if os.IsExist(err) { // Chunk already exists, which is fine. We didn't create it. - return false, 0, nil + return false, int64(len(data)), nil } return false, 0, err // Some other error } - return true, 0, nil + return true, int64(len(data)), nil } func (s *localStore) DeleteChunk(_ context.Context, hash string) error { diff --git a/pkg/storage/chunk/local_test.go b/pkg/storage/chunk/local_test.go index cc09b09a..65f664cb 100644 --- a/pkg/storage/chunk/local_test.go +++ b/pkg/storage/chunk/local_test.go @@ -31,9 +31,10 @@ func TestLocalStore(t *testing.T) { hash := "test-hash-1" content := "chunk content" - created, _, err := store.PutChunk(ctx, hash, []byte(content)) + created, size, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) assert.True(t, created) + assert.Equal(t, int64(len(content)), size) has, err := store.HasChunk(ctx, hash) require.NoError(t, err) @@ -55,13 +56,15 @@ func TestLocalStore(t *testing.T) { hash := "test-hash-2" content := "chunk content" - created1, _, err := store.PutChunk(ctx, hash, []byte(content)) + created1, size1, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) assert.True(t, created1) + assert.Equal(t, int64(len(content)), size1) - created2, _, err := store.PutChunk(ctx, hash, []byte(content)) + created2, 
size2, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) assert.False(t, created2) + assert.Equal(t, int64(len(content)), size2) }) t.Run("get non-existent chunk", func(t *testing.T) { diff --git a/pkg/storage/chunk/s3.go b/pkg/storage/chunk/s3.go index 8017fdd5..6515087a 100644 --- a/pkg/storage/chunk/s3.go +++ b/pkg/storage/chunk/s3.go @@ -142,7 +142,7 @@ func (s *s3Store) PutChunk(ctx context.Context, hash string, data []byte) (bool, } if exists { - return false, 0, nil + return false, int64(len(data)), nil } _, err = s.client.PutObject(ctx, s.bucket, key, bytes.NewReader(data), int64(len(data)), minio.PutObjectOptions{ @@ -152,7 +152,7 @@ func (s *s3Store) PutChunk(ctx context.Context, hash string, data []byte) (bool, return false, 0, fmt.Errorf("error putting chunk to S3: %w", err) } - return true, 0, nil + return true, int64(len(data)), nil } func (s *s3Store) DeleteChunk(ctx context.Context, hash string) error { diff --git a/pkg/storage/chunk/s3_test.go b/pkg/storage/chunk/s3_test.go index cd70981d..a715fb9f 100644 --- a/pkg/storage/chunk/s3_test.go +++ b/pkg/storage/chunk/s3_test.go @@ -36,9 +36,10 @@ func TestS3Store_Integration(t *testing.T) { hash := "test-hash-s3-1" content := "s3 chunk content" - created, _, err := store.PutChunk(ctx, hash, []byte(content)) + created, size, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) assert.True(t, created) + assert.Equal(t, int64(len(content)), size) defer func() { err := store.DeleteChunk(ctx, hash) @@ -65,18 +66,20 @@ func TestS3Store_Integration(t *testing.T) { hash := "test-hash-s3-2" content := "s3 chunk content" - created1, _, err := store.PutChunk(ctx, hash, []byte(content)) + created1, size1, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) assert.True(t, created1) + assert.Equal(t, int64(len(content)), size1) defer func() { err := store.DeleteChunk(ctx, hash) assert.NoError(t, err) }() - created2, _, err := store.PutChunk(ctx, 
hash, []byte(content)) + created2, size2, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) assert.False(t, created2) + assert.Equal(t, int64(len(content)), size2) }) t.Run("get non-existent chunk", func(t *testing.T) { @@ -97,10 +100,10 @@ func TestS3Store_Integration(t *testing.T) { err := store.DeleteChunk(ctx, hash) require.NoError(t, err) - // Put and then delete twice - created, _, err := store.PutChunk(ctx, hash, []byte(content)) + created, size, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) assert.True(t, created) + assert.Equal(t, int64(len(content)), size) err = store.DeleteChunk(ctx, hash) require.NoError(t, err) @@ -139,9 +142,11 @@ func TestS3Store_PutChunk_RaceCondition(t *testing.T) { for i := 0; i < numGoRoutines; i++ { go func() { - created, _, err := store.PutChunk(ctx, hash, content) + created, size, err := store.PutChunk(ctx, hash, content) results <- created + assert.Equal(t, int64(len(content)), size) + errors <- err }() } @@ -231,9 +236,10 @@ func TestS3Store_ChunkPath(t *testing.T) { hash := "a" content := "short hash content" - created, _, err := store.PutChunk(ctx, hash, []byte(content)) + created, size, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) assert.True(t, created) + assert.Equal(t, int64(len(content)), size) defer func() { _ = store.DeleteChunk(ctx, hash) @@ -283,7 +289,8 @@ func TestS3Store_PutChunk_LockFailure(t *testing.T) { expectedErr := errors.New("lock failure") s.SetLocker(&mockLocker{lockErr: expectedErr}) - _, _, err = store.PutChunk(ctx, "test-hash", []byte("content")) + _, size, err := store.PutChunk(ctx, "test-hash", []byte("content")) + assert.Equal(t, int64(0), size) require.ErrorIs(t, err, expectedErr) assert.Contains(t, err.Error(), "error acquiring lock for chunk put") }