Skip to content

Commit d7124bd

Browse files
author
Steven.Osborne
committed
Drain the batch channel more intelligently. Add order comments
1 parent 0e10833 commit d7124bd

File tree

2 files changed

+21
-12
lines changed

2 files changed

+21
-12
lines changed

batcher/batcher.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package batcher
1919
import (
2020
"errors"
2121
"sync"
22+
"sync/atomic"
2223
"time"
2324
)
2425

@@ -59,6 +60,7 @@ type basicBatcher struct {
5960
lock sync.RWMutex
6061
batchChan chan []interface{}
6162
availableBytes uint
63+
waiting int32
6264
}
6365

6466
// New creates a new Batcher using the provided arguments.
@@ -85,6 +87,7 @@ func New(maxTime time.Duration, maxItems, maxBytes, queueLen uint, calculate Cal
8587

8688
// Put adds items to the batcher. If Put is continually called without calls to
8789
// Get, an unbounded number of go-routines will be generated.
90+
// Note: there is no order guarantee for items entering/leaving the batcher.
8891
func (b *basicBatcher) Put(item interface{}) error {
8992
b.lock.Lock()
9093
if b.disposed {
@@ -108,6 +111,7 @@ func (b *basicBatcher) Put(item interface{}) error {
108111
// one of the conditions for a "complete" batch is reached. If Put is
109112
// continually called without calls to Get, an unbounded number of
110113
// go-routines will be generated.
114+
// Note: there is no order guarantee for items entering/leaving the batcher.
111115
func (b *basicBatcher) Get() ([]interface{}, error) {
112116
// Don't check disposed yet so any items remaining in the queue
113117
// will be returned properly.
@@ -163,15 +167,9 @@ func (b *basicBatcher) Dispose() {
163167
b.items = nil
164168

165169
// Drain the batch channel and all routines waiting to put on the channel
166-
DrainLoop:
167-
for {
168-
select {
169-
case <-b.batchChan:
170-
case <-time.After(5 * time.Millisecond):
171-
break DrainLoop
172-
}
170+
for atomic.LoadInt32(&b.waiting) > 0 {
171+
<-b.batchChan
173172
}
174-
175173
close(b.batchChan)
176174
b.lock.Unlock()
177175
}
@@ -193,7 +191,13 @@ func (b *basicBatcher) flush() {
193191
for i, val := range b.items {
194192
cpItems[i] = val
195193
}
196-
go func() { b.batchChan <- cpItems }()
194+
// Signal one more waiter for the batch channel
195+
atomic.AddInt32(&b.waiting, 1)
196+
// Don't block on the channel put
197+
go func() {
198+
b.batchChan <- cpItems
199+
atomic.AddInt32(&b.waiting, -1)
200+
}()
197201
b.items = make([]interface{}, 0, b.maxItems)
198202
b.availableBytes = 0
199203
}

batcher/batcher_test.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -145,19 +145,24 @@ func TestDispose(t *testing.T) {
145145
b.Put("b")
146146
b.Put("c")
147147

148+
possibleBatches := [][]interface{}{
149+
[]interface{}{"a", "b"},
150+
[]interface{}{"c"},
151+
}
148152
batch1, err := b.Get()
149-
assert.Equal([]interface{}{"a", "b"}, batch1)
153+
assert.Contains(possibleBatches, batch1)
150154
assert.Nil(err)
151155
batch2, err := b.Get()
152-
assert.Equal([]interface{}{"c"}, batch2)
156+
assert.Contains(possibleBatches, batch2)
153157
assert.Nil(err)
154158

159+
b.Put("d")
155160
b.Dispose()
156161

157162
_, err = b.Get()
158163
assert.Equal(ErrDisposed, err)
159164

160-
assert.Equal(ErrDisposed, b.Put("d"))
165+
assert.Equal(ErrDisposed, b.Put("e"))
161166
assert.Equal(ErrDisposed, b.Flush())
162167

163168
}

0 commit comments

Comments
 (0)