Skip to content

Commit 630fae4

Browse files
committed
Merge pull request #95 from brianshannan-wf/batcher_improvements
Batcher: Flushing
2 parents 7fba6c8 + 15b7d33 commit 630fae4

File tree

3 files changed

+155
-19
lines changed

3 files changed

+155
-19
lines changed

batcher/batcher.go

Lines changed: 67 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
/*
2+
Copyright 2015 Workiva, LLC
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
117
package batcher
218

319
import (
@@ -15,9 +31,16 @@ type Batcher interface {
1531
// one of the conditions for a "complete" batch is reached.
1632
Get() ([]interface{}, error)
1733

18-
// Dispose will dispose of the batcher. Any calls to Put or Get
19-
// will return errors.
34+
// Flush forcibly completes the batch currently being built
35+
Flush() error
36+
37+
// Dispose will dispose of the batcher. Any calls to Put or Flush
38+
// will return ErrDisposed, calls to Get will return an error iff
39+
// there are no more ready batches.
2040
Dispose()
41+
42+
// IsDisposed will determine if the batcher is disposed
43+
IsDisposed() bool
2144
}
2245

2346
// ErrDisposed is the error returned for a disposed Batcher
@@ -30,7 +53,6 @@ type basicBatcher struct {
3053
maxTime time.Duration
3154
maxItems uint
3255
maxBytes uint
33-
queueLen uint
3456
calculateBytes CalculateBytes
3557
disposed bool
3658
items []interface{}
@@ -55,7 +77,6 @@ func New(maxTime time.Duration, maxItems, maxBytes, queueLen uint, calculate Cal
5577
maxTime: maxTime,
5678
maxItems: maxItems,
5779
maxBytes: maxBytes,
58-
queueLen: queueLen,
5980
calculateBytes: calculate,
6081
items: make([]interface{}, 0, maxItems),
6182
batchChan: make(chan []interface{}, queueLen),
@@ -71,11 +92,11 @@ func (b *basicBatcher) Put(item interface{}) error {
7192
}
7293

7394
b.items = append(b.items, item)
74-
b.availableBytes += b.calculateBytes(item)
95+
if b.calculateBytes != nil {
96+
b.availableBytes += b.calculateBytes(item)
97+
}
7598
if b.ready() {
76-
b.batchChan <- b.items
77-
b.items = make([]interface{}, 0, b.maxItems)
78-
b.availableBytes = 0
99+
b.flush()
79100
}
80101

81102
b.lock.Unlock()
@@ -85,12 +106,8 @@ func (b *basicBatcher) Put(item interface{}) error {
85106
// Get retrieves a batch from the batcher. This call will block until
86107
// one of the conditions for a "complete" batch is reached.
87108
func (b *basicBatcher) Get() ([]interface{}, error) {
88-
b.lock.RLock()
89-
if b.disposed {
90-
b.lock.RUnlock()
91-
return nil, ErrDisposed
92-
}
93-
b.lock.RUnlock()
109+
// Don't check disposed yet so any items remaining in the queue
110+
// will be returned properly.
94111

95112
var timeout <-chan time.Time
96113
if b.maxTime > 0 {
@@ -117,16 +134,50 @@ func (b *basicBatcher) Get() ([]interface{}, error) {
117134
}
118135
}
119136

120-
// Dispose will dispose of the batcher. Any calls to Put or Get
121-
// will return errors.
137+
// Flush forcibly completes the batch currently being built
138+
func (b *basicBatcher) Flush() error {
139+
b.lock.Lock()
140+
if b.disposed {
141+
b.lock.Unlock()
142+
return ErrDisposed
143+
}
144+
b.flush()
145+
b.lock.Unlock()
146+
return nil
147+
}
148+
149+
// Dispose will dispose of the batcher. Any calls to Put or Flush
150+
// will return ErrDisposed, calls to Get will return an error iff
151+
// there are no more ready batches.
122152
func (b *basicBatcher) Dispose() {
123153
b.lock.Lock()
154+
if b.disposed {
155+
b.lock.Unlock()
156+
return
157+
}
158+
b.flush()
124159
b.disposed = true
125160
b.items = nil
126161
close(b.batchChan)
127162
b.lock.Unlock()
128163
}
129164

165+
// IsDisposed will determine if the batcher is disposed
166+
func (b *basicBatcher) IsDisposed() bool {
167+
b.lock.RLock()
168+
disposed := b.disposed
169+
b.lock.RUnlock()
170+
return disposed
171+
}
172+
173+
// flush adds the batch currently being built to the queue of completed batches.
174+
// flush is not threadsafe, so should be synchronized externally.
175+
func (b *basicBatcher) flush() {
176+
b.batchChan <- b.items
177+
b.items = make([]interface{}, 0, b.maxItems)
178+
b.availableBytes = 0
179+
}
180+
130181
func (b *basicBatcher) ready() bool {
131182
if b.maxItems != 0 && uint(len(b.items)) >= b.maxItems {
132183
return true

batcher/batcher_test.go

Lines changed: 58 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
/*
2+
Copyright 2015 Workiva, LLC
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
117
package batcher
218

319
import (
@@ -75,6 +91,25 @@ func TestMaxTime(t *testing.T) {
7591
assert.Nil(err)
7692
}
7793

94+
func TestFlush(t *testing.T) {
95+
assert := assert.New(t)
96+
b, err := New(0, 10, 10, 10, func(str interface{}) uint {
97+
return uint(len(str.(string)))
98+
})
99+
assert.Nil(err)
100+
b.Put("a")
101+
wait := make(chan bool)
102+
go func() {
103+
batch, err := b.Get()
104+
assert.Equal([]interface{}{"a"}, batch)
105+
assert.Nil(err)
106+
wait <- true
107+
}()
108+
109+
b.Flush()
110+
<-wait
111+
}
112+
78113
func TestMultiConsumer(t *testing.T) {
79114
assert := assert.New(t)
80115
b, err := New(0, 100, 100000, 10, func(str interface{}) uint {
@@ -104,21 +139,41 @@ func TestMultiConsumer(t *testing.T) {
104139

105140
func TestDispose(t *testing.T) {
106141
assert := assert.New(t)
107-
b, err := New(0, 100000, 100000, 10, func(str interface{}) uint {
142+
b, err := New(0, 2, 100000, 10, func(str interface{}) uint {
108143
return uint(len(str.(string)))
109144
})
110145
assert.Nil(err)
111146
b.Put("a")
147+
b.Put("b")
148+
b.Put("c")
112149
wait := make(chan bool)
113150
go func() {
114-
_, err := b.Get()
151+
batch1, err := b.Get()
152+
assert.Equal([]interface{}{"a", "b"}, batch1)
153+
assert.Nil(err)
154+
batch2, err := b.Get()
155+
assert.Equal([]interface{}{"c"}, batch2)
156+
assert.Nil(err)
157+
_, err = b.Get()
115158
assert.Equal(ErrDisposed, err)
116159
wait <- true
117160
}()
118161

119162
b.Dispose()
120163

121-
assert.Equal(ErrDisposed, b.Put("a"))
164+
assert.Equal(ErrDisposed, b.Put("d"))
165+
assert.Equal(ErrDisposed, b.Flush())
122166

123167
<-wait
124168
}
169+
170+
func TestIsDisposed(t *testing.T) {
171+
assert := assert.New(t)
172+
b, err := New(0, 10, 10, 10, func(str interface{}) uint {
173+
return uint(len(str.(string)))
174+
})
175+
assert.Nil(err)
176+
assert.False(b.IsDisposed())
177+
b.Dispose()
178+
assert.True(b.IsDisposed())
179+
}

mock/batcher.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,29 @@
1+
/*
2+
Copyright 2015 Workiva, LLC
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
117
package mock
218

319
import (
420
"github.com/stretchr/testify/mock"
21+
22+
"github.com/Workiva/go-datastructures/batcher"
523
)
624

25+
var _ batcher.Batcher = new(Batcher)
26+
727
type Batcher struct {
828
mock.Mock
929
PutChan chan bool
@@ -22,6 +42,16 @@ func (m *Batcher) Get() ([]interface{}, error) {
2242
return args.Get(0).([]interface{}), args.Error(1)
2343
}
2444

45+
func (m *Batcher) Flush() error {
46+
args := m.Called()
47+
return args.Error(0)
48+
}
49+
2550
func (m *Batcher) Dispose() {
2651
m.Called()
2752
}
53+
54+
func (m *Batcher) IsDisposed() bool {
55+
args := m.Called()
56+
return args.Bool(0)
57+
}

0 commit comments

Comments
 (0)