Skip to content

Commit 22be312

Browse files
batcher_improvements: added flushing and some extra checks
1 parent 4d50edb commit 22be312

File tree

3 files changed

+149
-15
lines changed

3 files changed

+149
-15
lines changed

batcher/batcher.go

Lines changed: 61 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
/*
2+
Copyright 2015 Workiva, LLC
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
117
package batcher
218

319
import (
@@ -15,9 +31,15 @@ type Batcher interface {
1531
// one of the conditions for a "complete" batch is reached.
1632
Get() ([]interface{}, error)
1733

34+
// Flush forcibly completes the batch currently being built
35+
Flush() error
36+
1837
// Dispose will dispose of the batcher. Any calls to Put or Get
1938
// will return errors.
2039
Dispose()
40+
41+
// IsDisposed will determine if the batcher is disposed
42+
IsDisposed() bool
2143
}
2244

2345
// ErrDisposed is the error returned for a disposed Batcher
@@ -30,7 +52,6 @@ type basicBatcher struct {
3052
maxTime time.Duration
3153
maxItems uint
3254
maxBytes uint
33-
queueLen uint
3455
calculateBytes CalculateBytes
3556
disposed bool
3657
items []interface{}
@@ -55,7 +76,6 @@ func New(maxTime time.Duration, maxItems, maxBytes, queueLen uint, calculate Cal
5576
maxTime: maxTime,
5677
maxItems: maxItems,
5778
maxBytes: maxBytes,
58-
queueLen: queueLen,
5979
calculateBytes: calculate,
6080
items: make([]interface{}, 0, maxItems),
6181
batchChan: make(chan []interface{}, queueLen),
@@ -71,11 +91,11 @@ func (b *basicBatcher) Put(item interface{}) error {
7191
}
7292

7393
b.items = append(b.items, item)
74-
b.availableBytes += b.calculateBytes(item)
94+
if b.calculateBytes != nil {
95+
b.availableBytes += b.calculateBytes(item)
96+
}
7597
if b.ready() {
76-
b.batchChan <- b.items
77-
b.items = make([]interface{}, 0, b.maxItems)
78-
b.availableBytes = 0
98+
b.flush()
7999
}
80100

81101
b.lock.Unlock()
@@ -85,12 +105,8 @@ func (b *basicBatcher) Put(item interface{}) error {
85105
// Get retrieves a batch from the batcher. This call will block until
86106
// one of the conditions for a "complete" batch is reached.
87107
func (b *basicBatcher) Get() ([]interface{}, error) {
88-
b.lock.RLock()
89-
if b.disposed {
90-
b.lock.RUnlock()
91-
return nil, ErrDisposed
92-
}
93-
b.lock.RUnlock()
108+
// Don't check disposed yet so any items remaining in the queue
109+
// will be returned properly.
94110

95111
var timeout <-chan time.Time
96112
if b.maxTime > 0 {
@@ -117,16 +133,49 @@ func (b *basicBatcher) Get() ([]interface{}, error) {
117133
}
118134
}
119135

136+
// Flush forcibly completes the batch currently being built
137+
func (b *basicBatcher) Flush() error {
138+
b.lock.Lock()
139+
if b.disposed {
140+
b.lock.Unlock()
141+
return ErrDisposed
142+
}
143+
b.flush()
144+
b.lock.Unlock()
145+
return nil
146+
}
147+
120148
// Dispose will dispose of the batcher. Any calls to Put or Get
121149
// will return errors.
122150
func (b *basicBatcher) Dispose() {
123151
b.lock.Lock()
152+
if b.disposed {
153+
b.lock.Unlock()
154+
return
155+
}
156+
b.flush()
124157
b.disposed = true
125158
b.items = nil
126159
close(b.batchChan)
127160
b.lock.Unlock()
128161
}
129162

163+
// IsDisposed will determine if the batcher is disposed
164+
func (b *basicBatcher) IsDisposed() bool {
165+
b.lock.RLock()
166+
disposed := b.disposed
167+
b.lock.RUnlock()
168+
return disposed
169+
}
170+
171+
// flush adds the batch currently being built to the queue of completed batches.
172+
// flush is not threadsafe, so should be synchronized externally.
173+
func (b *basicBatcher) flush() {
174+
b.batchChan <- b.items
175+
b.items = make([]interface{}, 0, b.maxItems)
176+
b.availableBytes = 0
177+
}
178+
130179
func (b *basicBatcher) ready() bool {
131180
if b.maxItems != 0 && uint(len(b.items)) >= b.maxItems {
132181
return true

batcher/batcher_test.go

Lines changed: 58 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
/*
2+
Copyright 2015 Workiva, LLC
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
117
package batcher
218

319
import (
@@ -72,6 +88,25 @@ func TestMaxTime(t *testing.T) {
7288
assert.Nil(err)
7389
}
7490

91+
func TestFlush(t *testing.T) {
92+
assert := assert.New(t)
93+
b, err := New(0, 10, 10, 10, func(str interface{}) uint {
94+
return uint(len(str.(string)))
95+
})
96+
assert.Nil(err)
97+
b.Put("a")
98+
wait := make(chan bool)
99+
go func() {
100+
batch, err := b.Get()
101+
assert.Equal([]interface{}{"a"}, batch)
102+
assert.Nil(err)
103+
wait <- true
104+
}()
105+
106+
b.Flush()
107+
<-wait
108+
}
109+
75110
func TestMultiConsumer(t *testing.T) {
76111
assert := assert.New(t)
77112
b, err := New(0, 100, 100000, 10, func(str interface{}) uint {
@@ -101,21 +136,41 @@ func TestMultiConsumer(t *testing.T) {
101136

102137
func TestDispose(t *testing.T) {
103138
assert := assert.New(t)
104-
b, err := New(0, 100000, 100000, 10, func(str interface{}) uint {
139+
b, err := New(0, 2, 100000, 10, func(str interface{}) uint {
105140
return uint(len(str.(string)))
106141
})
107142
assert.Nil(err)
108143
b.Put("a")
144+
b.Put("b")
145+
b.Put("c")
109146
wait := make(chan bool)
110147
go func() {
111-
_, err := b.Get()
148+
batch1, err := b.Get()
149+
assert.Equal([]interface{}{"a", "b"}, batch1)
150+
assert.Nil(err)
151+
batch2, err := b.Get()
152+
assert.Equal([]interface{}{"c"}, batch2)
153+
assert.Nil(err)
154+
_, err = b.Get()
112155
assert.Equal(ErrDisposed, err)
113156
wait <- true
114157
}()
115158

116159
b.Dispose()
117160

118-
assert.Equal(ErrDisposed, b.Put("a"))
161+
assert.Equal(ErrDisposed, b.Put("d"))
162+
assert.Equal(ErrDisposed, b.Flush())
119163

120164
<-wait
121165
}
166+
167+
func TestIsDisposed(t *testing.T) {
168+
assert := assert.New(t)
169+
b, err := New(0, 10, 10, 10, func(str interface{}) uint {
170+
return uint(len(str.(string)))
171+
})
172+
assert.Nil(err)
173+
assert.False(b.IsDisposed())
174+
b.Dispose()
175+
assert.True(b.IsDisposed())
176+
}

mock/batcher.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,29 @@
1+
/*
2+
Copyright 2015 Workiva, LLC
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
117
package mock
218

319
import (
420
"github.com/stretchr/testify/mock"
21+
22+
"github.com/Workiva/go-datastructures/batcher"
523
)
624

25+
var _ batcher.Batcher = new(Batcher)
26+
727
type Batcher struct {
828
mock.Mock
929
PutChan chan bool
@@ -22,6 +42,16 @@ func (m *Batcher) Get() ([]interface{}, error) {
2242
return args.Get(0).([]interface{}), args.Error(1)
2343
}
2444

45+
func (m *Batcher) Flush() error {
46+
args := m.Called()
47+
return args.Error(0)
48+
}
49+
2550
func (m *Batcher) Dispose() {
2651
m.Called()
2752
}
53+
54+
func (m *Batcher) IsDisposed() bool {
55+
args := m.Called()
56+
return args.Bool(0)
57+
}

0 commit comments

Comments
 (0)