Skip to content

Commit 4d50edb

Browse files
Merge pull request #92 from brianshannan-wf/batching
Batcher
2 parents 5901505 + 901bf3b commit 4d50edb

File tree

3 files changed

+286
-0
lines changed

3 files changed

+286
-0
lines changed

batcher/batcher.go

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
package batcher
2+
3+
import (
4+
"errors"
5+
"sync"
6+
"time"
7+
)
8+
9+
// Batcher collects individual items and hands them back to callers in
// batches.
type Batcher interface {
	// Put adds an item to the batch currently being accumulated.
	Put(interface{}) error

	// Get returns the next batch. It blocks until one of the conditions
	// for a "complete" batch has been reached.
	Get() ([]interface{}, error)

	// Dispose shuts the batcher down. Calls to Put or Get made afterwards
	// return errors.
	Dispose()
}
22+
23+
// ErrDisposed is returned by operations on a Batcher that has been disposed.
var ErrDisposed = errors.New("batcher: disposed")
25+
26+
// CalculateBytes reports the size, in bytes, of an item added to a Batcher.
type CalculateBytes func(interface{}) uint
28+
29+
type basicBatcher struct {
30+
maxTime time.Duration
31+
maxItems uint
32+
maxBytes uint
33+
queueLen uint
34+
calculateBytes CalculateBytes
35+
disposed bool
36+
items []interface{}
37+
lock sync.RWMutex
38+
batchChan chan []interface{}
39+
availableBytes uint
40+
}
41+
42+
// New creates a new Batcher using the provided arguments.
43+
// Batch readiness can be determined in three ways:
44+
// - Maximum number of bytes per batch
45+
// - Maximum number of items per batch
46+
// - Maximum amount of time waiting for a batch
47+
// Values of zero for one of these fields indicate they should not be
48+
// taken into account when evaluating the readiness of a batch.
49+
func New(maxTime time.Duration, maxItems, maxBytes, queueLen uint, calculate CalculateBytes) (Batcher, error) {
50+
if maxBytes > 0 && calculate == nil {
51+
return nil, errors.New("batcher: must provide CalculateBytes function")
52+
}
53+
54+
return &basicBatcher{
55+
maxTime: maxTime,
56+
maxItems: maxItems,
57+
maxBytes: maxBytes,
58+
queueLen: queueLen,
59+
calculateBytes: calculate,
60+
items: make([]interface{}, 0, maxItems),
61+
batchChan: make(chan []interface{}, queueLen),
62+
}, nil
63+
}
64+
65+
// Put adds items to the batcher.
66+
func (b *basicBatcher) Put(item interface{}) error {
67+
b.lock.Lock()
68+
if b.disposed {
69+
b.lock.Unlock()
70+
return ErrDisposed
71+
}
72+
73+
b.items = append(b.items, item)
74+
b.availableBytes += b.calculateBytes(item)
75+
if b.ready() {
76+
b.batchChan <- b.items
77+
b.items = make([]interface{}, 0, b.maxItems)
78+
b.availableBytes = 0
79+
}
80+
81+
b.lock.Unlock()
82+
return nil
83+
}
84+
85+
// Get retrieves a batch from the batcher. This call will block until
86+
// one of the conditions for a "complete" batch is reached.
87+
func (b *basicBatcher) Get() ([]interface{}, error) {
88+
b.lock.RLock()
89+
if b.disposed {
90+
b.lock.RUnlock()
91+
return nil, ErrDisposed
92+
}
93+
b.lock.RUnlock()
94+
95+
var timeout <-chan time.Time
96+
if b.maxTime > 0 {
97+
timeout = time.After(b.maxTime)
98+
}
99+
100+
select {
101+
case items, ok := <-b.batchChan:
102+
if !ok {
103+
return nil, ErrDisposed
104+
}
105+
return items, nil
106+
case <-timeout:
107+
b.lock.Lock()
108+
if b.disposed {
109+
b.lock.Unlock()
110+
return nil, ErrDisposed
111+
}
112+
items := b.items
113+
b.items = make([]interface{}, 0, b.maxItems)
114+
b.availableBytes = 0
115+
b.lock.Unlock()
116+
return items, nil
117+
}
118+
}
119+
120+
// Dispose will dispose of the batcher. Any calls to Put or Get
121+
// will return errors.
122+
func (b *basicBatcher) Dispose() {
123+
b.lock.Lock()
124+
b.disposed = true
125+
b.items = nil
126+
close(b.batchChan)
127+
b.lock.Unlock()
128+
}
129+
130+
func (b *basicBatcher) ready() bool {
131+
if b.maxItems != 0 && uint(len(b.items)) >= b.maxItems {
132+
return true
133+
}
134+
if b.maxBytes != 0 && b.availableBytes >= b.maxBytes {
135+
return true
136+
}
137+
return false
138+
}

