Skip to content

Commit 3a7aa69

Browse files
batching: added batcher
1 parent 5901505 commit 3a7aa69

File tree

3 files changed

+291
-0
lines changed

3 files changed

+291
-0
lines changed

batcher/batcher.go

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
package batcher
2+
3+
import (
4+
"errors"
5+
"sync"
6+
"time"
7+
)
8+
9+
// Batcher provides an API for accumulating items into a batch for processing.
type Batcher interface {
	// Put adds items to the batcher.
	Put(interface{}) error

	// Get retrieves a batch from the batcher. This call will block until
	// one of the conditions for a "complete" batch is reached.
	Get() ([]interface{}, error)

	// Dispose will dispose of the batcher. Any calls to Put or Get
	// will return errors.
	Dispose()
}

// ErrDisposed is the error returned for a disposed Batcher.
var ErrDisposed = errors.New("batcher: disposed")

// CalculateBytes evaluates the number of bytes in an item added to a Batcher.
type CalculateBytes func(interface{}) uint

// preallocLimit caps how many item slots are reserved up front for a new
// batch, so that a very large maxItems does not translate into a very large
// allocation every time a batch is started.
const preallocLimit = 1024

// basicBatcher is the default Batcher implementation.
//
// maxTime, maxItems, maxBytes, queueLen, calculateBytes, batchChan and done
// are set once in New and never reassigned, so they may be read without the
// lock. items, availableBytes and disposed are guarded by lock.
type basicBatcher struct {
	maxTime        time.Duration
	maxItems       uint
	maxBytes       uint
	queueLen       uint
	calculateBytes CalculateBytes
	disposed       bool
	items          []interface{}
	lock           sync.Mutex
	batchChan      chan []interface{}
	availableBytes uint
	// done is closed exactly once by Dispose; it wakes every Put or Get
	// that is blocked on batchChan.
	done chan struct{}
}

// New creates a new Batcher using the provided arguments.
// Batch readiness can be determined in three ways:
//   - Maximum number of bytes per batch
//   - Maximum number of items per batch
//   - Maximum amount of time waiting for a batch
//
// Values of zero for one of these fields indicate they should not be
// taken into account when evaluating the readiness of a batch.
//
// queueLen is the number of completed batches that may be queued before Put
// starts to block. calculate may be nil only when maxBytes is zero; otherwise
// an error is returned.
func New(maxTime time.Duration, maxItems, maxBytes, queueLen uint, calculate CalculateBytes) (Batcher, error) {
	if maxBytes > 0 && calculate == nil {
		return nil, errors.New("batcher: must provide CalculateBytes function")
	}

	b := &basicBatcher{
		maxTime:        maxTime,
		maxItems:       maxItems,
		maxBytes:       maxBytes,
		queueLen:       queueLen,
		calculateBytes: calculate,
		batchChan:      make(chan []interface{}, queueLen),
		done:           make(chan struct{}),
	}
	b.items = b.newItems()
	return b, nil
}

// Put adds an item to the batcher. When the item completes a batch (item or
// byte limit reached) the batch is handed to the queue read by Get; if that
// queue is full, Put blocks until a consumer makes room or the batcher is
// disposed. It returns ErrDisposed if the batcher has been disposed.
//
// When several goroutines complete batches at the same moment, the order in
// which those batches reach the queue is not specified.
func (b *basicBatcher) Put(item interface{}) error {
	// Run the caller-supplied sizing function outside the lock so that a
	// slow or panicking callback cannot wedge the batcher. It is nil when
	// New was called with maxBytes == 0 and no function.
	var size uint
	if b.calculateBytes != nil {
		size = b.calculateBytes(item)
	}

	b.lock.Lock()
	if b.disposed {
		b.lock.Unlock()
		return ErrDisposed
	}
	b.items = append(b.items, item)
	b.availableBytes += size
	if !b.ready() {
		b.lock.Unlock()
		return nil
	}
	batch := b.flush()
	b.lock.Unlock()

	// Deliver without holding the lock: a full queue must not prevent Get
	// (or Dispose) from making progress.
	select {
	case b.batchChan <- batch:
		return nil
	case <-b.done:
		return ErrDisposed
	}
}

// Get retrieves a batch from the batcher. This call will block until
// one of the conditions for a "complete" batch is reached: a full batch has
// been queued by Put, or, when maxTime is non-zero, maxTime has elapsed since
// Get was called, in which case whatever has accumulated so far (possibly an
// empty slice) is returned. It returns ErrDisposed if the batcher is, or
// becomes, disposed.
func (b *basicBatcher) Get() ([]interface{}, error) {
	if b.isDisposed() {
		return nil, ErrDisposed
	}

	// A nil timeout channel never fires, which disables the time limit.
	var timeout <-chan time.Time
	if b.maxTime > 0 {
		timer := time.NewTimer(b.maxTime)
		defer timer.Stop()
		timeout = timer.C
	}

	select {
	case batch := <-b.batchChan:
		// select picks at random when both a batch and disposal are ready;
		// honour the documented contract that a disposed batcher only
		// returns errors.
		if b.isDisposed() {
			return nil, ErrDisposed
		}
		return batch, nil
	case <-timeout:
		b.lock.Lock()
		defer b.lock.Unlock()
		if b.disposed {
			return nil, ErrDisposed
		}
		return b.flush(), nil
	case <-b.done:
		return nil, ErrDisposed
	}
}

// Dispose will dispose of the batcher. Any calls to Put or Get
// will return errors, including calls that are currently blocked.
// Calling Dispose more than once is safe.
func (b *basicBatcher) Dispose() {
	b.lock.Lock()
	defer b.lock.Unlock()
	if b.disposed {
		return
	}
	b.disposed = true
	b.items = nil
	close(b.done)
}

