Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ language: go
go:
- 1.1
- 1.4
- 1.5

script:
- go get github.com/customerio/gospec
Expand Down
64 changes: 38 additions & 26 deletions fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type fetch struct {
messages chan *Msg
stop chan bool
exit chan bool
closed bool
closed chan bool
}

func NewFetch(queue string, messages chan *Msg, ready chan bool) Fetcher {
Expand All @@ -33,7 +33,7 @@ func NewFetch(queue string, messages chan *Msg, ready chan bool) Fetcher {
messages,
make(chan bool),
make(chan bool),
false,
make(chan bool),
}
}

Expand All @@ -55,45 +55,52 @@ func (f *fetch) Fetch() {

f.processOldMessages()

go (func(c chan string) {
go func(c chan string) {
for {
// f.Close() has been called
if f.Closed() {
break
}

<-f.Ready()

(func() {
conn := Config.Pool.Get()
defer conn.Close()

message, err := redis.String(conn.Do("brpoplpush", f.queue, f.inprogressQueue(), 1))

if err != nil {
// If redis returns null, the queue is empty. Just ignore the error.
if err.Error() != "redigo: nil returned" {
Logger.Println("ERR: ", err)
time.Sleep(1 * time.Second)
}
} else {
c <- message
}
})()
f.tryFetchMessage(c)
}
})(messages)
}(messages)

f.handleMessages(messages)
}

func (f *fetch) handleMessages(messages chan string) {
for {
select {
case message := <-messages:
f.sendMessage(message)
case <-f.stop:
f.closed = true
f.exit <- true
break
// Stop the redis-polling goroutine
close(f.closed)
// Signal to Close() that the fetcher has stopped
close(f.exit)
return
}
}
}

func (f *fetch) tryFetchMessage(messages chan string) {
conn := Config.Pool.Get()
defer conn.Close()

message, err := redis.String(conn.Do("brpoplpush", f.queue, f.inprogressQueue(), 1))

if err != nil {
// If redis returns null, the queue is empty. Just ignore the error.
if err.Error() != "redigo: nil returned" {
Logger.Println("ERR: ", err)
time.Sleep(1 * time.Second)
}
} else {
messages <- message
}
}

func (f *fetch) sendMessage(message string) {
msg, err := NewMsg(message)

Expand Down Expand Up @@ -125,7 +132,12 @@ func (f *fetch) Close() {
}

func (f *fetch) Closed() bool {
return f.closed
select {
case <-f.closed:
return true
default:
return false
}
}

func (f *fetch) inprogressMessages() []string {
Expand Down
11 changes: 10 additions & 1 deletion manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type manager struct {
job jobFunc
concurrency int
workers []*worker
workersM *sync.Mutex
confirm chan *Msg
stop chan bool
exit chan bool
Expand All @@ -33,9 +34,11 @@ func (m *manager) quit() {
Logger.Println("quitting queue", m.queueName(), "(waiting for", m.processing(), "/", len(m.workers), "workers).")
m.prepare()

m.workersM.Lock()
for _, worker := range m.workers {
worker.quit()
}
m.workersM.Unlock()

m.stop <- true
<-m.exit
Expand All @@ -50,30 +53,35 @@ func (m *manager) manage() {

go m.fetch.Fetch()

Lmanage:
for {
select {
case message := <-m.confirm:
m.fetch.Acknowledge(message)
case <-m.stop:
m.exit <- true
break
break Lmanage
}
}
}

func (m *manager) loadWorkers() {
m.workersM.Lock()
for i := 0; i < m.concurrency; i++ {
m.workers[i] = newWorker(m)
m.workers[i].start()
}
m.workersM.Unlock()
}

func (m *manager) processing() (count int) {
m.workersM.Lock()
for _, worker := range m.workers {
if worker.processing() {
count++
}
}
m.workersM.Unlock()

return
}
Expand All @@ -98,6 +106,7 @@ func newManager(queue string, job jobFunc, concurrency int, mids ...Action) *man
job,
concurrency,
make([]*worker, concurrency),
&sync.Mutex{},
make(chan *Msg),
make(chan bool),
make(chan bool),
Expand Down
11 changes: 7 additions & 4 deletions scheduled.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,18 @@ import (

type scheduled struct {
keys []string
closed bool
closed chan bool
exit chan bool
}

func (s *scheduled) start() {
go (func() {
for {
if s.closed {

select {
case <-s.closed:
return
default:
}

s.poll()
Expand All @@ -28,7 +31,7 @@ func (s *scheduled) start() {
}

func (s *scheduled) quit() {
s.closed = true
close(s.closed)
}

func (s *scheduled) poll() {
Expand Down Expand Up @@ -60,5 +63,5 @@ func (s *scheduled) poll() {
}

func newScheduled(keys ...string) *scheduled {
return &scheduled{keys, false, make(chan bool)}
return &scheduled{keys, make(chan bool), make(chan bool)}
}
4 changes: 3 additions & 1 deletion worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func (w *worker) quit() {
}

func (w *worker) work(messages chan *Msg) {
Lwork:
for {
select {
case message := <-messages:
Expand All @@ -39,9 +40,10 @@ func (w *worker) work(messages chan *Msg) {
// ready to accept a message
case <-w.stop:
w.exit <- true
break
break Lwork
}
}

}

func (w *worker) process(message *Msg) (acknowledge bool) {
Expand Down
3 changes: 1 addition & 2 deletions workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ func quitManagers() {
}

func waitForExit() {
for queue, manager := range managers {
for _, manager := range managers {
manager.Wait()
delete(managers, queue)
}
}