diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 999424cd..c6da7209 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -1346,7 +1346,7 @@ func (c *Cache) storeNarWithCDC(ctx context.Context, tempPath string, narURL *na } // Store in chunkStore if new - _, err = chunkStore.PutChunk(ctx, chunkMetadata.Hash, chunkMetadata.Data) + _, _, err = chunkStore.PutChunk(ctx, chunkMetadata.Hash, chunkMetadata.Data) if err != nil { chunkMetadata.Free() diff --git a/pkg/storage/chunk/local.go b/pkg/storage/chunk/local.go index 4a073f2f..bd11f225 100644 --- a/pkg/storage/chunk/local.go +++ b/pkg/storage/chunk/local.go @@ -62,19 +62,19 @@ func (s *localStore) GetChunk(_ context.Context, hash string) (io.ReadCloser, er return f, nil } -func (s *localStore) PutChunk(_ context.Context, hash string, data []byte) (bool, error) { +func (s *localStore) PutChunk(_ context.Context, hash string, data []byte) (bool, int64, error) { path := s.chunkPath(hash) // Create parent directory dir := filepath.Dir(path) if err := os.MkdirAll(dir, 0o755); err != nil { - return false, err + return false, 0, err } // Write to temporary file first to ensure atomicity tmpFile, err := os.CreateTemp(dir, "chunk-*") if err != nil { - return false, err + return false, 0, err } defer os.Remove(tmpFile.Name()) // Ensure temp file is cleaned up @@ -87,19 +87,19 @@ func (s *localStore) PutChunk(_ context.Context, hash string, data []byte) (bool } if err != nil { - return false, err + return false, 0, err } if err := os.Link(tmpFile.Name(), path); err != nil { if os.IsExist(err) { // Chunk already exists, which is fine. We didn't create it. - return false, nil + return false, int64(len(data)), nil } - return false, err // Some other error + return false, 0, err // Some other error } - return true, nil + return true, int64(len(data)), nil } func (s *localStore) DeleteChunk(_ context.Context, hash string) error { diff --git a/pkg/storage/chunk/local_test.go b/pkg/storage/chunk/local_test.go index 9d71169d..65f664cb 100644 --- a/pkg/storage/chunk/local_test.go +++ b/pkg/storage/chunk/local_test.go @@ -31,9 +31,10 @@ func TestLocalStore(t *testing.T) { hash := "test-hash-1" content := "chunk content" - created, err := store.PutChunk(ctx, hash, []byte(content)) + created, size, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) assert.True(t, created) + assert.Equal(t, int64(len(content)), size) has, err := store.HasChunk(ctx, hash) require.NoError(t, err) @@ -55,13 +56,15 @@ func TestLocalStore(t *testing.T) { hash := "test-hash-2" content := "chunk content" - created1, err := store.PutChunk(ctx, hash, []byte(content)) + created1, size1, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) assert.True(t, created1) + assert.Equal(t, int64(len(content)), size1) - created2, err := store.PutChunk(ctx, hash, []byte(content)) + created2, size2, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) assert.False(t, created2) + assert.Equal(t, int64(len(content)), size2) }) t.Run("get non-existent chunk", func(t *testing.T) { @@ -78,7 +81,7 @@ func TestLocalStore(t *testing.T) { hash := "abcdef123456" content := "cleanup test" - _, err := store.PutChunk(ctx, hash, []byte(content)) + _, _, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) path := filepath.Join(dir, "chunks", hash[:2], hash) @@ -122,7 +125,7 @@ func TestLocalStore(t *testing.T) { go func() { defer wg.Done() - created, err := store.PutChunk(ctx, hash, []byte(content)) + created, _, err := store.PutChunk(ctx, hash, []byte(content)) createds <- created diff --git a/pkg/storage/chunk/s3.go b/pkg/storage/chunk/s3.go index a893c2ac..6515087a 100644 --- a/pkg/storage/chunk/s3.go +++ b/pkg/storage/chunk/s3.go @@ -121,14 +121,14 @@ func (s *s3Store) GetChunk(ctx context.Context, hash string) (io.ReadCloser, err return obj, nil } -func (s *s3Store) PutChunk(ctx context.Context, hash string, data []byte) (bool, error) { +func (s *s3Store) PutChunk(ctx context.Context, hash string, data []byte) (bool, int64, error) { key := s.chunkPath(hash) // Acquire a lock to prevent race conditions during check-then-act. // We use a prefix to avoid collisions with other locks. lockKey := fmt.Sprintf("chunk-put:%s", hash) if err := s.locker.Lock(ctx, lockKey, chunkPutLockTTL); err != nil { - return false, fmt.Errorf("error acquiring lock for chunk put: %w", err) + return false, 0, fmt.Errorf("error acquiring lock for chunk put: %w", err) } defer func() { @@ -138,21 +138,21 @@ func (s *s3Store) PutChunk(ctx context.Context, hash string, data []byte) (bool, // Check if exists. exists, err := s.HasChunk(ctx, hash) if err != nil { - return false, err + return false, 0, err } if exists { - return false, nil + return false, int64(len(data)), nil } _, err = s.client.PutObject(ctx, s.bucket, key, bytes.NewReader(data), int64(len(data)), minio.PutObjectOptions{ ContentType: "application/octet-stream", }) if err != nil { - return false, fmt.Errorf("error putting chunk to S3: %w", err) + return false, 0, fmt.Errorf("error putting chunk to S3: %w", err) } - return true, nil + return true, int64(len(data)), nil } func (s *s3Store) DeleteChunk(ctx context.Context, hash string) error { diff --git a/pkg/storage/chunk/s3_test.go b/pkg/storage/chunk/s3_test.go index 8872c435..a715fb9f 100644 --- a/pkg/storage/chunk/s3_test.go +++ b/pkg/storage/chunk/s3_test.go @@ -36,9 +36,10 @@ func TestS3Store_Integration(t *testing.T) { hash := "test-hash-s3-1" content := "s3 chunk content" - created, err := store.PutChunk(ctx, hash, []byte(content)) + created, size, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) assert.True(t, created) + assert.Equal(t, int64(len(content)), size) defer func() { err := store.DeleteChunk(ctx, hash) @@ -65,18 +66,20 @@ func TestS3Store_Integration(t *testing.T) { hash := "test-hash-s3-2" content := "s3 chunk content" - created1, err := store.PutChunk(ctx, hash, []byte(content)) + created1, size1, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) assert.True(t, created1) + assert.Equal(t, int64(len(content)), size1) defer func() { err := store.DeleteChunk(ctx, hash) assert.NoError(t, err) }() - created2, err := store.PutChunk(ctx, hash, []byte(content)) + created2, size2, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) assert.False(t, created2) + assert.Equal(t, int64(len(content)), size2) }) t.Run("get non-existent chunk", func(t *testing.T) { @@ -97,10 +100,10 @@ func TestS3Store_Integration(t *testing.T) { err := store.DeleteChunk(ctx, hash) require.NoError(t, err) - // Put and then delete twice - created, err := store.PutChunk(ctx, hash, []byte(content)) + created, size, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) assert.True(t, created) + assert.Equal(t, int64(len(content)), size) err = store.DeleteChunk(ctx, hash) require.NoError(t, err) @@ -139,9 +142,11 @@ func TestS3Store_PutChunk_RaceCondition(t *testing.T) { for i := 0; i < numGoRoutines; i++ { go func() { - created, err := store.PutChunk(ctx, hash, content) + created, size, err := store.PutChunk(ctx, hash, content) results <- created + assert.Equal(t, int64(len(content)), size) + errors <- err }() } @@ -231,9 +236,10 @@ func TestS3Store_ChunkPath(t *testing.T) { hash := "a" content := "short hash content" - created, err := store.PutChunk(ctx, hash, []byte(content)) + created, size, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) assert.True(t, created) + assert.Equal(t, int64(len(content)), size) defer func() { _ = store.DeleteChunk(ctx, hash) @@ -283,7 +289,8 @@ func TestS3Store_PutChunk_LockFailure(t *testing.T) { expectedErr := errors.New("lock failure") s.SetLocker(&mockLocker{lockErr: expectedErr}) - _, err = store.PutChunk(ctx, "test-hash", []byte("content")) + _, size, err := store.PutChunk(ctx, "test-hash", []byte("content")) + assert.Equal(t, int64(0), size) require.ErrorIs(t, err, expectedErr) assert.Contains(t, err.Error(), "error acquiring lock for chunk put") } diff --git a/pkg/storage/chunk/store.go b/pkg/storage/chunk/store.go index a0a62107..51ef511d 100644 --- a/pkg/storage/chunk/store.go +++ b/pkg/storage/chunk/store.go @@ -19,8 +19,8 @@ type Store interface { // NOTE: The caller must close the returned io.ReadCloser! GetChunk(ctx context.Context, hash string) (io.ReadCloser, error) - // PutChunk stores a chunk. Returns true if chunk was new. - PutChunk(ctx context.Context, hash string, data []byte) (bool, error) + // PutChunk stores a chunk. Returns true if chunk was new, and the compressed size. + PutChunk(ctx context.Context, hash string, data []byte) (bool, int64, error) // DeleteChunk removes a chunk. DeleteChunk(ctx context.Context, hash string) error