Skip to content

Commit b07a1f1

Browse files
Merge pull request #91 from tylertreat/queue_poll
Queue poll
2 parents 4d50edb + 2fe8e0d commit b07a1f1

File tree

6 files changed

+121
-33
lines changed

6 files changed

+121
-33
lines changed

queue/error.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,11 @@ package queue
1818

1919
import "errors"
2020

21-
var disposedError = errors.New(`Queue has been disposed.`)
21+
var (
22+
// ErrDisposed is returned when an operation is performed on a disposed
23+
// queue.
24+
ErrDisposed = errors.New(`queue: disposed`)
25+
26+
// ErrTimeout is returned when an applicable queue operation times out.
27+
ErrTimeout = errors.New(`queue: poll timed out`)
28+
)

queue/priority_queue.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ func (pq *PriorityQueue) Put(items ...Item) error {
108108
pq.lock.Lock()
109109
if pq.disposed {
110110
pq.lock.Unlock()
111-
return disposedError
111+
return ErrDisposed
112112
}
113113

114114
for _, item := range items {
@@ -122,7 +122,7 @@ func (pq *PriorityQueue) Put(items ...Item) error {
122122
}
123123

124124
sema.response.Add(1)
125-
sema.wg.Done()
125+
sema.ready <- true
126126
sema.response.Wait()
127127
if len(pq.items) == 0 {
128128
break
@@ -145,22 +145,21 @@ func (pq *PriorityQueue) Get(number int) ([]Item, error) {
145145

146146
if pq.disposed {
147147
pq.lock.Unlock()
148-
return nil, disposedError
148+
return nil, ErrDisposed
149149
}
150150

151151
var items []Item
152152

153153
if len(pq.items) == 0 {
154154
sema := newSema()
155155
pq.waiters.put(sema)
156-
sema.wg.Add(1)
157156
pq.lock.Unlock()
158157

159-
sema.wg.Wait()
158+
<-sema.ready
160159
pq.disposeLock.Lock()
161160
if pq.disposed {
162161
pq.disposeLock.Unlock()
163-
return nil, disposedError
162+
return nil, ErrDisposed
164163
}
165164
pq.disposeLock.Unlock()
166165

@@ -221,7 +220,7 @@ func (pq *PriorityQueue) Dispose() {
221220
pq.disposed = true
222221
for _, waiter := range pq.waiters {
223222
waiter.response.Add(1)
224-
waiter.wg.Done()
223+
waiter.ready <- true
225224
}
226225

227226
pq.items = nil

queue/priority_queue_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -193,18 +193,18 @@ func TestEmptyPriorityGetWithDispose(t *testing.T) {
193193

194194
wg.Wait()
195195

196-
assert.IsType(t, disposedError, err)
196+
assert.IsType(t, ErrDisposed, err)
197197
}
198198

199199
func TestPriorityGetPutDisposed(t *testing.T) {
200200
q := NewPriorityQueue(1)
201201
q.Dispose()
202202

203203
_, err := q.Get(1)
204-
assert.IsType(t, disposedError, err)
204+
assert.IsType(t, ErrDisposed, err)
205205

206206
err = q.Put(mockItem(1))
207-
assert.IsType(t, disposedError, err)
207+
assert.IsType(t, ErrDisposed, err)
208208
}
209209

210210
func BenchmarkPriorityQueue(b *testing.B) {

queue/queue.go

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ import (
5555
"runtime"
5656
"sync"
5757
"sync/atomic"
58+
"time"
5859
)
5960

6061
type waiters []*sema
@@ -119,13 +120,13 @@ func (items *items) getUntil(checker func(item interface{}) bool) []interface{}
119120
}
120121

121122
type sema struct {
122-
wg *sync.WaitGroup
123+
ready chan bool
123124
response *sync.WaitGroup
124125
}
125126

126127
func newSema() *sema {
127128
return &sema{
128-
wg: &sync.WaitGroup{},
129+
ready: make(chan bool, 1),
129130
response: &sync.WaitGroup{},
130131
}
131132
}
@@ -149,7 +150,7 @@ func (q *Queue) Put(items ...interface{}) error {
149150

150151
if q.disposed {
151152
q.lock.Unlock()
152-
return disposedError
153+
return ErrDisposed
153154
}
154155

155156
q.items = append(q.items, items...)
@@ -159,8 +160,12 @@ func (q *Queue) Put(items ...interface{}) error {
159160
break
160161
}
161162
sema.response.Add(1)
162-
sema.wg.Done()
163-
sema.response.Wait()
163+
select {
164+
case sema.ready <- true:
165+
sema.response.Wait()
166+
default:
167+
// This semaphore timed out.
168+
}
164169
if len(q.items) == 0 {
165170
break
166171
}
@@ -175,6 +180,15 @@ func (q *Queue) Put(items ...interface{}) error {
175180
// parameter. If no items are in the queue, this method will pause
176181
// until items are added to the queue.
177182
func (q *Queue) Get(number int64) ([]interface{}, error) {
183+
return q.Poll(number, 0)
184+
}
185+
186+
// Poll retrieves items from the queue. If there are some items in the queue,
187+
// Poll will return a number UP TO the number passed in as a parameter. If no
188+
// items are in the queue, this method will pause until items are added to the
189+
// queue or the provided timeout is reached. A non-positive timeout will block
190+
// until items are added. If a timeout occurs, ErrTimeout is returned.
191+
func (q *Queue) Poll(number int64, timeout time.Duration) ([]interface{}, error) {
178192
if number < 1 {
179193
// thanks again go
180194
return []interface{}{}, nil
@@ -184,25 +198,32 @@ func (q *Queue) Get(number int64) ([]interface{}, error) {
184198

185199
if q.disposed {
186200
q.lock.Unlock()
187-
return nil, disposedError
201+
return nil, ErrDisposed
188202
}
189203

190204
var items []interface{}
191205

192206
if len(q.items) == 0 {
193207
sema := newSema()
194208
q.waiters.put(sema)
195-
sema.wg.Add(1)
196209
q.lock.Unlock()
197210

198-
sema.wg.Wait()
199-
// we are now inside the put's lock
200-
if q.disposed {
201-
return nil, disposedError
211+
var timeoutC <-chan time.Time
212+
if timeout > 0 {
213+
timeoutC = time.After(timeout)
214+
}
215+
select {
216+
case <-sema.ready:
217+
// we are now inside the put's lock
218+
if q.disposed {
219+
return nil, ErrDisposed
220+
}
221+
items = q.items.get(number)
222+
sema.response.Done()
223+
return items, nil
224+
case <-timeoutC:
225+
return nil, ErrTimeout
202226
}
203-
items = q.items.get(number)
204-
sema.response.Done()
205-
return items, nil
206227
}
207228

208229
items = q.items.get(number)
@@ -222,7 +243,7 @@ func (q *Queue) TakeUntil(checker func(item interface{}) bool) ([]interface{}, e
222243

223244
if q.disposed {
224245
q.lock.Unlock()
225-
return nil, disposedError
246+
return nil, ErrDisposed
226247
}
227248

228249
result := q.items.getUntil(checker)
@@ -264,7 +285,7 @@ func (q *Queue) Dispose() {
264285
q.disposed = true
265286
for _, waiter := range q.waiters {
266287
waiter.response.Add(1)
267-
waiter.wg.Done()
288+
waiter.ready <- true
268289
}
269290

270291
q.items = nil

queue/queue_test.go

Lines changed: 65 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"sync"
2121
"sync/atomic"
2222
"testing"
23+
"time"
2324

2425
"github.com/stretchr/testify/assert"
2526
)
@@ -81,6 +82,44 @@ func TestGet(t *testing.T) {
8182
assert.Equal(t, `2`, result[0])
8283
}
8384

85+
func TestPoll(t *testing.T) {
86+
q := New(10)
87+
88+
q.Put(`test`)
89+
result, err := q.Poll(2, 0)
90+
if !assert.Nil(t, err) {
91+
return
92+
}
93+
94+
assert.Len(t, result, 1)
95+
assert.Equal(t, `test`, result[0])
96+
assert.Equal(t, int64(0), q.Len())
97+
98+
q.Put(`1`)
99+
q.Put(`2`)
100+
101+
result, err = q.Poll(1, time.Millisecond)
102+
if !assert.Nil(t, err) {
103+
return
104+
}
105+
106+
assert.Len(t, result, 1)
107+
assert.Equal(t, `1`, result[0])
108+
assert.Equal(t, int64(1), q.Len())
109+
110+
result, err = q.Poll(2, time.Millisecond)
111+
if !assert.Nil(t, err) {
112+
return
113+
}
114+
115+
assert.Equal(t, `2`, result[0])
116+
117+
before := time.Now()
118+
_, err = q.Poll(1, 5*time.Millisecond)
119+
assert.InDelta(t, 5, time.Since(before).Seconds()*1000, 2)
120+
assert.Equal(t, ErrTimeout, err)
121+
}
122+
84123
func TestAddEmptyPut(t *testing.T) {
85124
q := New(10)
86125

@@ -189,7 +228,7 @@ func TestEmptyGetWithDispose(t *testing.T) {
189228

190229
wg.Wait()
191230

192-
assert.IsType(t, disposedError, err)
231+
assert.IsType(t, ErrDisposed, err)
193232
}
194233

195234
func TestGetPutDisposed(t *testing.T) {
@@ -198,10 +237,10 @@ func TestGetPutDisposed(t *testing.T) {
198237
q.Dispose()
199238

200239
_, err := q.Get(1)
201-
assert.IsType(t, disposedError, err)
240+
assert.IsType(t, ErrDisposed, err)
202241

203242
err = q.Put(`a`)
204-
assert.IsType(t, disposedError, err)
243+
assert.IsType(t, ErrDisposed, err)
205244
}
206245

207246
func BenchmarkQueue(b *testing.B) {
@@ -289,7 +328,7 @@ func TestTakeUntilOnDisposedQueue(t *testing.T) {
289328
})
290329

291330
assert.Nil(t, result)
292-
assert.IsType(t, disposedError, err)
331+
assert.IsType(t, ErrDisposed, err)
293332
}
294333

295334
func TestExecuteInParallel(t *testing.T) {
@@ -360,6 +399,28 @@ func BenchmarkQueueGet(b *testing.B) {
360399
}
361400
}
362401

402+
func BenchmarkQueuePoll(b *testing.B) {
403+
numItems := int64(1000)
404+
405+
qs := make([]*Queue, 0, b.N)
406+
407+
for i := 0; i < b.N; i++ {
408+
q := New(numItems)
409+
for j := int64(0); j < numItems; j++ {
410+
q.Put(j)
411+
}
412+
qs = append(qs, q)
413+
}
414+
415+
b.ResetTimer()
416+
417+
for _, q := range qs {
418+
for j := int64(0); j < numItems; j++ {
419+
q.Poll(1, time.Millisecond)
420+
}
421+
}
422+
}
423+
363424
func BenchmarkExecuteInParallel(b *testing.B) {
364425
numItems := int64(1000)
365426

queue/ring.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func (rb *RingBuffer) Put(item interface{}) error {
7878
L:
7979
for {
8080
if atomic.LoadUint64(&rb.disposed) == 1 {
81-
return disposedError
81+
return ErrDisposed
8282
}
8383

8484
n = rb.nodes[pos&rb.mask]
@@ -118,7 +118,7 @@ func (rb *RingBuffer) Get() (interface{}, error) {
118118
L:
119119
for {
120120
if atomic.LoadUint64(&rb.disposed) == 1 {
121-
return nil, disposedError
121+
return nil, ErrDisposed
122122
}
123123

124124
n = rb.nodes[pos&rb.mask]

0 commit comments

Comments
 (0)