Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 51 additions & 5 deletions pkg/storage/chunk/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,53 @@ import (
"io"
"os"
"path/filepath"

"github.com/klauspost/compress/zstd"
)

// localReadCloser wraps a zstd decoder and the file it reads from so that a
// single Close() call releases both. The decoder is embedded, so its methods
// are promoted and reads on this type go through the decoder.
type localReadCloser struct {
// Embedded decoder; its methods are promoted onto localReadCloser.
*zstd.Decoder
// file is the open chunk file backing the decoder; Close() closes it.
file *os.File
}

// Close releases the embedded zstd decoder first and then closes the backing
// file. Only the error from closing the file is reported to the caller.
func (r *localReadCloser) Close() error {
	r.Decoder.Close()

	closeErr := r.file.Close()

	return closeErr
}

// localStore implements Store for local filesystem.
// It carries the base directory together with the zstd encoder and decoder
// that NewLocalStore creates once per store.
type localStore struct {
// baseDir is the directory handed to NewLocalStore.
baseDir string
// encoder compresses chunk data in PutChunk (via EncodeAll) before it is written.
encoder *zstd.Encoder
// decoder is created by NewLocalStore with zstd.NewReader(nil).
// NOTE(review): GetChunk builds its own per-file decoder, so this field
// looks unused in the visible code — confirm whether it is still needed.
decoder *zstd.Decoder
}

// NewLocalStore returns a new local chunk store.
func NewLocalStore(baseDir string) (Store, error) {
s := &localStore{baseDir: baseDir}
encoder, err := zstd.NewWriter(nil)
if err != nil {
return nil, fmt.Errorf("failed to create zstd encoder: %w", err)
}

decoder, err := zstd.NewReader(nil)
if err != nil {
encoder.Close()

return nil, fmt.Errorf("failed to create zstd decoder: %w", err)
}

s := &localStore{
baseDir: baseDir,
encoder: encoder,
decoder: decoder,
}
// Ensure base directory exists
if err := os.MkdirAll(s.storeDir(), 0o755); err != nil {
encoder.Close()
decoder.Close()

return nil, fmt.Errorf("failed to create chunk store directory: %w", err)
}

Expand Down Expand Up @@ -59,7 +94,15 @@ func (s *localStore) GetChunk(_ context.Context, hash string) (io.ReadCloser, er
return nil, err
}

return f, nil
// Create a new decoder for this specific file
decoder, err := zstd.NewReader(f)
if err != nil {
f.Close()

return nil, fmt.Errorf("failed to create zstd decoder: %w", err)
}

return &localReadCloser{decoder, f}, nil
}

