Skip to content
31 changes: 29 additions & 2 deletions pkg/dotc1z/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ type decoder struct {
f io.Reader
zd *zstd.Decoder

decodedBytes uint64
decodedBytes uint64
poolCompatible bool // true if zd has pool-compatible settings and should be returned to pool

initOnce sync.Once
headerCheckErr error
Expand All @@ -141,6 +142,24 @@ func (d *decoder) Read(p []byte) (int, error) {
maxMemSize = defaultDecoderMaxMemory
}

// Try to use a pooled decoder if options match the pool's defaults.
// Pool decoders use: concurrency=1, lowmem=true, maxMemory=defaultDecoderMaxMemory.
usePool := d.o.decoderConcurrency == 1 && maxMemSize == defaultDecoderMaxMemory
if usePool {
zd, _ := getDecoder()
if zd != nil {
if err := zd.Reset(d.f); err != nil {
// Reset failed, return decoder to pool and fall through to create new one.
putDecoder(zd)
} else {
d.zd = zd
d.poolCompatible = true // Mark for return to pool on Close()
return
}
}
}

// Non-default options or pool unavailable: create new decoder.
zstdOpts := []zstd.DOption{
zstd.WithDecoderLowmem(true), // uses lower memory, trading potentially more allocations
zstd.WithDecoderMaxMemory(maxMemSize), // sets limit on maximum memory used when decoding stream
Expand All @@ -158,6 +177,8 @@ func (d *decoder) Read(p []byte) (int, error) {
return
}
d.zd = zd
// If settings are pool-compatible, mark for return to pool on Close()
d.poolCompatible = usePool
})

// Check header
Expand Down Expand Up @@ -200,7 +221,13 @@ func (d *decoder) Read(p []byte) (int, error) {

func (d *decoder) Close() error {
if d.zd != nil {
d.zd.Close()
if d.poolCompatible {
// Return decoder to pool for reuse.
putDecoder(d.zd)
} else {
d.zd.Close()
}
d.zd = nil
}
return nil
}
Expand Down
33 changes: 28 additions & 5 deletions pkg/dotc1z/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,27 +137,50 @@ func saveC1z(dbFilePath string, outputFilePath string, encoderConcurrency int) e
if encoderConcurrency == 0 {
encoderConcurrency = runtime.GOMAXPROCS(0)
}
c1z, err := zstd.NewWriter(outFile,
zstd.WithEncoderConcurrency(encoderConcurrency),
)
if err != nil {
return err

// Try to use a pooled encoder if concurrency matches the pool's default.
// This reduces allocation overhead for the common case.
var c1z *zstd.Encoder
if encoderConcurrency == pooledEncoderConcurrency {
c1z, _ = getEncoder()
}
if c1z != nil {
c1z.Reset(outFile)
} else {
// Non-default concurrency or pool returned nil: create new encoder.
var err error
c1z, err = zstd.NewWriter(outFile,
zstd.WithEncoderConcurrency(encoderConcurrency),
)
if err != nil {
return err
}
}

_, err = io.Copy(c1z, dbFile)
if err != nil {
// Always close encoder to release resources. Don't return to pool - may be in bad state.
_ = c1z.Close()
return err
}

err = c1z.Flush()
if err != nil {
_ = c1z.Close()
return fmt.Errorf("failed to flush c1z: %w", err)
}
err = c1z.Close()
if err != nil {
// Close failed, don't return to pool.
return fmt.Errorf("failed to close c1z: %w", err)
}

// Successfully finished - return encoder to pool if it has pool-compatible settings.
// This ensures the pool grows even when initially empty.
if encoderConcurrency == pooledEncoderConcurrency {
putEncoder(c1z)
}

err = outFile.Sync()
if err != nil {
return fmt.Errorf("failed to sync out file: %w", err)
Expand Down
88 changes: 88 additions & 0 deletions pkg/dotc1z/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package dotc1z

import (
"runtime"
"sync"

"github.com/klauspost/compress/zstd"
)

// encoderPool manages reusable zstd.Encoder instances to reduce allocation overhead.
// All pooled encoders are configured with GOMAXPROCS concurrency.
var encoderPool sync.Pool
Copy link
Contributor

@kans kans Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be across all connectors in C1 ... which I guess is the point.


// pooledEncoderConcurrency is the concurrency level used for pooled encoders.
// Set at package init to GOMAXPROCS to match the default behavior.
var pooledEncoderConcurrency = runtime.GOMAXPROCS(0)

// getEncoder retrieves a zstd encoder from the pool or creates a new one.
// The returned encoder is NOT bound to any writer - call Reset(w) before use.
// Returns the encoder and a boolean indicating if it came from the pool.
func getEncoder() (*zstd.Encoder, bool) {
if enc, ok := encoderPool.Get().(*zstd.Encoder); ok && enc != nil {
return enc, true
}

// Create new encoder with default concurrency.
// This should not fail with valid options, but handle it gracefully.
enc, err := zstd.NewWriter(nil,
zstd.WithEncoderConcurrency(pooledEncoderConcurrency),
)
if err != nil {
// Fallback: return nil and let caller create encoder with their options
return nil, false
}
return enc, false
}

// putEncoder returns a zstd encoder to the pool for reuse.
// The encoder is reset to release any reference to the previous writer.
// Encoders should be in a clean state (Close() called) before returning.
func putEncoder(enc *zstd.Encoder) {
if enc == nil {
return
}
// Reset to nil writer to release reference to previous output.
// This is safe even if the encoder was already closed.
enc.Reset(nil)
encoderPool.Put(enc)
}

// decoderPool manages reusable zstd.Decoder instances to reduce allocation overhead.
// All pooled decoders are configured with concurrency=1 (single-threaded) and low memory mode.
var decoderPool sync.Pool

// getDecoder retrieves a zstd decoder from the pool or creates a new one.
// The returned decoder is NOT bound to any reader - call Reset(r) before use.
// Returns the decoder and a boolean indicating if it came from the pool.
func getDecoder() (*zstd.Decoder, bool) {
if dec, ok := decoderPool.Get().(*zstd.Decoder); ok && dec != nil {
return dec, true
}

// Create new decoder with default options matching decoder.go defaults.
dec, err := zstd.NewReader(nil,
zstd.WithDecoderConcurrency(1),
zstd.WithDecoderLowmem(true),
zstd.WithDecoderMaxMemory(defaultDecoderMaxMemory),
)
if err != nil {
// Fallback: return nil and let caller create decoder with their options
return nil, false
}
return dec, false
}

// putDecoder returns a zstd decoder to the pool for reuse.
// The decoder is reset to release any reference to the previous reader.
func putDecoder(dec *zstd.Decoder) {
if dec == nil {
return
}
// Reset to nil reader to release reference to previous input.
// If Reset fails (bad state), don't return to pool.
if err := dec.Reset(nil); err != nil {
return
}
decoderPool.Put(dec)
}
Loading
Loading