From c933ddfecd9bbd478e9c630fffcd5268f3e53143 Mon Sep 17 00:00:00 2001 From: Wael Nasreddine Date: Wed, 11 Feb 2026 20:58:35 -0800 Subject: [PATCH] feat: add zstd compression to local chunk store Implement Zstandard (zstd) compression for the local chunk store to reduce disk usage for cached chunks. Changes: - Added zstd encoder to the localStore struct for efficient compression. - Modified PutChunk to compress data using zstd before writing to disk and return the compressed size. - Updated GetChunk to use zstd.NewReader for transparent decompression, wrapped in a localReadCloser to ensure proper resource cleanup. - Added comprehensive tests to verify compression efficiency and data integrity through round-trip checks. This change reduces the storage footprint of the local chunk cache, which is especially beneficial for large Nix store paths. --- pkg/storage/chunk/local.go | 56 ++++++++++++++++++++++++++++++--- pkg/storage/chunk/local_test.go | 56 ++++++++++++++++++++++++++------- 2 files changed, 95 insertions(+), 17 deletions(-) diff --git a/pkg/storage/chunk/local.go b/pkg/storage/chunk/local.go index bd11f225..dbbd1681 100644 --- a/pkg/storage/chunk/local.go +++ b/pkg/storage/chunk/local.go @@ -6,18 +6,53 @@ import ( "io" "os" "path/filepath" + + "github.com/klauspost/compress/zstd" ) +// localReadCloser wraps a zstd decoder and file to properly close both on Close(). +type localReadCloser struct { + *zstd.Decoder + file *os.File +} + +func (r *localReadCloser) Close() error { + r.Decoder.Close() + + return r.file.Close() +} + // localStore implements Store for local filesystem. type localStore struct { baseDir string + encoder *zstd.Encoder + decoder *zstd.Decoder } // NewLocalStore returns a new local chunk store. func NewLocalStore(baseDir string) (Store, error) { - s := &localStore{baseDir: baseDir} + encoder, err := zstd.NewWriter(nil) + if err != nil { + return nil, fmt.Errorf("failed to create zstd encoder: %w", err) + } + + decoder, err := zstd.NewReader(nil) + if err != nil { + encoder.Close() + + return nil, fmt.Errorf("failed to create zstd decoder: %w", err) + } + + s := &localStore{ + baseDir: baseDir, + encoder: encoder, + decoder: decoder, + } // Ensure base directory exists if err := os.MkdirAll(s.storeDir(), 0o755); err != nil { + encoder.Close() + decoder.Close() + return nil, fmt.Errorf("failed to create chunk store directory: %w", err) } @@ -59,7 +94,15 @@ func (s *localStore) GetChunk(_ context.Context, hash string) (io.ReadCloser, er return nil, err } - return f, nil + // Create a new decoder for this specific file + decoder, err := zstd.NewReader(f) + if err != nil { + f.Close() + + return nil, fmt.Errorf("failed to create zstd decoder: %w", err) + } + + return &localReadCloser{decoder, f}, nil } func (s *localStore) PutChunk(_ context.Context, hash string, data []byte) (bool, int64, error) { @@ -71,6 +114,9 @@ func (s *localStore) PutChunk(_ context.Context, hash string, data []byte) (bool return false, 0, err } + // Compress data with zstd + compressed := s.encoder.EncodeAll(data, nil) + // Write to temporary file first to ensure atomicity tmpFile, err := os.CreateTemp(dir, "chunk-*") if err != nil { @@ -78,7 +124,7 @@ func (s *localStore) PutChunk(_ context.Context, hash string, data []byte) (bool } defer os.Remove(tmpFile.Name()) // Ensure temp file is cleaned up - if _, err = tmpFile.Write(data); err == nil { + if _, err = tmpFile.Write(compressed); err == nil { err = tmpFile.Sync() } @@ -93,13 +139,13 @@ func (s *localStore) PutChunk(_ context.Context, hash string, data []byte) (bool if err := os.Link(tmpFile.Name(), path); err != nil { if os.IsExist(err) { // Chunk already exists, which is fine. We didn't create it. - return false, int64(len(data)), nil + return false, int64(len(compressed)), nil } return false, 0, err // Some other error } - return true, int64(len(data)), nil + return true, int64(len(compressed)), nil } func (s *localStore) DeleteChunk(_ context.Context, hash string) error { diff --git a/pkg/storage/chunk/local_test.go b/pkg/storage/chunk/local_test.go index 65f664cb..183fd5d4 100644 --- a/pkg/storage/chunk/local_test.go +++ b/pkg/storage/chunk/local_test.go @@ -1,10 +1,12 @@ package chunk_test import ( + "bytes" "context" "io" "os" "path/filepath" + "strings" "sync" "testing" @@ -12,6 +14,7 @@ import ( "github.com/stretchr/testify/require" "github.com/kalbasit/ncps/pkg/storage/chunk" + "github.com/kalbasit/ncps/testhelper" ) func TestLocalStore(t *testing.T) { @@ -28,13 +31,13 @@ func TestLocalStore(t *testing.T) { t.Run("put and get chunk", func(t *testing.T) { t.Parallel() - hash := "test-hash-1" - content := "chunk content" + hash := testhelper.MustRandNarHash() + content := strings.Repeat("chunk content", 1024) created, size, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) assert.True(t, created) - assert.Equal(t, int64(len(content)), size) + assert.Greater(t, int64(len(content)), size) has, err := store.HasChunk(ctx, hash) require.NoError(t, err) @@ -53,24 +56,24 @@ func TestLocalStore(t *testing.T) { t.Run("duplicate put", func(t *testing.T) { t.Parallel() - hash := "test-hash-2" - content := "chunk content" + hash := testhelper.MustRandNarHash() + content := strings.Repeat("chunk content", 1024) created1, size1, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) assert.True(t, created1) - assert.Equal(t, int64(len(content)), size1) + assert.Greater(t, int64(len(content)), size1) created2, size2, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) assert.False(t, created2) - assert.Equal(t, int64(len(content)), size2) + assert.Greater(t, int64(len(content)), size2) }) t.Run("get non-existent chunk", func(t *testing.T) { t.Parallel() - hash := "non-existent" + hash := testhelper.MustRandNarHash() _, err := store.GetChunk(ctx, hash) require.ErrorIs(t, err, chunk.ErrNotFound) }) @@ -78,8 +81,8 @@ func TestLocalStore(t *testing.T) { t.Run("delete chunk cleans up directory", func(t *testing.T) { t.Parallel() - hash := "abcdef123456" - content := "cleanup test" + hash := testhelper.MustRandNarHash() + content := strings.Repeat("cleanup test", 1024) _, _, err := store.PutChunk(ctx, hash, []byte(content)) require.NoError(t, err) @@ -109,8 +112,8 @@ func TestLocalStore(t *testing.T) { t.Run("PutChunk concurrent", func(t *testing.T) { t.Parallel() - hash := "concurrent-hash" - content := "concurrent content" + hash := testhelper.MustRandNarHash() + content := strings.Repeat("concurrent content", 1024) numGoroutines := 10 var ( @@ -155,4 +158,33 @@ func TestLocalStore(t *testing.T) { require.NoError(t, err) assert.True(t, has) }) + + t.Run("stored chunk is zstd-compressed on disk", func(t *testing.T) { + t.Parallel() + + // Use highly compressible data (repeated bytes) + data := bytes.Repeat([]byte("compressible"), 1024) + isNew, compressedSize, err := store.PutChunk(ctx, "test-hash-compress-1", data) + require.NoError(t, err) + assert.True(t, isNew) + assert.Greater(t, int64(len(data)), compressedSize, "compressed size should be less than original") + assert.Positive(t, compressedSize, "compressed size should be greater than 0") + }) + + t.Run("compressed chunk round-trips correctly", func(t *testing.T) { + t.Parallel() + + data := []byte("hello, compressed world! hello, compressed world! hello, compressed world!") + _, _, err := store.PutChunk(ctx, "test-hash-roundtrip", data) + require.NoError(t, err) + + rc, err := store.GetChunk(ctx, "test-hash-roundtrip") + require.NoError(t, err) + + defer rc.Close() + + got, err := io.ReadAll(rc) + require.NoError(t, err) + assert.Equal(t, data, got) + }) }