// ready reports whether the pending items satisfy the item or byte limit.
// The caller must hold b.lock.
func (b *basicBatcher) ready() bool {
	if b.maxItems != 0 && uint(len(b.items)) >= b.maxItems {
		return true
	}
	if b.maxBytes != 0 && b.availableBytes >= b.maxBytes {
		return true
	}
	return false
}

// flush detaches and returns the pending items and starts a fresh, empty
// batch. The caller must hold b.lock.
func (b *basicBatcher) flush() []interface{} {
	batch := b.items
	b.items = b.newItems()
	b.availableBytes = 0
	return batch
}

// newItems allocates an empty pending-items slice, reserving at most
// preallocLimit slots.
func (b *basicBatcher) newItems() []interface{} {
	capacity := b.maxItems
	if capacity > preallocLimit {
		capacity = preallocLimit
	}
	return make([]interface{}, 0, capacity)
}

// isDisposed reports whether Dispose has been called, without taking the lock.
func (b *basicBatcher) isDisposed() bool {
	select {
	case <-b.done:
		return true
	default:
		return false
	}
}

batcher/batcher_test.go

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package batcher
2+
3+
import (
4+
"sync"
5+
"testing"
6+
"time"
7+
8+
"github.com/stretchr/testify/assert"
9+
)
10+
11+
func TestNoCalculateBytes(t *testing.T) {
12+
_, err := New(0, 0, 100, 5, nil)
13+
assert.Error(t, err)
14+
}
15+
16+
func TestMaxItems(t *testing.T) {
17+
assert := assert.New(t)
18+
b, err := New(0, 100, 100000, 10, func(str interface{}) uint {
19+
return uint(len(str.(string)))
20+
})
21+
assert.Nil(err)
22+
23+
go func() {
24+
for i := 0; i < 1000; i++ {
25+
assert.Nil(b.Put("foo bar baz"))
26+
}
27+
}()
28+
29+
batch, err := b.Get()
30+
assert.Len(batch, 100)
31+
assert.Nil(err)
32+
}
33+
34+
func TestMaxBytes(t *testing.T) {
35+
assert := assert.New(t)
36+
b, err := New(0, 10000, 100, 10, func(str interface{}) uint {
37+
return uint(len(str.(string)))
38+
})
39+
assert.Nil(err)
40+
41+
go func() {
42+
for i := 0; i < 1000; i++ {
43+
b.Put("a")
44+
}
45+
}()
46+
47+
batch, err := b.Get()
48+
assert.Len(batch, 100)
49+
assert.Nil(err)
50+
}
51+
52+
func TestMaxTime(t *testing.T) {
53+
assert := assert.New(t)
54+
b, err := New(time.Millisecond*200, 100000, 100000, 10,
55+
func(str interface{}) uint {
56+
return uint(len(str.(string)))
57+
},
58+
)
59+
assert.Nil(err)
60+
61+
go func() {
62+
for i := 0; i < 10000; i++ {
63+
b.Put("a")
64+
time.Sleep(time.Millisecond)
65+
}
66+
}()
67+
68+
before := time.Now()
69+
batch, err := b.Get()
70+
assert.InDelta(200, time.Since(before).Seconds()*1000, 2)
71+
assert.True(len(batch) > 0)
72+
assert.Nil(err)
73+
}
74+
75+
func TestMultiConsumer(t *testing.T) {
76+
assert := assert.New(t)
77+
b, err := New(0, 100, 100000, 10, func(str interface{}) uint {
78+
return uint(len(str.(string)))
79+
})
80+
assert.Nil(err)
81+
82+
var wg sync.WaitGroup
83+
wg.Add(5)
84+
for i := 0; i < 5; i++ {
85+
go func() {
86+
batch, err := b.Get()
87+
assert.Len(batch, 100)
88+
assert.Nil(err)
89+
wg.Done()
90+
}()
91+
}
92+
93+
go func() {
94+
for i := 0; i < 500; i++ {
95+
b.Put("a")
96+
}
97+
}()
98+
99+
wg.Wait()
100+
}
101+
102+
func TestDispose(t *testing.T) {
103+
assert := assert.New(t)
104+
b, err := New(0, 100000, 100000, 10, func(str interface{}) uint {
105+
return uint(len(str.(string)))
106+
})
107+
assert.Nil(err)
108+
b.Put("a")
109+
wait := make(chan bool)
110+
go func() {
111+
_, err := b.Get()
112+
assert.Equal(ErrDisposed, err)
113+
wait <- true
114+
}()
115+
116+
b.Dispose()
117+
118+
assert.Equal(ErrDisposed, b.Put("a"))
119+
120+
<-wait
121+
}

mock/batcher.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package mock
2+
3+
import (
4+
"github.com/stretchr/testify/mock"
5+
)
6+
7+
type MockBatcher struct {
8+
mock.Mock
9+
PutChan chan bool
10+
}
11+
12+
func (m *MockBatcher) Put(items interface{}) error {
13+
args := m.Called(items)
14+
if m.PutChan != nil {
15+
m.PutChan <- true
16+
}
17+
return args.Error(0)
18+
}
19+
20+
func (m *MockBatcher) Get() ([]interface{}, error) {
21+
args := m.Called()
22+
return args.Get(0).([]interface{}), args.Error(1)
23+
}
24+
25+
func (m *MockBatcher) Dispose() {
26+
m.Called()
27+
}

0 commit comments

Comments
 (0)