From 7ab440b5d730a0f2a8ac2af09cfc3487be1ad27f Mon Sep 17 00:00:00 2001 From: arreyder Date: Tue, 6 Jan 2026 17:20:12 -0600 Subject: [PATCH 1/8] perf(dotc1z): pool zstd encoders and decoders to reduce allocations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add sync.Pool for zstd.Encoder and zstd.Decoder instances to reduce allocation overhead during c1z file operations. Profiling showed zstd.ensureHist allocating 215 MB/min in temporal_sync due to creating new encoders per saveC1z call. Changes: - Add pool.go with encoder/decoder pool management - Modify saveC1z to use pooled encoders when concurrency matches default - Modify decoder to use pooled decoders when options match defaults - Add comprehensive tests and benchmarks Safety measures: - Pool only used when options match pool defaults - Encoders always closed on error (not returned to pool in bad state) - Reset(nil) called before returning to pool to release references - Decoder Reset errors handled gracefully Benchmark results show 20,785x reduction in bytes allocated per encode: - Pooled: 112 B/op, 2 allocs - New each time: 2,328,009 B/op, 30 allocs 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- pkg/dotc1z/decoder.go | 27 +++- pkg/dotc1z/file.go | 33 +++- pkg/dotc1z/pool.go | 88 ++++++++++ pkg/dotc1z/pool_test.go | 346 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 488 insertions(+), 6 deletions(-) create mode 100644 pkg/dotc1z/pool.go create mode 100644 pkg/dotc1z/pool_test.go diff --git a/pkg/dotc1z/decoder.go b/pkg/dotc1z/decoder.go index 5f3a4cd2c..0596ca7ed 100644 --- a/pkg/dotc1z/decoder.go +++ b/pkg/dotc1z/decoder.go @@ -121,6 +121,7 @@ type decoder struct { zd *zstd.Decoder decodedBytes uint64 + fromPool bool // true if zd came from the pool initOnce sync.Once headerCheckErr error @@ -141,6 +142,24 @@ func (d *decoder) Read(p []byte) (int, error) { maxMemSize = defaultDecoderMaxMemory } + // Try to use a pooled decoder if options match the pool's defaults. + // Pool decoders use: concurrency=1, lowmem=true, maxMemory=defaultDecoderMaxMemory. + usePool := d.o.decoderConcurrency == 1 && maxMemSize == defaultDecoderMaxMemory + if usePool { + zd, ok := getDecoder() + if zd != nil { + if err := zd.Reset(d.f); err != nil { + // Reset failed, return decoder to pool and fall through to create new one. + putDecoder(zd) + } else { + d.zd = zd + d.fromPool = ok + return + } + } + } + + // Non-default options or pool unavailable: create new decoder. zstdOpts := []zstd.DOption{ zstd.WithDecoderLowmem(true), // uses lower memory, trading potentially more allocations zstd.WithDecoderMaxMemory(maxMemSize), // sets limit on maximum memory used when decoding stream @@ -200,7 +219,13 @@ func (d *decoder) Read(p []byte) (int, error) { func (d *decoder) Close() error { if d.zd != nil { - d.zd.Close() + if d.fromPool { + // Return pooled decoder for reuse. + putDecoder(d.zd) + } else { + d.zd.Close() + } + d.zd = nil } return nil } diff --git a/pkg/dotc1z/file.go b/pkg/dotc1z/file.go index b7096ce6a..f5c4e5300 100644 --- a/pkg/dotc1z/file.go +++ b/pkg/dotc1z/file.go @@ -137,27 +137,50 @@ func saveC1z(dbFilePath string, outputFilePath string, encoderConcurrency int) e if encoderConcurrency == 0 { encoderConcurrency = runtime.GOMAXPROCS(0) } - c1z, err := zstd.NewWriter(outFile, - zstd.WithEncoderConcurrency(encoderConcurrency), - ) - if err != nil { - return err + + // Try to use a pooled encoder if concurrency matches the pool's default. 
+ // This reduces allocation overhead for the common case. + var c1z *zstd.Encoder + var fromPool bool + if encoderConcurrency == pooledEncoderConcurrency { + c1z, fromPool = getEncoder() + } + if c1z != nil { + c1z.Reset(outFile) + } else { + // Non-default concurrency or pool returned nil: create new encoder. + var err error + c1z, err = zstd.NewWriter(outFile, + zstd.WithEncoderConcurrency(encoderConcurrency), + ) + if err != nil { + return err + } } _, err = io.Copy(c1z, dbFile) if err != nil { + // Always close encoder to release resources. Don't return to pool - may be in bad state. + _ = c1z.Close() return err } err = c1z.Flush() if err != nil { + _ = c1z.Close() return fmt.Errorf("failed to flush c1z: %w", err) } err = c1z.Close() if err != nil { + // Close failed, don't return to pool. return fmt.Errorf("failed to close c1z: %w", err) } + // Successfully finished - return encoder to pool if it came from there. + if fromPool { + putEncoder(c1z) + } + err = outFile.Sync() if err != nil { return fmt.Errorf("failed to sync out file: %w", err) diff --git a/pkg/dotc1z/pool.go b/pkg/dotc1z/pool.go new file mode 100644 index 000000000..fc413ce64 --- /dev/null +++ b/pkg/dotc1z/pool.go @@ -0,0 +1,88 @@ +package dotc1z + +import ( + "runtime" + "sync" + + "github.com/klauspost/compress/zstd" +) + +// encoderPool manages reusable zstd.Encoder instances to reduce allocation overhead. +// All pooled encoders are configured with GOMAXPROCS concurrency. +var encoderPool sync.Pool + +// pooledEncoderConcurrency is the concurrency level used for pooled encoders. +// Set at package init to GOMAXPROCS to match the default behavior. +var pooledEncoderConcurrency = runtime.GOMAXPROCS(0) + +// getEncoder retrieves a zstd encoder from the pool or creates a new one. +// The returned encoder is NOT bound to any writer - call Reset(w) before use. +// Returns the encoder and a boolean indicating if it came from the pool. +func getEncoder() (*zstd.Encoder, bool) { + if enc, ok := encoderPool.Get().(*zstd.Encoder); ok && enc != nil { + return enc, true + } + + // Create new encoder with default concurrency. + // This should not fail with valid options, but handle it gracefully. + enc, err := zstd.NewWriter(nil, + zstd.WithEncoderConcurrency(pooledEncoderConcurrency), + ) + if err != nil { + // Fallback: return nil and let caller create encoder with their options + return nil, false + } + return enc, false +} + +// putEncoder returns a zstd encoder to the pool for reuse. +// The encoder is reset to release any reference to the previous writer. +// Encoders should be in a clean state (Close() called) before returning. +func putEncoder(enc *zstd.Encoder) { + if enc == nil { + return + } + // Reset to nil writer to release reference to previous output. + // This is safe even if the encoder was already closed. + enc.Reset(nil) + encoderPool.Put(enc) +} + +// decoderPool manages reusable zstd.Decoder instances to reduce allocation overhead. +// All pooled decoders are configured with concurrency=1 (single-threaded) and low memory mode. +var decoderPool sync.Pool + +// getDecoder retrieves a zstd decoder from the pool or creates a new one. +// The returned decoder is NOT bound to any reader - call Reset(r) before use. +// Returns the decoder and a boolean indicating if it came from the pool. +func getDecoder() (*zstd.Decoder, bool) { + if dec, ok := decoderPool.Get().(*zstd.Decoder); ok && dec != nil { + return dec, true + } + + // Create new decoder with default options matching decoder.go defaults. 
+ dec, err := zstd.NewReader(nil, + zstd.WithDecoderConcurrency(1), + zstd.WithDecoderLowmem(true), + zstd.WithDecoderMaxMemory(defaultDecoderMaxMemory), + ) + if err != nil { + // Fallback: return nil and let caller create decoder with their options + return nil, false + } + return dec, false +} + +// putDecoder returns a zstd decoder to the pool for reuse. +// The decoder is reset to release any reference to the previous reader. +func putDecoder(dec *zstd.Decoder) { + if dec == nil { + return + } + // Reset to nil reader to release reference to previous input. + // If Reset fails (bad state), don't return to pool. + if err := dec.Reset(nil); err != nil { + return + } + decoderPool.Put(dec) +} diff --git a/pkg/dotc1z/pool_test.go b/pkg/dotc1z/pool_test.go new file mode 100644 index 000000000..60e3ab1cf --- /dev/null +++ b/pkg/dotc1z/pool_test.go @@ -0,0 +1,346 @@ +package dotc1z + +import ( + "bytes" + "io" + "os" + "path/filepath" + "runtime" + "sync" + "testing" + + "github.com/klauspost/compress/zstd" + "github.com/stretchr/testify/require" +) + +func TestEncoderPool(t *testing.T) { + t.Run("get returns valid encoder", func(t *testing.T) { + enc, fromPool := getEncoder() + require.NotNil(t, enc) + // First call won't be from pool (pool is empty) + require.False(t, fromPool) + + // Return to pool and get again + putEncoder(enc) + + enc2, fromPool2 := getEncoder() + require.NotNil(t, enc2) + require.True(t, fromPool2) + putEncoder(enc2) + }) + + t.Run("pooled encoder produces correct output", func(t *testing.T) { + testData := []byte("test data for compression with pooled encoder") + + // Get encoder from pool + enc, _ := getEncoder() + require.NotNil(t, enc) + + var buf bytes.Buffer + enc.Reset(&buf) + + _, err := enc.Write(testData) + require.NoError(t, err) + + err = enc.Close() + require.NoError(t, err) + + putEncoder(enc) + + // Verify we can decompress + dec, err := zstd.NewReader(bytes.NewReader(buf.Bytes())) + require.NoError(t, err) + defer dec.Close() + + decoded, err := io.ReadAll(dec) + require.NoError(t, err) + require.Equal(t, testData, decoded) + }) + + t.Run("concurrent pool access", func(t *testing.T) { + const numGoroutines = 10 + const iterations = 100 + + var wg sync.WaitGroup + wg.Add(numGoroutines) + + for i := 0; i < numGoroutines; i++ { + go func(id int) { + defer wg.Done() + for j := 0; j < iterations; j++ { + enc, _ := getEncoder() + require.NotNil(t, enc) + + var buf bytes.Buffer + enc.Reset(&buf) + + data := []byte("concurrent test data") + _, err := enc.Write(data) + require.NoError(t, err) + require.NoError(t, enc.Close()) + + putEncoder(enc) + } + }(i) + } + + wg.Wait() + }) +} + +func TestDecoderPool(t *testing.T) { + // Create some test compressed data + createCompressedData := func(data []byte) []byte { + var buf bytes.Buffer + enc, _ := zstd.NewWriter(&buf) + _, _ = enc.Write(data) + _ = enc.Close() + return buf.Bytes() + } + + t.Run("get returns valid decoder", func(t *testing.T) { + dec, fromPool := getDecoder() + require.NotNil(t, dec) + require.False(t, fromPool) // First call, pool is empty + + putDecoder(dec) + + dec2, fromPool2 := getDecoder() + require.NotNil(t, dec2) + require.True(t, fromPool2) + putDecoder(dec2) + }) + + t.Run("pooled decoder produces correct output", func(t *testing.T) { + testData := []byte("test data for decompression with pooled decoder") + compressed := createCompressedData(testData) + + dec, _ := getDecoder() + require.NotNil(t, dec) + + err := dec.Reset(bytes.NewReader(compressed)) + require.NoError(t, err) + + 
decoded, err := io.ReadAll(dec) + require.NoError(t, err) + require.Equal(t, testData, decoded) + + putDecoder(dec) + }) + + t.Run("concurrent decoder pool access", func(t *testing.T) { + testData := []byte("concurrent decoder test data") + compressed := createCompressedData(testData) + + const numGoroutines = 10 + const iterations = 100 + + var wg sync.WaitGroup + wg.Add(numGoroutines) + + for i := 0; i < numGoroutines; i++ { + go func() { + defer wg.Done() + for j := 0; j < iterations; j++ { + dec, _ := getDecoder() + require.NotNil(t, dec) + + err := dec.Reset(bytes.NewReader(compressed)) + require.NoError(t, err) + + decoded, err := io.ReadAll(dec) + require.NoError(t, err) + require.Equal(t, testData, decoded) + + putDecoder(dec) + } + }() + } + + wg.Wait() + }) +} + +func TestPooledRoundTrip(t *testing.T) { + t.Run("encode decode round trip with pooled codecs", func(t *testing.T) { + tmpDir := t.TempDir() + testData := bytes.Repeat([]byte("test data for round trip "), 1000) + + // Write test db file + dbFile := filepath.Join(tmpDir, "test.db") + err := os.WriteFile(dbFile, testData, 0600) + require.NoError(t, err) + + // Save using pooled encoder + c1zFile := filepath.Join(tmpDir, "test.c1z") + err = saveC1z(dbFile, c1zFile, 0) + require.NoError(t, err) + + // Load using pooled decoder + f, err := os.Open(c1zFile) + require.NoError(t, err) + defer f.Close() + + decoder, err := NewDecoder(f) + require.NoError(t, err) + defer decoder.Close() + + decoded, err := io.ReadAll(decoder) + require.NoError(t, err) + require.Equal(t, testData, decoded) + }) + + t.Run("multiple round trips reuse pool", func(t *testing.T) { + tmpDir := t.TempDir() + + for i := 0; i < 10; i++ { + testData := bytes.Repeat([]byte("iteration data "), 100*(i+1)) + + dbFile := filepath.Join(tmpDir, "test.db") + err := os.WriteFile(dbFile, testData, 0600) + require.NoError(t, err) + + c1zFile := filepath.Join(tmpDir, "test.c1z") + err = saveC1z(dbFile, c1zFile, 0) + require.NoError(t, err) + + f, err := os.Open(c1zFile) + require.NoError(t, err) + + decoder, err := NewDecoder(f) + require.NoError(t, err) + + decoded, err := io.ReadAll(decoder) + require.NoError(t, err) + require.Equal(t, testData, decoded) + + decoder.Close() + f.Close() + } + }) +} + +// BenchmarkEncoderPoolAllocs measures allocations with and without pooling. +// Run with: go test -bench=BenchmarkEncoderPoolAllocs -benchmem +func BenchmarkEncoderPoolAllocs(b *testing.B) { + testData := bytes.Repeat([]byte("benchmark data "), 1000) + tmpDir := b.TempDir() + + dbFile := filepath.Join(tmpDir, "bench.db") + err := os.WriteFile(dbFile, testData, 0600) + require.NoError(b, err) + + b.Run("pooled_encoder", func(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + c1zFile := filepath.Join(tmpDir, "bench.c1z") + err := saveC1z(dbFile, c1zFile, 0) + if err != nil { + b.Fatal(err) + } + } + }) + + b.Run("new_encoder_each_time", func(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + c1zFile := filepath.Join(tmpDir, "bench2.c1z") + + dbF, _ := os.Open(dbFile) + outF, _ := os.Create(c1zFile) + + _, _ = outF.Write(C1ZFileHeader) + + // Create new encoder each time (simulates old behavior) + enc, _ := zstd.NewWriter(outF, zstd.WithEncoderConcurrency(runtime.GOMAXPROCS(0))) + _, _ = io.Copy(enc, dbF) + _ = enc.Flush() + _ = enc.Close() + + _ = outF.Sync() + _ = outF.Close() + _ = dbF.Close() + } + }) +} + +// BenchmarkEncoderAllocationOnly isolates encoder allocation overhead. 
+// This shows the direct benefit of pooling without file I/O noise. +func BenchmarkEncoderAllocationOnly(b *testing.B) { + testData := []byte("small test data for encoder benchmark") + + b.Run("pooled", func(b *testing.B) { + // Warm up the pool + enc, _ := getEncoder() + putEncoder(enc) + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + enc, _ := getEncoder() + var buf bytes.Buffer + enc.Reset(&buf) + _, _ = enc.Write(testData) + _ = enc.Close() + putEncoder(enc) + } + }) + + b.Run("new_each_time", func(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + var buf bytes.Buffer + enc, _ := zstd.NewWriter(&buf, zstd.WithEncoderConcurrency(runtime.GOMAXPROCS(0))) + _, _ = enc.Write(testData) + _ = enc.Close() + } + }) +} + +// BenchmarkDecoderPoolAllocs measures decoder allocations. +func BenchmarkDecoderPoolAllocs(b *testing.B) { + // Create test c1z file + tmpDir := b.TempDir() + testData := bytes.Repeat([]byte("benchmark data "), 1000) + + dbFile := filepath.Join(tmpDir, "bench.db") + err := os.WriteFile(dbFile, testData, 0600) + require.NoError(b, err) + + c1zFile := filepath.Join(tmpDir, "bench.c1z") + err = saveC1z(dbFile, c1zFile, 0) + require.NoError(b, err) + + b.Run("pooled_decoder", func(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + f, _ := os.Open(c1zFile) + dec, _ := NewDecoder(f) + _, _ = io.ReadAll(dec) + dec.Close() + f.Close() + } + }) + + b.Run("new_decoder_each_time", func(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + f, _ := os.Open(c1zFile) + + // Skip header manually + headerBuf := make([]byte, len(C1ZFileHeader)) + _, _ = f.Read(headerBuf) + + // Create new decoder each time (simulates old behavior) + dec, _ := zstd.NewReader(f, + zstd.WithDecoderConcurrency(1), + zstd.WithDecoderLowmem(true), + zstd.WithDecoderMaxMemory(defaultDecoderMaxMemory), + ) + _, _ = io.ReadAll(dec) + dec.Close() + f.Close() + } + }) +} From 164e413aebaff356eb64b8027ce0a317f1f9f4f5 Mon Sep 17 00:00:00 2001 From: arreyder Date: Tue, 6 Jan 2026 17:28:44 -0600 Subject: [PATCH 2/8] fix: add period to comment for godot linter --- pkg/dotc1z/pool_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/dotc1z/pool_test.go b/pkg/dotc1z/pool_test.go index 60e3ab1cf..3027033ff 100644 --- a/pkg/dotc1z/pool_test.go +++ b/pkg/dotc1z/pool_test.go @@ -221,7 +221,7 @@ func TestPooledRoundTrip(t *testing.T) { } // BenchmarkEncoderPoolAllocs measures allocations with and without pooling. -// Run with: go test -bench=BenchmarkEncoderPoolAllocs -benchmem +// Run with: go test -bench=BenchmarkEncoderPoolAllocs -benchmem. func BenchmarkEncoderPoolAllocs(b *testing.B) { testData := bytes.Repeat([]byte("benchmark data "), 1000) tmpDir := b.TempDir() From d7c1e91cfa288300ddd6ab442773aa5ce49cc224 Mon Sep 17 00:00:00 2001 From: arreyder Date: Thu, 29 Jan 2026 13:04:31 -0600 Subject: [PATCH 3/8] fix(dotc1z): ensure pool grows when initially empty MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixed logic error where encoders/decoders were only returned to pool if they originally came FROM the pool. When the pool was empty, new instances were created but never added back, so the pool never grew. 
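In sketch form (simplified from saveC1z in file.go; the encoder's actual
use and the error paths are elided, so this is illustrative rather than
the literal diff):

    // Before (patch 1): return-to-pool was keyed on origin.
    enc, fromPool := getEncoder()
    // ... Reset(outFile), io.Copy, Flush, Close ...
    if fromPool { // false whenever the pool was empty, so it never grew
        putEncoder(enc)
    }

    // After (this patch): return-to-pool is keyed on settings compatibility.
    if encoderConcurrency == pooledEncoderConcurrency {
        putEncoder(enc)
    }
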
Changes: - file.go: Check settings compatibility, not origin, when returning encoder - decoder.go: Rename fromPool → poolCompatible, set based on settings - pool_test.go: Add regression tests to verify pool grows The bug was that `fromPool` tracked where the instance came from, but the return-to-pool decision should be based on whether settings are pool-compatible. Now pool-compatible instances are always returned regardless of whether the pool was empty when they were created. Co-Authored-By: Claude Opus 4.5 --- pkg/dotc1z/decoder.go | 14 ++++--- pkg/dotc1z/file.go | 8 ++-- pkg/dotc1z/pool_test.go | 92 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 104 insertions(+), 10 deletions(-) diff --git a/pkg/dotc1z/decoder.go b/pkg/dotc1z/decoder.go index 0596ca7ed..fc8b750de 100644 --- a/pkg/dotc1z/decoder.go +++ b/pkg/dotc1z/decoder.go @@ -120,8 +120,8 @@ type decoder struct { f io.Reader zd *zstd.Decoder - decodedBytes uint64 - fromPool bool // true if zd came from the pool + decodedBytes uint64 + poolCompatible bool // true if zd has pool-compatible settings and should be returned to pool initOnce sync.Once headerCheckErr error @@ -146,14 +146,14 @@ func (d *decoder) Read(p []byte) (int, error) { // Pool decoders use: concurrency=1, lowmem=true, maxMemory=defaultDecoderMaxMemory. usePool := d.o.decoderConcurrency == 1 && maxMemSize == defaultDecoderMaxMemory if usePool { - zd, ok := getDecoder() + zd, _ := getDecoder() if zd != nil { if err := zd.Reset(d.f); err != nil { // Reset failed, return decoder to pool and fall through to create new one. putDecoder(zd) } else { d.zd = zd - d.fromPool = ok + d.poolCompatible = true // Mark for return to pool on Close() return } } @@ -177,6 +177,8 @@ func (d *decoder) Read(p []byte) (int, error) { return } d.zd = zd + // If settings are pool-compatible, mark for return to pool on Close() + d.poolCompatible = usePool }) // Check header @@ -219,8 +221,8 @@ func (d *decoder) Read(p []byte) (int, error) { func (d *decoder) Close() error { if d.zd != nil { - if d.fromPool { - // Return pooled decoder for reuse. + if d.poolCompatible { + // Return decoder to pool for reuse. putDecoder(d.zd) } else { d.zd.Close() diff --git a/pkg/dotc1z/file.go b/pkg/dotc1z/file.go index f5c4e5300..eaa83a1fe 100644 --- a/pkg/dotc1z/file.go +++ b/pkg/dotc1z/file.go @@ -141,9 +141,8 @@ func saveC1z(dbFilePath string, outputFilePath string, encoderConcurrency int) e // Try to use a pooled encoder if concurrency matches the pool's default. // This reduces allocation overhead for the common case. var c1z *zstd.Encoder - var fromPool bool if encoderConcurrency == pooledEncoderConcurrency { - c1z, fromPool = getEncoder() + c1z, _ = getEncoder() } if c1z != nil { c1z.Reset(outFile) @@ -176,8 +175,9 @@ func saveC1z(dbFilePath string, outputFilePath string, encoderConcurrency int) e return fmt.Errorf("failed to close c1z: %w", err) } - // Successfully finished - return encoder to pool if it came from there. - if fromPool { + // Successfully finished - return encoder to pool if it has pool-compatible settings. + // This ensures the pool grows even when initially empty. + if encoderConcurrency == pooledEncoderConcurrency { putEncoder(c1z) } diff --git a/pkg/dotc1z/pool_test.go b/pkg/dotc1z/pool_test.go index 3027033ff..5ed287a9e 100644 --- a/pkg/dotc1z/pool_test.go +++ b/pkg/dotc1z/pool_test.go @@ -161,6 +161,98 @@ func TestDecoderPool(t *testing.T) { }) } +// TestPoolGrowsFromSaveC1z verifies that saveC1z populates the encoder pool +// even when starting with an empty pool. 
This was a bug where only encoders +// that came FROM the pool were returned TO the pool. +func TestPoolGrowsFromSaveC1z(t *testing.T) { + // Clear any existing pool state by getting and not returning + for { + enc, fromPool := getEncoder() + if !fromPool { + // This was a fresh encoder, pool is now empty + // Don't return it - let it be GC'd + _ = enc.Close() + break + } + _ = enc.Close() // Don't return to pool + } + + // Verify pool is empty + enc, fromPool := getEncoder() + require.False(t, fromPool, "pool should be empty after draining") + _ = enc.Close() // Don't return + + // Now use saveC1z which should populate the pool + tmpDir := t.TempDir() + testData := bytes.Repeat([]byte("test data "), 100) + + dbFile := filepath.Join(tmpDir, "test.db") + err := os.WriteFile(dbFile, testData, 0600) + require.NoError(t, err) + + c1zFile := filepath.Join(tmpDir, "test.c1z") + err = saveC1z(dbFile, c1zFile, 0) + require.NoError(t, err) + + // Now the pool should have an encoder + enc2, fromPool2 := getEncoder() + require.True(t, fromPool2, "saveC1z should have returned encoder to pool") + putEncoder(enc2) +} + +// TestPoolGrowsFromDecoder verifies that NewDecoder populates the decoder pool +// even when starting with an empty pool. +func TestPoolGrowsFromDecoder(t *testing.T) { + // Clear any existing pool state + for { + dec, fromPool := getDecoder() + if !fromPool { + dec.Close() + break + } + dec.Close() // Don't return to pool + } + + // Verify pool is empty + dec, fromPool := getDecoder() + require.False(t, fromPool, "pool should be empty after draining") + dec.Close() + + // Create a c1z file to decode + tmpDir := t.TempDir() + testData := bytes.Repeat([]byte("test data "), 100) + + dbFile := filepath.Join(tmpDir, "test.db") + err := os.WriteFile(dbFile, testData, 0600) + require.NoError(t, err) + + c1zFile := filepath.Join(tmpDir, "test.c1z") + err = saveC1z(dbFile, c1zFile, 0) + require.NoError(t, err) + + // Drain encoder pool (saveC1z added one) + enc, _ := getEncoder() + _ = enc.Close() + + // Now use NewDecoder which should populate the decoder pool + f, err := os.Open(c1zFile) + require.NoError(t, err) + + decoder, err := NewDecoder(f) + require.NoError(t, err) + + _, err = io.ReadAll(decoder) + require.NoError(t, err) + + decoder.Close() + f.Close() + + // Now the decoder pool should have a decoder + dec2, fromPool2 := getDecoder() + require.True(t, fromPool2, "NewDecoder.Close should have returned decoder to pool") + putDecoder(dec2) +} + func TestPooledRoundTrip(t *testing.T) { t.Run("encode decode round trip with pooled codecs", func(t *testing.T) { tmpDir := t.TempDir() From 08caa6f10d688f2719ab1461fa8b230e675ca9da Mon Sep 17 00:00:00 2001 From: arreyder Date: Thu, 29 Jan 2026 13:14:10 -0600 Subject: [PATCH 4/8] fix(dotc1z): handle Close() return values for linter --- pkg/dotc1z/pool_test.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/pkg/dotc1z/pool_test.go b/pkg/dotc1z/pool_test.go index 5ed287a9e..4bd9d08f7 100644 --- a/pkg/dotc1z/pool_test.go +++ b/pkg/dotc1z/pool_test.go @@ -207,7 +207,7 @@ func TestPoolGrowsFromDecoder(t *testing.T) { for { dec, fromPool := getDecoder() if !fromPool { - dec.Close() + dec.Close() // zstd.Decoder.Close() returns nothing break } dec.Close() // Don't return to pool @@ -244,8 +244,10 @@ func TestPoolGrowsFromDecoder(t *testing.T) { _, err = io.ReadAll(decoder) require.NoError(t, err) - decoder.Close() - f.Close() + err = decoder.Close() + require.NoError(t, err) + err = f.Close() + require.NoError(t, 
err) // Now the decoder pool should have a decoder dec2, fromPool2 := getDecoder() @@ -306,8 +308,8 @@ func TestPooledRoundTrip(t *testing.T) { require.NoError(t, err) require.Equal(t, testData, decoded) - decoder.Close() - f.Close() + _ = decoder.Close() + _ = f.Close() } }) } @@ -410,8 +412,8 @@ func BenchmarkDecoderPoolAllocs(b *testing.B) { f, _ := os.Open(c1zFile) dec, _ := NewDecoder(f) _, _ = io.ReadAll(dec) - dec.Close() - f.Close() + _ = dec.Close() + _ = f.Close() } }) @@ -432,7 +434,7 @@ func BenchmarkDecoderPoolAllocs(b *testing.B) { ) _, _ = io.ReadAll(dec) dec.Close() - f.Close() + _ = f.Close() } }) } From 5a1503ad14e0e25ac782856f51101614955e19ba Mon Sep 17 00:00:00 2001 From: arreyder Date: Thu, 29 Jan 2026 13:16:41 -0600 Subject: [PATCH 5/8] fix(dotc1z): check Close() errors in TestPooledRoundTrip --- pkg/dotc1z/pool_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/dotc1z/pool_test.go b/pkg/dotc1z/pool_test.go index 4bd9d08f7..89cad3752 100644 --- a/pkg/dotc1z/pool_test.go +++ b/pkg/dotc1z/pool_test.go @@ -308,8 +308,10 @@ func TestPooledRoundTrip(t *testing.T) { require.NoError(t, err) require.Equal(t, testData, decoded) - _ = decoder.Close() - _ = f.Close() + err = decoder.Close() + require.NoError(t, err) + err = f.Close() + require.NoError(t, err) } }) } From b93a6a8ff75a1d51c0b577b88d3f0f2f1f65bf6f Mon Sep 17 00:00:00 2001 From: arreyder Date: Thu, 29 Jan 2026 13:17:53 -0600 Subject: [PATCH 6/8] fix(dotc1z): handle errors in createCompressedData test helper --- pkg/dotc1z/pool_test.go | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/pkg/dotc1z/pool_test.go b/pkg/dotc1z/pool_test.go index 89cad3752..ace75a9f5 100644 --- a/pkg/dotc1z/pool_test.go +++ b/pkg/dotc1z/pool_test.go @@ -90,11 +90,23 @@ func TestEncoderPool(t *testing.T) { func TestDecoderPool(t *testing.T) { // Create some test compressed data - createCompressedData := func(data []byte) []byte { + createCompressedData := func(t *testing.T, data []byte) []byte { + t.Helper() var buf bytes.Buffer - enc, _ := zstd.NewWriter(&buf) - _, _ = enc.Write(data) - _ = enc.Close() + enc, err := zstd.NewWriter(&buf) + if err != nil { + t.Fatalf("failed to create zstd writer: %v", err) + } + n, err := enc.Write(data) + if err != nil { + t.Fatalf("failed to write data: %v", err) + } + if n != len(data) { + t.Fatalf("short write: wrote %d of %d bytes", n, len(data)) + } + if err := enc.Close(); err != nil { + t.Fatalf("failed to close encoder: %v", err) + } return buf.Bytes() } @@ -113,7 +125,7 @@ func TestDecoderPool(t *testing.T) { t.Run("pooled decoder produces correct output", func(t *testing.T) { testData := []byte("test data for decompression with pooled decoder") - compressed := createCompressedData(testData) + compressed := createCompressedData(t, testData) dec, _ := getDecoder() require.NotNil(t, dec) @@ -130,7 +142,7 @@ func TestDecoderPool(t *testing.T) { t.Run("concurrent decoder pool access", func(t *testing.T) { testData := []byte("concurrent decoder test data") - compressed := createCompressedData(testData) + compressed := createCompressedData(t, testData) const numGoroutines = 10 const iterations = 100 From cf1b39ded39edfa7ec96e937f5ab7a32ef956705 Mon Sep 17 00:00:00 2001 From: arreyder Date: Thu, 29 Jan 2026 13:21:28 -0600 Subject: [PATCH 7/8] fix(pool_test): add proper error handling in all benchmarks Update benchmark code to check and fail on I/O and codec errors instead of ignoring them. 
This includes: - BenchmarkEncoderPoolAllocs: new_encoder_each_time - BenchmarkEncoderAllocationOnly: both pooled and new_each_time - BenchmarkDecoderPoolAllocs: both pooled_decoder and new_decoder_each_time Proper cleanup is performed before b.Fatal() calls to avoid resource leaks even when benchmarks fail. Co-Authored-By: Claude Opus 4.5 --- pkg/dotc1z/pool_test.go | 127 ++++++++++++++++++++++++++++++++-------- 1 file changed, 102 insertions(+), 25 deletions(-) diff --git a/pkg/dotc1z/pool_test.go b/pkg/dotc1z/pool_test.go index ace75a9f5..d22816230 100644 --- a/pkg/dotc1z/pool_test.go +++ b/pkg/dotc1z/pool_test.go @@ -354,20 +354,55 @@ func BenchmarkEncoderPoolAllocs(b *testing.B) { for i := 0; i < b.N; i++ { c1zFile := filepath.Join(tmpDir, "bench2.c1z") - dbF, _ := os.Open(dbFile) - outF, _ := os.Create(c1zFile) + dbF, err := os.Open(dbFile) + if err != nil { + b.Fatal(err) + } + outF, err := os.Create(c1zFile) + if err != nil { + dbF.Close() + b.Fatal(err) + } - _, _ = outF.Write(C1ZFileHeader) + if _, err := outF.Write(C1ZFileHeader); err != nil { + outF.Close() + dbF.Close() + b.Fatal(err) + } // Create new encoder each time (simulates old behavior) - enc, _ := zstd.NewWriter(outF, zstd.WithEncoderConcurrency(runtime.GOMAXPROCS(0))) - _, _ = io.Copy(enc, dbF) - _ = enc.Flush() - _ = enc.Close() + enc, err := zstd.NewWriter(outF, zstd.WithEncoderConcurrency(runtime.GOMAXPROCS(0))) + if err != nil { + outF.Close() + dbF.Close() + b.Fatal(err) + } + if _, err := io.Copy(enc, dbF); err != nil { + enc.Close() + outF.Close() + dbF.Close() + b.Fatal(err) + } + if err := enc.Flush(); err != nil { + enc.Close() + outF.Close() + dbF.Close() + b.Fatal(err) + } + enc.Close() - _ = outF.Sync() - _ = outF.Close() - _ = dbF.Close() + if err := outF.Sync(); err != nil { + outF.Close() + dbF.Close() + b.Fatal(err) + } + if err := outF.Close(); err != nil { + dbF.Close() + b.Fatal(err) + } + if err := dbF.Close(); err != nil { + b.Fatal(err) + } } }) } @@ -386,10 +421,15 @@ func BenchmarkEncoderAllocationOnly(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { enc, _ := getEncoder() + if enc == nil { + b.Fatal("getEncoder returned nil") + } var buf bytes.Buffer enc.Reset(&buf) - _, _ = enc.Write(testData) - _ = enc.Close() + if _, err := enc.Write(testData); err != nil { + b.Fatal(err) + } + enc.Close() putEncoder(enc) } }) @@ -399,9 +439,14 @@ func BenchmarkEncoderAllocationOnly(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { var buf bytes.Buffer - enc, _ := zstd.NewWriter(&buf, zstd.WithEncoderConcurrency(runtime.GOMAXPROCS(0))) - _, _ = enc.Write(testData) - _ = enc.Close() + enc, err := zstd.NewWriter(&buf, zstd.WithEncoderConcurrency(runtime.GOMAXPROCS(0))) + if err != nil { + b.Fatal(err) + } + if _, err := enc.Write(testData); err != nil { + b.Fatal(err) + } + enc.Close() } }) } @@ -423,32 +468,64 @@ func BenchmarkDecoderPoolAllocs(b *testing.B) { b.Run("pooled_decoder", func(b *testing.B) { b.ReportAllocs() for i := 0; i < b.N; i++ { - f, _ := os.Open(c1zFile) - dec, _ := NewDecoder(f) - _, _ = io.ReadAll(dec) - _ = dec.Close() - _ = f.Close() + f, err := os.Open(c1zFile) + if err != nil { + b.Fatal(err) + } + dec, err := NewDecoder(f) + if err != nil { + f.Close() + b.Fatal(err) + } + if _, err := io.ReadAll(dec); err != nil { + dec.Close() + f.Close() + b.Fatal(err) + } + if err := dec.Close(); err != nil { + f.Close() + b.Fatal(err) + } + if err := f.Close(); err != nil { + b.Fatal(err) + } } }) b.Run("new_decoder_each_time", func(b *testing.B) { b.ReportAllocs() for i := 
0; i < b.N; i++ { - f, _ := os.Open(c1zFile) + f, err := os.Open(c1zFile) + if err != nil { + b.Fatal(err) + } // Skip header manually headerBuf := make([]byte, len(C1ZFileHeader)) - _, _ = f.Read(headerBuf) + if _, err := f.Read(headerBuf); err != nil { + f.Close() + b.Fatal(err) + } // Create new decoder each time (simulates old behavior) - dec, _ := zstd.NewReader(f, + dec, err := zstd.NewReader(f, zstd.WithDecoderConcurrency(1), zstd.WithDecoderLowmem(true), zstd.WithDecoderMaxMemory(defaultDecoderMaxMemory), ) - _, _ = io.ReadAll(dec) + if err != nil { + f.Close() + b.Fatal(err) + } + if _, err := io.ReadAll(dec); err != nil { + dec.Close() + f.Close() + b.Fatal(err) + } dec.Close() - _ = f.Close() + if err := f.Close(); err != nil { + b.Fatal(err) + } } }) } From cef31914ed526d489482eafedb50a2a5f4298e5f Mon Sep 17 00:00:00 2001 From: arreyder Date: Thu, 29 Jan 2026 13:32:23 -0600 Subject: [PATCH 8/8] fix(pool_test): use assert.* in concurrent test goroutines Replace require.* calls with assert.* in goroutines spawned by concurrent subtests. Using require.* (which calls t.FailNow) in a goroutine only terminates that goroutine, not the test, leading to confusing behavior. Using assert.* properly reports failures while allowing the goroutine to exit gracefully via early return. Co-Authored-By: Claude Opus 4.5 --- pkg/dotc1z/pool_test.go | 29 ++++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/pkg/dotc1z/pool_test.go b/pkg/dotc1z/pool_test.go index d22816230..9fe1cbe12 100644 --- a/pkg/dotc1z/pool_test.go +++ b/pkg/dotc1z/pool_test.go @@ -10,6 +10,7 @@ import ( "testing" "github.com/klauspost/compress/zstd" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -69,15 +70,21 @@ func TestEncoderPool(t *testing.T) { defer wg.Done() for j := 0; j < iterations; j++ { enc, _ := getEncoder() - require.NotNil(t, enc) + if !assert.NotNil(t, enc) { + return + } var buf bytes.Buffer enc.Reset(&buf) data := []byte("concurrent test data") _, err := enc.Write(data) - require.NoError(t, err) - require.NoError(t, enc.Close()) + if !assert.NoError(t, err) { + return + } + if !assert.NoError(t, enc.Close()) { + return + } putEncoder(enc) } @@ -155,14 +162,22 @@ func TestDecoderPool(t *testing.T) { defer wg.Done() for j := 0; j < iterations; j++ { dec, _ := getDecoder() - require.NotNil(t, dec) + if !assert.NotNil(t, dec) { + return + } err := dec.Reset(bytes.NewReader(compressed)) - require.NoError(t, err) + if !assert.NoError(t, err) { + return + } decoded, err := io.ReadAll(dec) - require.NoError(t, err) - require.Equal(t, testData, decoded) + if !assert.NoError(t, err) { + return + } + if !assert.Equal(t, testData, decoded) { + return + } putDecoder(dec) }
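
As a self-contained illustration of the require-vs-assert rule this patch
applies (the package name, TestWorkers, and doWork below are placeholders;
only the testify calls mirror the patch):

    // In a *_test.go file:
    package example

    import (
        "sync"
        "testing"

        "github.com/stretchr/testify/assert"
    )

    func doWork() error { return nil } // stand-in for the real work under test

    func TestWorkers(t *testing.T) {
        var wg sync.WaitGroup
        for i := 0; i < 4; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                // require.NoError calls t.FailNow, which only terminates this
                // goroutine, not the test. assert.NoError marks the test as
                // failed and returns false, so the goroutine can return early.
                if !assert.NoError(t, doWork()) {
                    return
                }
            }()
        }
        wg.Wait()
    }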