Skip to content

Commit e12c5e2

Browse files
Merge pull request #85 from Workiva/try_pools
Try pools
2 parents 9f6962b + d86095a commit e12c5e2

File tree

5 files changed

+148
-23
lines changed

5 files changed

+148
-23
lines changed

btree/palm/tree.go

Lines changed: 70 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,21 @@ const (
3535
apply
3636
)
3737

38-
const multiThreadAt = 1000 // number of keys before we multithread lookups
38+
const multiThreadAt = 400 // number of keys before we multithread lookups
3939

4040
type keyBundle struct {
4141
key common.Comparator
4242
left, right *node
4343
}
4444

45+
func (kb *keyBundle) dispose(ptree *ptree) {
46+
if ptree.kbRing.Len() == ptree.kbRing.Cap() {
47+
return
48+
}
49+
kb.key, kb.left, kb.right = nil, nil, nil
50+
ptree.kbRing.Put(kb)
51+
}
52+
4553
type ptree struct {
4654
root *node
4755
_padding0 [8]uint64
@@ -54,6 +62,10 @@ type ptree struct {
5462
disposed uint64
5563
buffer1 [8]uint64
5664
running uint64
65+
_padding2 [8]uint64
66+
kbRing *queue.RingBuffer
67+
disposeChannel chan bool
68+
mpChannel chan map[*node][]*keyBundle
5769
}
5870

5971
func (ptree *ptree) checkAndRun(action action) {
@@ -110,6 +122,29 @@ func (ptree *ptree) init(bufferSize, ary uint64) {
110122
ptree.cache = make([]interface{}, 0, bufferSize)
111123
ptree.root = newNode(true, newKeys(ary), newNodes(ary))
112124
ptree.actions = queue.NewRingBuffer(ptree.bufferSize)
125+
ptree.kbRing = queue.NewRingBuffer(1024)
126+
for i := uint64(0); i < ptree.kbRing.Cap(); i++ {
127+
ptree.kbRing.Put(&keyBundle{})
128+
}
129+
ptree.disposeChannel = make(chan bool)
130+
ptree.mpChannel = make(chan map[*node][]*keyBundle, 1024)
131+
var wg sync.WaitGroup
132+
wg.Add(1)
133+
go ptree.disposer(&wg)
134+
wg.Wait()
135+
}
136+
137+
func (ptree *ptree) newKeyBundle(key common.Comparator) *keyBundle {
138+
if ptree.kbRing.Len() == 0 {
139+
return &keyBundle{key: key}
140+
}
141+
ifc, err := ptree.kbRing.Get()
142+
if err != nil {
143+
return nil
144+
}
145+
kb := ifc.(*keyBundle)
146+
kb.key = key
147+
return kb
113148
}
114149

115150
func (ptree *ptree) operationRunner(xns interfaces, threaded bool) {
@@ -153,12 +188,12 @@ func (ptree *ptree) fetchKeys(xns interfaces, inParallel bool) (map[*node][]*key
153188
switch action.operation() {
154189
case add:
155190
for i, n := range action.nodes() {
156-
writeOperations[n] = append(writeOperations[n], &keyBundle{key: action.keys()[i]})
191+
writeOperations[n] = append(writeOperations[n], ptree.newKeyBundle(action.keys()[i]))
157192
}
158193
toComplete = append(toComplete, action)
159194
case remove:
160195
for i, n := range action.nodes() {
161-
deleteOperations[n] = append(deleteOperations[n], &keyBundle{key: action.keys()[i]})
196+
deleteOperations[n] = append(deleteOperations[n], ptree.newKeyBundle(action.keys()[i]))
162197
}
163198
toComplete = append(toComplete, action)
164199
case get, apply:
@@ -188,6 +223,19 @@ func (ptree *ptree) apply(n *node, aa *applyAction) {
188223
}
189224
}
190225

226+
func (ptree *ptree) disposer(wg *sync.WaitGroup) {
227+
wg.Done()
228+
229+
for {
230+
select {
231+
case mp := <-ptree.mpChannel:
232+
ptree.cleanMap(mp)
233+
case <-ptree.disposeChannel:
234+
return
235+
}
236+
}
237+
}
238+
191239
func (ptree *ptree) fetchKeysInSerial(xns interfaces) {
192240
for _, ifc := range xns {
193241
action := ifc.(action)
@@ -340,6 +388,14 @@ func (ptree *ptree) applyNode(n *node, adds, deletes []*keyBundle) {
340388
}
341389
}
342390

391+
func (ptree *ptree) cleanMap(op map[*node][]*keyBundle) {
392+
for _, bundles := range op {
393+
for _, kb := range bundles {
394+
kb.dispose(ptree)
395+
}
396+
}
397+
}
398+
343399
func (ptree *ptree) recursiveMutate(adds, deletes map[*node][]*keyBundle, setRoot, inParallel bool) {
344400
if len(adds) == 0 && len(deletes) == 0 {
345401
return
@@ -413,12 +469,18 @@ func (ptree *ptree) recursiveMutate(adds, deletes map[*node][]*keyBundle, setRoo
413469
ptree.splitNode(n, parent, &nodes, &keys)
414470
write.Lock()
415471
for i, k := range keys {
416-
nextLayerWrite[parent] = append(nextLayerWrite[parent], &keyBundle{key: k, left: nodes[i*2], right: nodes[i*2+1]})
472+
kb := ptree.newKeyBundle(k)
473+
kb.left = nodes[i*2]
474+
kb.right = nodes[i*2+1]
475+
nextLayerWrite[parent] = append(nextLayerWrite[parent], kb)
417476
}
418477
write.Unlock()
419478
}
420479
})
421480

481+
ptree.mpChannel <- adds
482+
ptree.mpChannel <- deletes
483+
422484
ptree.recursiveMutate(nextLayerWrite, nextLayerDelete, setRoot, inParallel)
423485
}
424486

@@ -467,8 +529,12 @@ func (ptree *ptree) Query(start, stop common.Comparator) common.Comparators {
467529
// Dispose will clean up any resources used by this tree. This
468530
// must be called to prevent a memory leak.
469531
func (ptree *ptree) Dispose() {
532+
if atomic.LoadUint64(&ptree.disposed) == 1 {
533+
return
534+
}
470535
ptree.actions.Dispose()
471536
atomic.StoreUint64(&ptree.disposed, 1)
537+
close(ptree.disposeChannel)
472538
}
473539

474540
func (ptree *ptree) print(output *log.Logger) {

btree/palm/tree_test.go

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,7 @@ func BenchmarkReadAndWrites(b *testing.B) {
437437
keys = append(keys, generateRandomKeys(numItems))
438438
}
439439

440-
tree := newTree(16, 8)
440+
tree := newTree(8, 8)
441441
b.ResetTimer()
442442

443443
for i := 0; i < b.N; i++ {
@@ -447,25 +447,27 @@ func BenchmarkReadAndWrites(b *testing.B) {
447447
}
448448

449449
func BenchmarkSimultaneousReadsAndWrites(b *testing.B) {
450-
numItems := 1000
450+
numItems := 10000
451451
numRoutines := 8
452-
keys := make([]common.Comparators, 0, numRoutines)
453-
for i := 0; i < numRoutines; i++ {
454-
keys = append(keys, generateRandomKeys(numItems))
452+
keys := generateRandomKeys(numItems)
453+
chunks := chunkKeys(keys, int64(numRoutines))
454+
455+
trees := make([]*ptree, 0, numItems)
456+
for i := 0; i < b.N; i++ {
457+
trees = append(trees, newTree(8, 8))
455458
}
456459

457-
tree := newTree(16, 8)
458460
var wg sync.WaitGroup
459461
b.ResetTimer()
460462

461463
for i := 0; i < b.N; i++ {
462464
wg.Add(numRoutines)
463465
for j := 0; j < numRoutines; j++ {
464-
go func(j int) {
465-
tree.Insert(keys[j]...)
466-
tree.Get(keys[j]...)
466+
go func(i, j int) {
467+
trees[i].Insert(chunks[j]...)
468+
trees[i].Get(chunks[j]...)
467469
wg.Done()
468-
}(j)
470+
}(i, j)
469471
}
470472

471473
wg.Wait()
@@ -475,12 +477,15 @@ func BenchmarkSimultaneousReadsAndWrites(b *testing.B) {
475477
func BenchmarkBulkAdd(b *testing.B) {
476478
numItems := 10000
477479
keys := generateRandomKeys(numItems)
480+
trees := make([]*ptree, 0, b.N)
481+
for i := 0; i < b.N; i++ {
482+
trees = append(trees, newTree(8, 8))
483+
}
478484

479485
b.ResetTimer()
480486

481487
for i := 0; i < b.N; i++ {
482-
tree := newTree(8, 8)
483-
tree.Insert(keys...)
488+
trees[i].Insert(keys...)
484489
}
485490
}
486491

@@ -515,7 +520,7 @@ func BenchmarkBulkAddToExisting(b *testing.B) {
515520
func BenchmarkGet(b *testing.B) {
516521
numItems := 10000
517522
keys := generateRandomKeys(numItems)
518-
tree := newTree(32, 8)
523+
tree := newTree(8, 8)
519524
tree.Insert(keys...)
520525

521526
b.ResetTimer()

btree/plus/btree_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package plus
1818

1919
import (
20+
"sync"
2021
"testing"
2122

2223
"github.com/stretchr/testify/assert"
@@ -394,3 +395,34 @@ func BenchmarkReadAndWrites(b *testing.B) {
394395
tree.Get(ks[i]...)
395396
}
396397
}
398+
399+
func BenchmarkSimultaneousReadsAndWrites(b *testing.B) {
400+
numItems := 10000
401+
numRoutines := 8
402+
keys := constructRandomMockKeys(numItems)
403+
chunks := chunkKeys(keys, int64(numRoutines))
404+
405+
trees := make([]*btree, 0, numItems)
406+
for i := 0; i < b.N; i++ {
407+
trees = append(trees, newBTree(8))
408+
}
409+
410+
var wg sync.WaitGroup
411+
var lock sync.Mutex
412+
b.ResetTimer()
413+
414+
for i := 0; i < b.N; i++ {
415+
wg.Add(numRoutines)
416+
for j := 0; j < numRoutines; j++ {
417+
go func(i, j int) {
418+
lock.Lock()
419+
trees[i].Insert(chunks[j]...)
420+
trees[i].Get(chunks[j]...)
421+
lock.Unlock()
422+
wg.Done()
423+
}(i, j)
424+
}
425+
426+
wg.Wait()
427+
}
428+
}

btree/plus/mock_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,14 @@ limitations under the License.
1616

1717
package plus
1818

19+
func chunkKeys(ks keys, numParts int64) []keys {
20+
parts := make([]keys, numParts)
21+
for i := int64(0); i < numParts; i++ {
22+
parts[i] = ks[i*int64(len(ks))/numParts : (i+1)*int64(len(ks))/numParts]
23+
}
24+
return parts
25+
}
26+
1927
type mockKey struct {
2028
value int
2129
}

queue/ring.go

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,13 @@ type nodes []*node
4949
// described here: http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
5050
// with some minor additions.
5151
type RingBuffer struct {
52-
_buffer0 [8]uint64
52+
_padding0 [8]uint64
5353
queue uint64
54-
_buffer1 [8]uint64
54+
_padding1 [8]uint64
5555
dequeue uint64
56-
_buffer2 [8]uint64
56+
_padding2 [8]uint64
5757
mask, disposed uint64
58-
_buffer3 [8]uint64
58+
_padding3 [8]uint64
5959
nodes nodes
6060
}
6161

@@ -74,6 +74,7 @@ func (rb *RingBuffer) init(size uint64) {
7474
func (rb *RingBuffer) Put(item interface{}) error {
7575
var n *node
7676
pos := atomic.LoadUint64(&rb.queue)
77+
i := 0
7778
L:
7879
for {
7980
if atomic.LoadUint64(&rb.disposed) == 1 {
@@ -92,7 +93,13 @@ L:
9293
default:
9394
pos = atomic.LoadUint64(&rb.queue)
9495
}
95-
runtime.Gosched() // free up the cpu before the next iteration
96+
97+
if i == 10000 {
98+
runtime.Gosched() // free up the cpu before the next iteration
99+
i = 0
100+
} else {
101+
i++
102+
}
96103
}
97104

98105
n.data = item
@@ -107,6 +114,7 @@ L:
107114
func (rb *RingBuffer) Get() (interface{}, error) {
108115
var n *node
109116
pos := atomic.LoadUint64(&rb.dequeue)
117+
i := 0
110118
L:
111119
for {
112120
if atomic.LoadUint64(&rb.disposed) == 1 {
@@ -125,7 +133,13 @@ L:
125133
default:
126134
pos = atomic.LoadUint64(&rb.dequeue)
127135
}
128-
runtime.Gosched() // free up the cpu before the next iteration
136+
137+
if i == 10000 {
138+
runtime.Gosched() // free up the cpu before the next iteration
139+
i = 0
140+
} else {
141+
i++
142+
}
129143
}
130144
data := n.data
131145
n.data = nil

0 commit comments

Comments
 (0)