batcher/batcher_test.go

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package batcher
2+
3+
import (
4+
"sync"
5+
"testing"
6+
"time"
7+
8+
"github.com/stretchr/testify/assert"
9+
)
10+
11+
func TestNoCalculateBytes(t *testing.T) {
12+
_, err := New(0, 0, 100, 5, nil)
13+
assert.Error(t, err)
14+
}
15+
16+
func TestMaxItems(t *testing.T) {
17+
assert := assert.New(t)
18+
b, err := New(0, 100, 100000, 10, func(str interface{}) uint {
19+
return uint(len(str.(string)))
20+
})
21+
assert.Nil(err)
22+
23+
go func() {
24+
for i := 0; i < 1000; i++ {
25+
assert.Nil(b.Put("foo bar baz"))
26+
}
27+
}()
28+
29+
batch, err := b.Get()
30+
assert.Len(batch, 100)
31+
assert.Nil(err)
32+
}
33+
34+
func TestMaxBytes(t *testing.T) {
35+
assert := assert.New(t)
36+
b, err := New(0, 10000, 100, 10, func(str interface{}) uint {
37+
return uint(len(str.(string)))
38+
})
39+
assert.Nil(err)
40+
41+
go func() {
42+
for i := 0; i < 1000; i++ {
43+
b.Put("a")
44+
}
45+
}()
46+
47+
batch, err := b.Get()
48+
assert.Len(batch, 100)
49+
assert.Nil(err)
50+
}
51+
52+
func TestMaxTime(t *testing.T) {
53+
assert := assert.New(t)
54+
b, err := New(time.Millisecond*200, 100000, 100000, 10,
55+
func(str interface{}) uint {
56+
return uint(len(str.(string)))
57+
},
58+
)
59+
assert.Nil(err)
60+
61+
go func() {
62+
for i := 0; i < 10000; i++ {
63+
b.Put("a")
64+
time.Sleep(time.Millisecond)
65+
}
66+
}()
67+
68+
before := time.Now()
69+
batch, err := b.Get()
70+
assert.InDelta(200, time.Since(before).Seconds()*1000, 2)
71+
assert.True(len(batch) > 0)
72+
assert.Nil(err)
73+
}
74+
75+
func TestMultiConsumer(t *testing.T) {
76+
assert := assert.New(t)
77+
b, err := New(0, 100, 100000, 10, func(str interface{}) uint {
78+
return uint(len(str.(string)))
79+
})
80+
assert.Nil(err)
81+
82+
var wg sync.WaitGroup
83+
wg.Add(5)
84+
for i := 0; i < 5; i++ {
85+
go func() {
86+
batch, err := b.Get()
87+
assert.Len(batch, 100)
88+
assert.Nil(err)
89+
wg.Done()
90+
}()
91+
}
92+
93+
go func() {
94+
for i := 0; i < 500; i++ {
95+
b.Put("a")
96+
}
97+
}()
98+
99+
wg.Wait()
100+
}
101+
102+
func TestDispose(t *testing.T) {
103+
assert := assert.New(t)
104+
b, err := New(0, 100000, 100000, 10, func(str interface{}) uint {
105+
return uint(len(str.(string)))
106+
})
107+
assert.Nil(err)
108+
b.Put("a")
109+
wait := make(chan bool)
110+
go func() {
111+
_, err := b.Get()
112+
assert.Equal(ErrDisposed, err)
113+
wait <- true
114+
}()
115+
116+
b.Dispose()
117+
118+
assert.Equal(ErrDisposed, b.Put("a"))
119+
120+
<-wait
121+
}

mock/batcher.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package mock
2+
3+
import (
4+
"github.com/stretchr/testify/mock"
5+
)
6+
7+
type Batcher struct {
8+
mock.Mock
9+
PutChan chan bool
10+
}
11+
12+
func (m *Batcher) Put(items interface{}) error {
13+
args := m.Called(items)
14+
if m.PutChan != nil {
15+
m.PutChan <- true
16+
}
17+
return args.Error(0)
18+
}
19+
20+
func (m *Batcher) Get() ([]interface{}, error) {
21+
args := m.Called()
22+
return args.Get(0).([]interface{}), args.Error(1)
23+
}
24+
25+
func (m *Batcher) Dispose() {
26+
m.Called()
27+
}

0 commit comments

Comments
 (0)