-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbufferpool.go
More file actions
151 lines (129 loc) · 4.5 KB
/
bufferpool.go
File metadata and controls
151 lines (129 loc) · 4.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
// Package anet provides network communication components.
package anet
import (
"sync"
)
// maxBufferSize is the maximum size of buffers that will be pooled.
// Larger buffers will be allocated but not pooled to prevent memory bloat.
const maxBufferSize = 64 * 1024 // 64KB
// globalBufferPool is a NUMA-aware wrapper around per-node buffer pools.
var globalBufferPool = newGlobalBufferPool()
// globalBufferPoolType manages buffer pools per NUMA node.
type globalBufferPoolType struct {
pools []*bufferPool // one pool per NUMA node
nodes int // number of NUMA nodes detected
}
// bufferPool manages a set of sync.Pool instances for different buffer sizes.
// This helps reduce memory allocations and GC pressure by reusing buffers.
type bufferPool struct {
pools []*sync.Pool // Array of pools for different size classes
}
// newGlobalBufferPool creates a NUMA-aware global buffer pool.
func newGlobalBufferPool() *globalBufferPoolType {
nodes := detectNUMANodes()
pools := make([]*bufferPool, nodes)
for i := 0; i < nodes; i++ {
pools[i] = newBufferPool()
}
return &globalBufferPoolType{pools: pools, nodes: nodes}
}
// detectNUMANodes returns the number of NUMA nodes on this system. Defaults to 1.
func detectNUMANodes() int {
return 1 // stub: real detection can be added via cgo or syscalls
}
// newBufferPool creates a new buffer pool with pre-allocated sync.Pool instances
// for common buffer sizes. This improves performance by reducing allocations
// for frequently used message sizes.
func newBufferPool() *bufferPool {
bp := &bufferPool{
pools: make([]*sync.Pool, 32), // Pool sizes from 32B to 64KB.
}
for i := range bp.pools {
size := 32 << uint(i) // 32, 64, 128, ..., 64KB.
if size > maxBufferSize {
break
}
bp.pools[i] = &sync.Pool{
New: func() any {
return make([]byte, size)
},
}
}
return bp
}
// GetBuffer retrieves a buffer from the pool that is at least size bytes.
// If no suitable buffer exists in the pool, a new one will be allocated.
// The returned buffer may be larger than requested but will be at least size bytes.
func GetBuffer(size int) []byte {
return globalBufferPool.getBuffer(size)
}
// PutBuffer returns a buffer to the pool for future reuse.
// Buffers larger than maxBufferSize are not pooled to prevent memory bloat.
// The buffer should not be accessed after being returned to the pool.
func PutBuffer(buf []byte) {
globalBufferPool.putBuffer(buf)
}
// getBuffer retrieves a buffer from the pool that is at least size bytes.
// If no suitable buffer exists in the pool, a new one will be allocated.
// The returned buffer may be larger than requested but will be at least size bytes.
func (bp *bufferPool) getBuffer(size int) []byte {
if size > maxBufferSize {
return make([]byte, size)
}
// Find the smallest pool that fits the size.
poolIdx := 0
poolSize := 32
for poolSize < size {
poolSize *= 2
poolIdx++
}
// retrieve buffer from pool and check type assertion.
obj := bp.pools[poolIdx].Get()
if buf, ok := obj.([]byte); ok {
// Ensure buffer length matches class and capacity is sufficient.
if cap(buf) >= poolSize {
if len(buf) != poolSize {
buf = buf[:poolSize]
}
return buf
}
// Incorrect capacity (shouldn't happen) — fall through to allocate.
}
// fallback allocation if buffer type is not as expected.
return make([]byte, poolSize)
}
// putBuffer returns a buffer to the pool for future reuse.
// Buffers larger than maxBufferSize are not pooled to prevent memory bloat.
// The buffer should not be accessed after being returned to the pool.
func (bp *bufferPool) putBuffer(buf []byte) {
// Base pooling decision and bucket on capacity to avoid mis-bucketing
// when callers reslice to smaller lengths.
if cap(buf) > maxBufferSize {
return // Don't pool large-capacity buffers.
}
// Find the correct pool based on capacity
poolIdx := 0
poolSize := 32
target := cap(buf)
for poolSize < target {
poolSize *= 2
poolIdx++
}
// Normalize slice length to the class size before putting back.
if cap(buf) >= poolSize {
if len(buf) != poolSize {
buf = buf[:poolSize]
}
//nolint:staticcheck // SA6002: passing buf by value is necessary for the pool.
bp.pools[poolIdx].Put(buf)
}
}
// getBuffer retrieves a buffer from the local NUMA node pool.
func (g *globalBufferPoolType) getBuffer(size int) []byte {
// for now, always use node 0
return g.pools[0].getBuffer(size)
}
// putBuffer returns a buffer to the local NUMA node pool.
func (g *globalBufferPoolType) putBuffer(buf []byte) {
g.pools[0].putBuffer(buf)
}