func (s *localStore) PutChunk(_ context.Context, hash string, data []byte) (bool, int64, error) {
Expand All @@ -71,14 +114,17 @@ func (s *localStore) PutChunk(_ context.Context, hash string, data []byte) (bool
return false, 0, err
}

// Compress data with zstd
compressed := s.encoder.EncodeAll(data, nil)

// Write to temporary file first to ensure atomicity
tmpFile, err := os.CreateTemp(dir, "chunk-*")
if err != nil {
return false, 0, err
}
defer os.Remove(tmpFile.Name()) // Ensure temp file is cleaned up

if _, err = tmpFile.Write(data); err == nil {
if _, err = tmpFile.Write(compressed); err == nil {
err = tmpFile.Sync()
}

Expand All @@ -93,13 +139,13 @@ func (s *localStore) PutChunk(_ context.Context, hash string, data []byte) (bool
if err := os.Link(tmpFile.Name(), path); err != nil {
if os.IsExist(err) {
// Chunk already exists, which is fine. We didn't create it.
return false, int64(len(data)), nil
return false, int64(len(compressed)), nil
}

return false, 0, err // Some other error
}

return true, int64(len(data)), nil
return true, int64(len(compressed)), nil
}

func (s *localStore) DeleteChunk(_ context.Context, hash string) error {
Expand Down
56 changes: 44 additions & 12 deletions pkg/storage/chunk/local_test.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
package chunk_test

import (
"bytes"
"context"
"io"
"os"
"path/filepath"
"strings"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/kalbasit/ncps/pkg/storage/chunk"
"github.com/kalbasit/ncps/testhelper"
)

func TestLocalStore(t *testing.T) {
Expand All @@ -28,13 +31,13 @@ func TestLocalStore(t *testing.T) {
t.Run("put and get chunk", func(t *testing.T) {
t.Parallel()

hash := "test-hash-1"
content := "chunk content"
hash := testhelper.MustRandNarHash()
content := strings.Repeat("chunk content", 1024)

created, size, err := store.PutChunk(ctx, hash, []byte(content))
require.NoError(t, err)
assert.True(t, created)
assert.Equal(t, int64(len(content)), size)
assert.Greater(t, int64(len(content)), size)

has, err := store.HasChunk(ctx, hash)
require.NoError(t, err)
Expand All @@ -53,33 +56,33 @@ func TestLocalStore(t *testing.T) {
t.Run("duplicate put", func(t *testing.T) {
t.Parallel()

hash := "test-hash-2"
content := "chunk content"
hash := testhelper.MustRandNarHash()
content := strings.Repeat("chunk content", 1024)

created1, size1, err := store.PutChunk(ctx, hash, []byte(content))
require.NoError(t, err)
assert.True(t, created1)
assert.Equal(t, int64(len(content)), size1)
assert.Greater(t, int64(len(content)), size1)

created2, size2, err := store.PutChunk(ctx, hash, []byte(content))
require.NoError(t, err)
assert.False(t, created2)
assert.Equal(t, int64(len(content)), size2)
assert.Greater(t, int64(len(content)), size2)
})

t.Run("get non-existent chunk", func(t *testing.T) {
t.Parallel()

hash := "non-existent"
hash := testhelper.MustRandNarHash()
_, err := store.GetChunk(ctx, hash)
require.ErrorIs(t, err, chunk.ErrNotFound)
})

t.Run("delete chunk cleans up directory", func(t *testing.T) {
t.Parallel()

hash := "abcdef123456"
content := "cleanup test"
hash := testhelper.MustRandNarHash()
content := strings.Repeat("cleanup test", 1024)

_, _, err := store.PutChunk(ctx, hash, []byte(content))
require.NoError(t, err)
Expand Down Expand Up @@ -109,8 +112,8 @@ func TestLocalStore(t *testing.T) {
t.Run("PutChunk concurrent", func(t *testing.T) {
t.Parallel()

hash := "concurrent-hash"
content := "concurrent content"
hash := testhelper.MustRandNarHash()
content := strings.Repeat("concurrent content", 1024)
numGoroutines := 10

var (
Expand Down Expand Up @@ -155,4 +158,33 @@ func TestLocalStore(t *testing.T) {
require.NoError(t, err)
assert.True(t, has)
})

t.Run("stored chunk is zstd-compressed on disk", func(t *testing.T) {
t.Parallel()

// Use highly compressible data (repeated bytes)
data := bytes.Repeat([]byte("compressible"), 1024)
isNew, compressedSize, err := store.PutChunk(ctx, "test-hash-compress-1", data)
require.NoError(t, err)
assert.True(t, isNew)
assert.Greater(t, int64(len(data)), compressedSize, "compressed size should be less than original")
assert.Positive(t, compressedSize, "compressed size should be greater than 0")
})

t.Run("compressed chunk round-trips correctly", func(t *testing.T) {
t.Parallel()

data := []byte("hello, compressed world! hello, compressed world! hello, compressed world!")
_, _, err := store.PutChunk(ctx, "test-hash-roundtrip", data)
require.NoError(t, err)

rc, err := store.GetChunk(ctx, "test-hash-roundtrip")
require.NoError(t, err)

defer rc.Close()

got, err := io.ReadAll(rc)
require.NoError(t, err)
assert.Equal(t, data, got)
})
}