perf(dotc1z): pool zstd encoders and decoders to reduce allocations #622
Conversation
Add `sync.Pool` for `zstd.Encoder` and `zstd.Decoder` instances to reduce allocation overhead during c1z file operations. Profiling showed `zstd.ensureHist` allocating 215 MB/min in temporal_sync due to creating new encoders per `saveC1z` call.

Changes:
- Add pool.go with encoder/decoder pool management
- Modify saveC1z to use pooled encoders when concurrency matches default
- Modify decoder to use pooled decoders when options match defaults
- Add comprehensive tests and benchmarks

Safety measures:
- Pool only used when options match pool defaults
- Encoders always closed on error (not returned to pool in bad state)
- `Reset(nil)` called before returning to pool to release references
- Decoder `Reset` errors handled gracefully

Benchmark results show a 20,785x reduction in bytes allocated per encode:
- Pooled: 112 B/op, 2 allocs
- New each time: 2,328,009 B/op, 30 allocs

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
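To make the shape of the change concrete, here is a minimal sketch of what such a pool can look like. This is not the PR's actual pool.go; the option values and the `(value, ok)` return shape are assumptions inferred from the tests reviewed below.

```go
package dotc1z

import (
	"runtime"
	"sync"

	"github.com/klauspost/compress/zstd"
)

// Captured once at package init; the write path only uses the pool
// when the requested concurrency matches this value.
var pooledEncoderConcurrency = runtime.GOMAXPROCS(0)

var encoderPool sync.Pool

// getEncoder returns a pooled encoder, or builds a fresh one with the
// pool's default options when the pool is empty.
func getEncoder() (*zstd.Encoder, bool) {
	if enc, ok := encoderPool.Get().(*zstd.Encoder); ok && enc != nil {
		return enc, true
	}
	enc, err := zstd.NewWriter(nil,
		zstd.WithEncoderConcurrency(pooledEncoderConcurrency))
	if err != nil {
		return nil, false
	}
	return enc, true
}

// putEncoder drops the reference to the previous writer and pools the encoder.
func putEncoder(enc *zstd.Encoder) {
	if enc == nil {
		return
	}
	enc.Reset(nil)
	encoderPool.Put(enc)
}
```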
Walkthrough

Adds an unexported sync.Pool for zstd encoders/decoders, integrates pooled acquisition/reset/return into decoder creation and C1Z write paths with fallbacks, and adds tests and benchmarks validating pooled behavior and concurrency.
Sequence Diagram(s)

```mermaid
sequenceDiagram
participant Writer as C1Z Writer
participant Pool as EncoderPool
participant ZSTD as zstd.Encoder
participant File as Output File
Note over Writer,Pool: Encoder write path with pooling
Writer->>Pool: getEncoder()
alt pooled available
Pool-->>Writer: pooled Encoder
Writer->>ZSTD: Reset(writer=File)
alt Reset OK
ZSTD-->>Writer: ready
Writer->>File: write compressed blocks
Writer->>ZSTD: Close/Flush
ZSTD-->>Writer: closed
Writer->>Pool: putEncoder(encoder)
else Reset fails
ZSTD-->>Writer: Reset error
Writer->>ZSTD: Close (do not put to pool)
Writer->>Pool: fallback -> create new Encoder
end
else none -> create new
Pool-->>Writer: nil
Writer->>ZSTD: NewEncoder(...)
end
```
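In Go, the happy path of this diagram reads roughly as follows. This is a sketch, not the PR's actual saveC1z; `out` and `dbReader` are stand-in names. Note that in klauspost/compress the encoder constructor is `zstd.NewWriter` and `zstd.Encoder.Reset` has no error return, so the "Reset fails" branch is really only observable on the decoder side.

```go
enc, ok := getEncoder()
if !ok {
	var err error
	enc, err = zstd.NewWriter(out,
		zstd.WithEncoderConcurrency(pooledEncoderConcurrency))
	if err != nil {
		return err
	}
}
enc.Reset(out) // point the pooled encoder at the output file

if _, err := io.Copy(enc, dbReader); err != nil {
	_ = enc.Close() // error path: release resources, do not pool
	return err
}
if err := enc.Close(); err != nil { // flushes remaining compressed blocks
	return err
}
putEncoder(enc) // settings match the pool defaults, so return it
```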
```mermaid
sequenceDiagram
participant Reader as C1Z Reader
participant Pool as DecoderPool
participant ZSTD as zstd.Decoder
participant File as Input File
Note over Reader,Pool: Decoder read path with pooling
Reader->>Pool: getDecoder()
alt pooled available
Pool-->>Reader: pooled Decoder
Reader->>ZSTD: Reset(reader=File)
alt Reset OK
ZSTD-->>Reader: ready
Reader->>File: read compressed blocks
Reader->>ZSTD: Close
ZSTD-->>Reader: closed
Reader->>Pool: putDecoder(decoder)
else Reset fails
ZSTD-->>Reader: Reset error
Reader->>ZSTD: Close (do not put to pool)
Reader->>Pool: fallback -> create new Decoder
end
else none -> create new
Pool-->>Reader: nil
Reader->>ZSTD: NewDecoder(...)
end
```
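The decoder side, where `Reset` does return an error, might look like this sketch (the `getDecoder() (dec, ok)` shape and option values are assumptions; imports are as in the pool sketch above, plus `io`):

```go
// newPooledReader reuses a pooled decoder when possible, per the diagram above.
func newPooledReader(r io.Reader) (*zstd.Decoder, error) {
	if dec, ok := getDecoder(); ok && dec != nil {
		if err := dec.Reset(r); err == nil {
			return dec, nil // caller hands it back via putDecoder when done
		}
		dec.Close() // Reset failed: do not return this instance to the pool
	}
	// Fallback: build a fresh decoder with the pool's default options.
	return zstd.NewReader(r,
		zstd.WithDecoderConcurrency(1),
		zstd.WithDecoderLowmem(true))
}
```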
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: 2 passed, 1 warning
Actionable comments posted: 0
🧹 Nitpick comments (4)
pkg/dotc1z/pool_test.go (4)
67-88: Using `require` in goroutines can cause test hangs on failure.

When `require` fails inside a goroutine, it calls `t.FailNow()`, which only terminates the current goroutine, not the test. The main goroutine's `wg.Wait()` may hang indefinitely if other goroutines don't complete.

Consider using `assert` instead and checking for failures after `wg.Wait()`, or use a channel/atomic to signal failures.

🔎 Proposed fix using assert
t.Run("concurrent pool access", func(t *testing.T) { const numGoroutines = 10 const iterations = 100 var wg sync.WaitGroup + var failed atomic.Bool wg.Add(numGoroutines) for i := 0; i < numGoroutines; i++ { go func(id int) { defer wg.Done() for j := 0; j < iterations; j++ { enc, _ := getEncoder() - require.NotNil(t, enc) + if enc == nil { + failed.Store(true) + return + } var buf bytes.Buffer enc.Reset(&buf) data := []byte("concurrent test data") _, err := enc.Write(data) - require.NoError(t, err) - require.NoError(t, enc.Close()) + if err != nil { + failed.Store(true) + return + } + if err := enc.Close(); err != nil { + failed.Store(true) + return + } putEncoder(enc) } }(i) } wg.Wait() + require.False(t, failed.Load(), "concurrent access failed") })
141-161: Same issue: `require` in goroutines for decoder concurrent test.

Same concern as the encoder concurrent test - using `require` inside goroutines can cause the test to hang if a failure occurs.
223-224: Add period at end of comment to satisfy godot linter.

🔎 Proposed fix

```diff
 // BenchmarkEncoderPoolAllocs measures allocations with and without pooling.
-// Run with: go test -bench=BenchmarkEncoderPoolAllocs -benchmem
+// Run with: go test -bench=BenchmarkEncoderPoolAllocs -benchmem.
 func BenchmarkEncoderPoolAllocs(b *testing.B) {
```
93-99: Helper function ignores errors silently.

The `createCompressedData` helper ignores all errors. While acceptable for test helpers with known-good inputs, consider adding a `t` parameter to fail fast on unexpected errors.

🔎 Proposed fix

```diff
-	createCompressedData := func(data []byte) []byte {
+	createCompressedData := func(t *testing.T, data []byte) []byte {
+		t.Helper()
 		var buf bytes.Buffer
-		enc, _ := zstd.NewWriter(&buf)
-		_, _ = enc.Write(data)
-		_ = enc.Close()
+		enc, err := zstd.NewWriter(&buf)
+		require.NoError(t, err)
+		_, err = enc.Write(data)
+		require.NoError(t, err)
+		require.NoError(t, enc.Close())
 		return buf.Bytes()
 	}
```
📜 Review details
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- pkg/dotc1z/decoder.go
- pkg/dotc1z/file.go
- pkg/dotc1z/pool.go
- pkg/dotc1z/pool_test.go
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2026-01-02T17:21:01.723Z
Learnt from: ggreer
Repo: ConductorOne/baton-sdk PR: 616
File: pkg/synccompactor/compactor_test.go:44-52
Timestamp: 2026-01-02T17:21:01.723Z
Learning: In tests that verify cleanup behavior (e.g., ensuring temporary artifacts are removed or directories are empty after an operation), treat cleanup failures as test failures by asserting on the cleanup call (e.g., require.NoError(t, err) or assert.NoError(t, err)). This ensures that the cleanup path is explicitly tested and any cleanup error fails the test, confirming correct behavior of the cleanup code.
Applied to files:
pkg/dotc1z/pool_test.go
🧬 Code graph analysis (3)
pkg/dotc1z/file.go (2)
- pkg/dotc1z/c1file.go (1): WithEncoderConcurrency (173-177)
- vendor/google.golang.org/grpc/rpc_util.go (1): Errorf (973-975)

pkg/dotc1z/pool_test.go (2)
- pkg/dotc1z/decoder.go (3): NewDecoder (234-267), WithDecoderConcurrency (111-116), WithDecoderMaxMemory (78-89)
- pkg/dotc1z/c1file.go (1): WithEncoderConcurrency (173-177)

pkg/dotc1z/pool.go (3)
- pkg/dotc1z/c1file.go (1): WithEncoderConcurrency (173-177)
- vendor/github.com/klauspost/compress/zstd/decoder.go (2): Decoder (22-47), NewReader (87-123)
- pkg/dotc1z/decoder.go (2): WithDecoderConcurrency (111-116), WithDecoderMaxMemory (78-89)
🪛 GitHub Check: go-lint
pkg/dotc1z/pool_test.go
[failure] 224-224:
Comment should end in a period (godot)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: go-test (1.25.2, windows-latest)
- GitHub Check: go-test (1.25.2, ubuntu-latest)
🔇 Additional comments (12)
pkg/dotc1z/decoder.go (2)
145-160: Pool integration logic looks correct.

The pool usage condition properly checks that options match pool defaults before attempting reuse. The fallback path on `Reset` failure correctly returns the decoder to the pool and creates a new one.

One minor observation: if `getDecoder()` returns `nil` (when `ok` is false because pool creation failed), the code correctly falls through to create a new decoder via the existing path.
220-231: Close() correctly distinguishes pooled vs non-pooled decoders.

The logic properly returns pooled decoders for reuse and closes non-pooled ones. Setting `d.zd = nil` after handling prevents double-free issues.

pkg/dotc1z/file.go (3)
179-182: Returning a closed encoder to the pool is correct behavior.

After reviewing the zstd library, calling `Reset(nil)` on a closed encoder is safe and documented as the proper way to prepare it for pooling. The encoder can be reused after `Reset()` is called with a new writer.
148-149: Ignoring `Reset` return value is intentional for encoders.

Unlike `zstd.Decoder.Reset()`, which returns an error, `zstd.Encoder.Reset()` has no return value in the klauspost/compress library, so this is correct.
163-166: Error path correctly closes encoder without returning to pool.

On copy error, the encoder is closed to release resources but not returned to the pool since it may be in an inconsistent state. The underscore assignment for the `Close()` error is acceptable here since we're already returning the original error.
pkg/dotc1z/pool_test.go (1)

164-221: Good round-trip tests validating end-to-end correctness.

The tests properly exercise both single and multiple round-trips, verifying data integrity. The use of `t.TempDir()` ensures cleanup. Based on learnings, the cleanup is automatically handled by the testing framework here.
pkg/dotc1z/pool.go (6)

21-36: Encoder pool get/create logic is well-structured.

The type assertion with nil check handles both empty pool and type mismatch. Creating with a `nil` writer is correct since `Reset()` will be called before use.
41-49: `putEncoder` correctly resets before pooling.

Calling `Reset(nil)` releases the reference to the previous writer, preventing memory leaks. The nil check prevents panics.
58-74: Decoder pool creation matches decoder.go defaults.

The options (`concurrency=1`, `lowmem=true`, `maxMemory=defaultDecoderMaxMemory`) align with the pool eligibility check in decoder.go line 147.
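A hedged sketch of a `getDecoder` built with exactly those defaults (the `(dec, ok)` shape is an assumption; `defaultDecoderMaxMemory` is the constant from decoder.go):

```go
var decoderPool sync.Pool

// getDecoder returns a pooled decoder, or builds one with the pool defaults.
func getDecoder() (*zstd.Decoder, bool) {
	if dec, ok := decoderPool.Get().(*zstd.Decoder); ok && dec != nil {
		return dec, true
	}
	// A nil reader is allowed here; Reset points it at a real stream later.
	dec, err := zstd.NewReader(nil,
		zstd.WithDecoderConcurrency(1),
		zstd.WithDecoderLowmem(true),
		zstd.WithDecoderMaxMemory(defaultDecoderMaxMemory))
	if err != nil {
		return nil, false
	}
	return dec, true
}
```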
78-88: `putDecoder` correctly handles Reset failure.

Unlike `putEncoder`, decoders can fail on `Reset(nil)` if in a bad state. Not returning failed decoders to the pool prevents reuse of corrupted instances.
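A sketch of the behavior this comment describes (shape assumed, paired with the `getDecoder` sketch above):

```go
func putDecoder(dec *zstd.Decoder) {
	if dec == nil {
		return
	}
	// Reset(nil) releases the reference to the previous reader; unlike the
	// encoder, it can fail, and a failed decoder must not be reused.
	if err := dec.Reset(nil); err != nil {
		dec.Close()
		return
	}
	decoderPool.Put(dec)
}
```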
1-88: Overall pool implementation is solid.

The design correctly:

- Uses `sync.Pool` for thread-safe pooling
- Matches pool eligibility to specific option configurations
- Handles creation failures gracefully
- Cleans up references before returning to pool
- Prevents corrupted instances from being pooled

The ~20,785× reduction in bytes allocated per encode operation (per PR benchmarks) demonstrates significant improvement.
14-16: `pooledEncoderConcurrency` is evaluated once at package init.

This captures `GOMAXPROCS` at program startup. If `GOMAXPROCS` is changed at runtime (e.g., via `runtime.GOMAXPROCS(n)`), the pool won't match and encoders will be created fresh instead of being pulled from the pool. This is the intended behavior: the code compares the requested encoder concurrency against the pooled default and only reuses pooled encoders when they match.
Actionable comments posted: 1
🤖 Fix all issues with AI Agents
In @pkg/dotc1z/pool_test.go:
- Around line 193-220: The test currently calls decoder.Close() and f.Close()
without checking errors; update the cleanup to capture and assert their return
values (e.g., err := decoder.Close(); require.NoError(t, err) and err =
f.Close(); require.NoError(t, err)) after using NewDecoder/decoder and os.Open/f
so cleanup failures fail the test and surface pool/resource issues.
🧹 Nitpick comments (2)
pkg/dotc1z/pool_test.go (2)
93-99: Consider handling errors in the test helper for clarity.

The helper silently ignores errors from `enc.Write()` and `enc.Close()`. While failures would likely cause downstream test failures during decompression, explicit error handling (e.g., using `require.NoError`) would make test failures clearer and fail fast if encoder setup goes wrong.

🔎 Proposed fix for explicit error handling
```diff
 createCompressedData := func(data []byte) []byte {
 	var buf bytes.Buffer
-	enc, _ := zstd.NewWriter(&buf)
-	_, _ = enc.Write(data)
-	_ = enc.Close()
+	enc, err := zstd.NewWriter(&buf)
+	require.NoError(t, err)
+	_, err = enc.Write(data)
+	require.NoError(t, err)
+	err = enc.Close()
+	require.NoError(t, err)
 	return buf.Bytes()
 }
```
244-264: Consider basic error validation in benchmarks for robustness.

The "new_encoder_each_time" sub-benchmark ignores all error returns (file operations, writes, Close calls). While this is common in benchmarks to minimize overhead, at minimum checking file operation errors (lines 249-250) with `if err != nil { b.Fatal(err) }` would ensure the benchmark is measuring valid operations.
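For example, the suggested pattern looks like this sketch (`dbFile` and `c1zFile` are the benchmark's own variables, per the agent prompt below):

```go
dbF, err := os.Open(dbFile)
if err != nil {
	b.Fatal(err)
}
outF, err := os.Create(c1zFile)
if err != nil {
	_ = dbF.Close() // clean up before failing the benchmark
	b.Fatal(err)
}
```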
📜 Review details
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
pkg/dotc1z/pool_test.go
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2026-01-02T17:21:01.723Z
Learnt from: ggreer
Repo: ConductorOne/baton-sdk PR: 616
File: pkg/synccompactor/compactor_test.go:44-52
Timestamp: 2026-01-02T17:21:01.723Z
Learning: In tests that verify cleanup behavior (e.g., ensuring temporary artifacts are removed or directories are empty after an operation), treat cleanup failures as test failures by asserting on the cleanup call (e.g., require.NoError(t, err) or assert.NoError(t, err)). This ensures that the cleanup path is explicitly tested and any cleanup error fails the test, confirming correct behavior of the cleanup code.
Applied to files:
pkg/dotc1z/pool_test.go
🧬 Code graph analysis (1)
pkg/dotc1z/pool_test.go (2)
- pkg/dotc1z/decoder.go (4): NewDecoder (234-267), C1ZFileHeader (23-23), WithDecoderConcurrency (111-116), WithDecoderMaxMemory (78-89)
- pkg/dotc1z/c1file.go (1): WithEncoderConcurrency (173-177)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: go-test (1.25.2, windows-latest)
🔇 Additional comments (4)
pkg/dotc1z/pool_test.go (4)
16-89: LGTM! Comprehensive encoder pool testing.

The test coverage is thorough:
- Pool behavior correctly verified (cache miss on first call, hit on subsequent calls)
- Compression correctness validated via round-trip decompression
- Concurrent access properly tested with multiple goroutines
101-161: LGTM! Thorough decoder pool testing.

The decoder pool tests mirror the encoder tests well:
- Pool behavior validated (first get from empty pool, second from populated pool)
- Decompression correctness verified
- Concurrent access properly exercised
267-299: LGTM! Well-designed allocation benchmark.

The benchmark correctly:

- Warms up the pool before measurement
- Resets the timer to exclude setup overhead
- Isolates pure allocation costs by using `bytes.Buffer` instead of file I/O
- Provides a clear comparison for the pooling benefit
301-346: LGTM! Decoder benchmark properly configured.

The benchmark provides a fair comparison by ensuring the "new_decoder_each_time" path uses the same decoder options (concurrency=1, low memory, max memory) that the pool would use. This ensures accurate measurement of pooling benefits.
Fixed logic error where encoders/decoders were only returned to the pool if they originally came FROM the pool. When the pool was empty, new instances were created but never added back, so the pool never grew.

Changes:
- file.go: Check settings compatibility, not origin, when returning encoder
- decoder.go: Rename fromPool → poolCompatible, set based on settings
- pool_test.go: Add regression tests to verify pool grows

The bug was that `fromPool` tracked where the instance came from, but the return-to-pool decision should be based on whether settings are pool-compatible. Now pool-compatible instances are always returned regardless of whether the pool was empty when they were created.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
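In code, the fix amounts to changing the condition guarding the return-to-pool call (a sketch; variable names follow the commit message):

```go
// Before: only instances that happened to come from the pool were returned,
// so an initially empty pool stayed empty forever.
if fromPool {
	putEncoder(enc)
}

// After: any encoder whose settings match the pool defaults is returned,
// letting the pool grow from freshly created instances too.
if encoderConcurrency == pooledEncoderConcurrency {
	putEncoder(enc)
}
```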
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@pkg/dotc1z/pool_test.go`:
- Around line 341-355: Update the benchmark code to check and fail on I/O and
codec errors instead of ignoring them: in the new_encoder_each_time loop handle
errors returned by os.Open(dbFile) and os.Create(c1zFile) and by zstd.NewWriter
(the enc creation) and call b.Fatal(err) on failure; also check errors from
enc.Write/enc.Flush/enc.Close where appropriate; similarly, in the new_each_time
encoder benchmark ensure the zstd.NewWriter error is checked and b.Fatal on
error; and in the decoder benchmarks verify os.Open, zstd.NewReader (or
NewDecoder) return values and call b.Fatal(err) on any error so dbF, outF, enc,
and decoder variables are never nil and the benchmark fails loudly on
setup/iteration errors.
- Around line 67-84: The goroutines in the concurrent subtests call require.*
which uses t.FailNow and only terminates the goroutine; change those to assert.*
(e.g., assert.NotNil, assert.NoError) so failures are reported without stopping
the test goroutine, specifically in the anonymous goroutine that uses
getEncoder(), enc.Reset(&buf), enc.Write(...), enc.Close() and putEncoder(enc);
update the same pattern in the other concurrent subtest (lines 141–157) or
alternatively capture errors into a channel/slice and assert them in the parent
goroutine after wg.Wait().
- Around line 93-98: The test helper createCompressedData currently ignores
errors from zstd.NewWriter, enc.Write and enc.Close; update it to handle and
propagate those errors (either by changing the signature to return ([]byte,
error) and returning the error up the call chain, or accept *testing.T and call
t.Fatalf on error), check and handle the error returned by zstd.NewWriter, check
the n/err from enc.Write, and check/return the error from enc.Close, and then
update all test call sites of createCompressedData accordingly so tests fail
with the real error instead of producing silent/empty compressed data.
Update benchmark code to check and fail on I/O and codec errors instead of ignoring them. This includes:

- BenchmarkEncoderPoolAllocs: new_encoder_each_time
- BenchmarkEncoderAllocationOnly: both pooled and new_each_time
- BenchmarkDecoderPoolAllocs: both pooled_decoder and new_decoder_each_time

Proper cleanup is performed before b.Fatal() calls to avoid resource leaks even when benchmarks fail.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Replace require.* calls with assert.* in goroutines spawned by concurrent subtests. Using require.* (which calls t.FailNow) in a goroutine only terminates that goroutine, not the test, leading to confusing behavior. Using assert.* properly reports failures while allowing the goroutine to exit gracefully via early return.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
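The resulting pattern looks roughly like this sketch (assumes the surrounding test's `wg` and `data`, and testify's `assert`; testify assertions return a bool and are safe to call from goroutines):

```go
go func() {
	defer wg.Done()
	enc, ok := getEncoder()
	if !assert.True(t, ok) || !assert.NotNil(t, enc) {
		return // assert records the failure; only this goroutine exits
	}
	var buf bytes.Buffer
	enc.Reset(&buf)
	if _, err := enc.Write(data); !assert.NoError(t, err) {
		return
	}
	if !assert.NoError(t, enc.Close()) {
		return
	}
	putEncoder(enc)
}()
```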
```go
// encoderPool manages reusable zstd.Encoder instances to reduce allocation overhead.
// All pooled encoders are configured with GOMAXPROCS concurrency.
var encoderPool sync.Pool
```
This will be across all connectors in C1 ... which I guess is the point.
Summary
- Add `sync.Pool` for `zstd.Encoder` and `zstd.Decoder` instances to reduce allocation overhead
- Profiling showed `zstd.ensureHist` allocating 215 MB/min in temporal_sync due to creating new encoders per `saveC1z` call

Changes

- pkg/dotc1z/pool.go
- pkg/dotc1z/pool_test.go
- pkg/dotc1z/file.go: saveC1z to use pooled encoders
- pkg/dotc1z/decoder.go

Safety Measures

- `Reset(nil)` called before returning to pool to release writer/reader references
- `Reset()` errors handled gracefully (don't return bad decoders to pool)

Bug Fix (d7c1e91)
Fixed logic error where the pool never grew because instances were only returned if they originally came FROM the pool. When the pool was empty, new instances were created but never added back.

Root cause: `fromPool` tracked origin, but the return-to-pool decision should be based on settings compatibility.

Fix:
- file.go: Check `encoderConcurrency == pooledEncoderConcurrency` instead of `fromPool`
- decoder.go: Renamed `fromPool` → `poolCompatible`, set based on settings, not origin
- pool_test.go: Added regression tests `TestPoolGrowsFromSaveC1z` and `TestPoolGrowsFromDecoder`

Benchmark Results
20,785x reduction in bytes allocated per encode operation.
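For reference, a minimal benchmark of the pooled path looks like this sketch (the PR's actual benchmarks in pool_test.go are more thorough and also measure the non-pooled path):

```go
func BenchmarkPooledEncode(b *testing.B) {
	data := bytes.Repeat([]byte("baton"), 1024)
	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		var buf bytes.Buffer
		enc, _ := getEncoder()
		enc.Reset(&buf) // reuse the pooled encoder against a fresh buffer
		if _, err := enc.Write(data); err != nil {
			b.Fatal(err)
		}
		if err := enc.Close(); err != nil {
			b.Fatal(err)
		}
		putEncoder(enc)
	}
}
```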
Production Profile Data (prod-usw2)
temporal_sync (60s profile)
- zstd.(*fastBase).ensureHist
- zstd.(*Decoder).startStreamDecoder.func2
- zstd.encoderOptions.encoder
- zstd.(*blockEnc).init
- dotc1z.saveC1z → 288.42 MB cumulative (encoder path)

temporal_worker (62s profile)

- zstd.(*Decoder).startStreamDecoder.func2
- zstd.(*Decoder).DecodeAll
- zstd.(*blockDec).decodeSequences
- zstd.(*blockDec).reset
- protozstd.Unmarshal → decompressValue (decoder path)

Expected Impact
Test plan
- dotc1z tests pass

🤖 Generated with Claude Code