Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4081,7 +4081,7 @@ func zstdMutator(
return func(r *http.Request) {
zerolog.Ctx(ctx).
Debug().
Msg("narinfo compress is none will set Accept-Encoding to zstd")
Msg("narinfo compression is none will set Accept-Encoding to zstd")

r.Header.Set("Accept-Encoding", "zstd")

Expand Down
11 changes: 6 additions & 5 deletions pkg/cache/cache_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"testing"
"time"

"github.com/klauspost/compress/zstd"
"github.com/nix-community/go-nix/pkg/narinfo"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
Expand All @@ -26,6 +25,7 @@ import (
"github.com/kalbasit/ncps/pkg/database"
"github.com/kalbasit/ncps/pkg/nar"
"github.com/kalbasit/ncps/pkg/storage/local"
"github.com/kalbasit/ncps/pkg/zstd"
"github.com/kalbasit/ncps/testdata"
"github.com/kalbasit/ncps/testhelper"
)
Expand Down Expand Up @@ -311,13 +311,14 @@ func testRunLRU(factory cacheFactory) func(*testing.T) {
narNone := nar.CompressionTypeNone
for _, entry := range entries {
if entry.NarCompression == narNone {
encoder, _ := zstd.NewWriter(nil)
enc := zstd.GetWriter()
defer zstd.PutWriter(enc)

var compressed bytes.Buffer
encoder.Reset(&compressed)
_, err = encoder.Write([]byte(entry.NarText))
enc.Reset(&compressed)
_, err = enc.Write([]byte(entry.NarText))
require.NoError(t, err)
err = encoder.Close()
err = enc.Close()
require.NoError(t, err)

zstdSizes[entry.NarInfoHash] = uint64(compressed.Len()) //nolint:gosec
Expand Down
8 changes: 4 additions & 4 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"testing"
"time"

"github.com/klauspost/compress/zstd"
"github.com/nix-community/go-nix/pkg/narinfo"
"github.com/nix-community/go-nix/pkg/narinfo/signature"
"github.com/rs/zerolog"
Expand All @@ -31,6 +30,7 @@ import (
"github.com/kalbasit/ncps/pkg/storage"
"github.com/kalbasit/ncps/pkg/storage/chunk"
"github.com/kalbasit/ncps/pkg/storage/local"
"github.com/kalbasit/ncps/pkg/zstd"
"github.com/kalbasit/ncps/testdata"
"github.com/kalbasit/ncps/testhelper"

Expand Down Expand Up @@ -648,10 +648,10 @@ func testGetNarInfo(factory cacheFactory) func(*testing.T) {
require.NoError(t, err)

if assert.NotEqual(t, narEntry.NarText, string(body), "narText should be stored compressed in the store") {
decoder, err := zstd.NewReader(nil)
require.NoError(t, err)
dec := zstd.GetReader()
defer zstd.PutReader(dec)

plain, err := decoder.DecodeAll(body, []byte{})
plain, err := dec.DecodeAll(body, []byte{})
require.NoError(t, err)

assert.Equal(t, narEntry.NarText, string(plain))
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/cdc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func testCDCChunksAreCompressed(factory cacheFactory) func(*testing.T) {

// Use highly compressible data (repeated bytes)
content := strings.Repeat("compressible", 1000)
nu := nar.URL{Hash: "testnar-compress", Compression: nar.CompressionTypeNone}
nu := nar.URL{Hash: "testnar-zstd", Compression: nar.CompressionTypeNone}

r := io.NopCloser(strings.NewReader(content))
err = c.PutNar(ctx, nu, r)
Expand Down
15 changes: 9 additions & 6 deletions pkg/cache/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import (
"strings"
"testing"

"github.com/klauspost/compress/zstd"
"github.com/nix-community/go-nix/pkg/narinfo"
"github.com/nix-community/go-nix/pkg/narinfo/signature"
"github.com/nix-community/go-nix/pkg/nixhash"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/kalbasit/ncps/pkg/zstd"
)

// CheckAndFixNarInfo is a test-only export of the unexported checkAndFixNarInfo method.
Expand All @@ -33,13 +35,14 @@ func compressZstd(t *testing.T, data string) string {

var buf strings.Builder

enc, err := zstd.NewWriter(&buf)
require.NoError(t, err)
_, err = io.WriteString(enc, data)
require.NoError(t, err)
err = enc.Close()
pw := zstd.NewPooledWriter(&buf)

_, err := io.WriteString(pw, data)
require.NoError(t, err)

err = pw.Close()
assert.NoError(t, err) //nolint:testifylint

return buf.String()
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/nar/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import (
"io"

"github.com/andybalholm/brotli"
"github.com/klauspost/compress/zstd"
"github.com/pierrec/lz4/v4"
"github.com/sorairolake/lzip-go"
"github.com/ulikunitz/xz"

"github.com/kalbasit/ncps/pkg/zstd"
)

// ErrUnsupportedCompressionType is returned when an unsupported compression type is encountered.
Expand All @@ -31,12 +32,12 @@ func DecompressReader(r io.Reader, comp CompressionType) (io.ReadCloser, error)
return io.NopCloser(bzip2.NewReader(r)), nil

case CompressionTypeZstd:
zr, err := zstd.NewReader(r)
pr, err := zstd.NewPooledReader(r)
if err != nil {
return nil, fmt.Errorf("failed to create zstd reader: %w", err)
}

return zr.IOReadCloser(), nil
return pr, nil

case CompressionTypeLz4:
return io.NopCloser(lz4.NewReader(r)), nil
Expand Down
9 changes: 4 additions & 5 deletions pkg/nar/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ import (
"testing"

"github.com/andybalholm/brotli"
"github.com/klauspost/compress/zstd"
"github.com/pierrec/lz4/v4"
"github.com/sorairolake/lzip-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/ulikunitz/xz"

"github.com/kalbasit/ncps/pkg/nar"
"github.com/kalbasit/ncps/pkg/zstd"
)

func TestDecompressReader(t *testing.T) {
Expand Down Expand Up @@ -70,11 +70,10 @@ func TestDecompressReader(t *testing.T) {
getInput: func(t *testing.T) io.Reader {
var buf bytes.Buffer

zw, err := zstd.NewWriter(&buf)
pw := zstd.NewPooledWriter(&buf)
_, err := pw.Write(content)
require.NoError(t, err)
_, err = zw.Write(content)
require.NoError(t, err)
require.NoError(t, zw.Close())
require.NoError(t, pw.Close())

return &buf
},
Expand Down
23 changes: 4 additions & 19 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,10 @@ import (
"runtime/debug"
"strconv"
"strings"
"sync"
"time"

"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/klauspost/compress/zstd"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/riandyrn/otelchi"
"github.com/rs/zerolog"
Expand All @@ -31,6 +29,7 @@ import (
"github.com/kalbasit/ncps/pkg/nar"
"github.com/kalbasit/ncps/pkg/narinfo"
"github.com/kalbasit/ncps/pkg/storage"
"github.com/kalbasit/ncps/pkg/zstd"
)

const (
Expand Down Expand Up @@ -60,17 +59,6 @@ var tracer trace.Tracer
//nolint:gochecknoglobals
var prometheusGatherer promclient.Gatherer

//nolint:gochecknoglobals
var zstdWriterPool = sync.Pool{
New: func() interface{} {
// Not providing any options will use the default compression level.
// The error is ignored as NewWriter(nil) with no options doesn't error.
enc, _ := zstd.NewWriter(nil)

return enc
},
}

//nolint:gochecknoinits
func init() {
tracer = otel.Tracer(otelPackageName)
Expand Down Expand Up @@ -591,16 +579,13 @@ func (s *Server) getNar(withBody bool) http.HandlerFunc {
var out io.Writer = w

if useZstd {
enc := zstdWriterPool.Get().(*zstd.Encoder)
enc.Reset(w)
out = enc
pw := zstd.NewPooledWriter(w)
out = pw

defer func() {
if err := enc.Close(); err != nil {
if err := pw.Close(); err != nil {
zerolog.Ctx(r.Context()).Error().Err(err).Msg("failed to close zstd writer")
}

zstdWriterPool.Put(enc)
}()
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"testing"
"time"

"github.com/klauspost/compress/zstd"
"github.com/nix-community/go-nix/pkg/narinfo"
"github.com/nix-community/go-nix/pkg/narinfo/signature"
"github.com/rs/zerolog"
Expand All @@ -30,6 +29,7 @@ import (
"github.com/kalbasit/ncps/pkg/server"
"github.com/kalbasit/ncps/pkg/storage"
"github.com/kalbasit/ncps/pkg/storage/local"
"github.com/kalbasit/ncps/pkg/zstd"
"github.com/kalbasit/ncps/testdata"
"github.com/kalbasit/ncps/testhelper"
)
Expand Down Expand Up @@ -949,13 +949,13 @@ func TestGetNar_ZstdCompression(t *testing.T) {
assert.Equal(t, "application/x-nix-nar", resp.Header.Get("Content-Type"))
assert.Empty(t, resp.Header.Get("Content-Length"))

// 3. Decompress the body and verify content
dec, err := zstd.NewReader(resp.Body)
// 3. DecompressReader the body and verify content
pr, err := zstd.NewPooledReader(resp.Body)
require.NoError(t, err)

defer dec.Close()
defer pr.Close()

decompressed, err := io.ReadAll(dec)
decompressed, err := io.ReadAll(pr)
require.NoError(t, err)
assert.Equal(t, narData, string(decompressed))
}
Expand Down
41 changes: 13 additions & 28 deletions pkg/storage/chunk/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,52 +7,33 @@ import (
"os"
"path/filepath"

"github.com/klauspost/compress/zstd"
"github.com/kalbasit/ncps/pkg/zstd"
)

// localReadCloser wraps a zstd decoder and file to properly close both on Close().
// localReadCloser wraps a pooled zstd reader and file to properly close both on Close().
type localReadCloser struct {
*zstd.Decoder
*zstd.PooledReader
file *os.File
}

func (r *localReadCloser) Close() error {
r.Decoder.Close()
_ = r.PooledReader.Close()

return r.file.Close()
}

// localStore implements Store for local filesystem.
type localStore struct {
baseDir string
encoder *zstd.Encoder
decoder *zstd.Decoder
}

// NewLocalStore returns a new local chunk store.
func NewLocalStore(baseDir string) (Store, error) {
encoder, err := zstd.NewWriter(nil)
if err != nil {
return nil, fmt.Errorf("failed to create zstd encoder: %w", err)
}

decoder, err := zstd.NewReader(nil)
if err != nil {
encoder.Close()

return nil, fmt.Errorf("failed to create zstd decoder: %w", err)
}

s := &localStore{
baseDir: baseDir,
encoder: encoder,
decoder: decoder,
}
// Ensure base directory exists
if err := os.MkdirAll(s.storeDir(), 0o755); err != nil {
encoder.Close()
decoder.Close()

return nil, fmt.Errorf("failed to create chunk store directory: %w", err)
}

Expand Down Expand Up @@ -94,15 +75,15 @@ func (s *localStore) GetChunk(_ context.Context, hash string) (io.ReadCloser, er
return nil, err
}

// Create a new decoder for this specific file
decoder, err := zstd.NewReader(f)
// Use pooled reader instead of creating new instance
pr, err := zstd.NewPooledReader(f)
if err != nil {
f.Close()

return nil, fmt.Errorf("failed to create zstd decoder: %w", err)
return nil, fmt.Errorf("failed to create zstd reader: %w", err)
}

return &localReadCloser{decoder, f}, nil
return &localReadCloser{pr, f}, nil
}

func (s *localStore) PutChunk(_ context.Context, hash string, data []byte) (bool, int64, error) {
Expand All @@ -114,8 +95,12 @@ func (s *localStore) PutChunk(_ context.Context, hash string, data []byte) (bool
return false, 0, err
}

// Use pooled encoder
enc := zstd.GetWriter()
defer zstd.PutWriter(enc)

// Compress data with zstd
compressed := s.encoder.EncodeAll(data, nil)
compressed := enc.EncodeAll(data, nil)

// Write to temporary file first to ensure atomicity
tmpFile, err := os.CreateTemp(dir, "chunk-*")
Expand Down
7 changes: 4 additions & 3 deletions pkg/storage/chunk/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func TestLocalStore(t *testing.T) {

// Use highly compressible data (repeated bytes)
data := bytes.Repeat([]byte("compressible"), 1024)
isNew, compressedSize, err := store.PutChunk(ctx, "test-hash-compress-1", data)
isNew, compressedSize, err := store.PutChunk(ctx, testhelper.MustRandNarHash(), data)
require.NoError(t, err)
assert.True(t, isNew)
assert.Greater(t, int64(len(data)), compressedSize, "compressed size should be less than original")
Expand All @@ -175,10 +175,11 @@ func TestLocalStore(t *testing.T) {
t.Parallel()

data := []byte("hello, compressed world! hello, compressed world! hello, compressed world!")
_, _, err := store.PutChunk(ctx, "test-hash-roundtrip", data)
hash := testhelper.MustRandNarHash()
_, _, err := store.PutChunk(ctx, hash, data)
require.NoError(t, err)

rc, err := store.GetChunk(ctx, "test-hash-roundtrip")
rc, err := store.GetChunk(ctx, hash)
require.NoError(t, err)

defer rc.Close()
Expand Down
Loading