diff --git a/.travis.yml b/.travis.yml index ed720f0..ad390c9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,6 +3,7 @@ language: go go: - 1.1 - 1.4 + - 1.5 script: - go get github.com/customerio/gospec diff --git a/fetcher.go b/fetcher.go index f6b5cdd..88ae897 100644 --- a/fetcher.go +++ b/fetcher.go @@ -23,7 +23,7 @@ type fetch struct { messages chan *Msg stop chan bool exit chan bool - closed bool + closed chan bool } func NewFetch(queue string, messages chan *Msg, ready chan bool) Fetcher { @@ -33,7 +33,7 @@ func NewFetch(queue string, messages chan *Msg, ready chan bool) Fetcher { messages, make(chan bool), make(chan bool), - false, + make(chan bool), } } @@ -55,45 +55,52 @@ func (f *fetch) Fetch() { f.processOldMessages() - go (func(c chan string) { + go func(c chan string) { for { + // f.Close() has been called if f.Closed() { break } - <-f.Ready() - - (func() { - conn := Config.Pool.Get() - defer conn.Close() - - message, err := redis.String(conn.Do("brpoplpush", f.queue, f.inprogressQueue(), 1)) - - if err != nil { - // If redis returns null, the queue is empty. Just ignore the error. - if err.Error() != "redigo: nil returned" { - Logger.Println("ERR: ", err) - time.Sleep(1 * time.Second) - } - } else { - c <- message - } - })() + f.tryFetchMessage(c) } - })(messages) + }(messages) + + f.handleMessages(messages) +} +func (f *fetch) handleMessages(messages chan string) { for { select { case message := <-messages: f.sendMessage(message) case <-f.stop: - f.closed = true - f.exit <- true - break + // Stop the redis-polling goroutine + close(f.closed) + // Signal to Close() that the fetcher has stopped + close(f.exit) + return } } } +func (f *fetch) tryFetchMessage(messages chan string) { + conn := Config.Pool.Get() + defer conn.Close() + + message, err := redis.String(conn.Do("brpoplpush", f.queue, f.inprogressQueue(), 1)) + + if err != nil { + // If redis returns null, the queue is empty. Just ignore the error. + if err.Error() != "redigo: nil returned" { + Logger.Println("ERR: ", err) + time.Sleep(1 * time.Second) + } + } else { + messages <- message + } +} + func (f *fetch) sendMessage(message string) { msg, err := NewMsg(message) @@ -125,7 +132,12 @@ func (f *fetch) Close() { } func (f *fetch) Closed() bool { - return f.closed + select { + case <-f.closed: + return true + default: + return false + } } func (f *fetch) inprogressMessages() []string { diff --git a/manager.go b/manager.go index c639cdd..1c06c4c 100644 --- a/manager.go +++ b/manager.go @@ -11,6 +11,7 @@ type manager struct { job jobFunc concurrency int workers []*worker + workersM *sync.Mutex confirm chan *Msg stop chan bool exit chan bool @@ -33,9 +34,11 @@ func (m *manager) quit() { Logger.Println("quitting queue", m.queueName(), "(waiting for", m.processing(), "/", len(m.workers), "workers).") m.prepare() + m.workersM.Lock() for _, worker := range m.workers { worker.quit() } + m.workersM.Unlock() m.stop <- true <-m.exit @@ -50,30 +53,35 @@ func (m *manager) manage() { go m.fetch.Fetch() +Lmanage: for { select { case message := <-m.confirm: m.fetch.Acknowledge(message) case <-m.stop: m.exit <- true - break + break Lmanage } } } func (m *manager) loadWorkers() { + m.workersM.Lock() for i := 0; i < m.concurrency; i++ { m.workers[i] = newWorker(m) m.workers[i].start() } + m.workersM.Unlock() } func (m *manager) processing() (count int) { + m.workersM.Lock() for _, worker := range m.workers { if worker.processing() { count++ } } + m.workersM.Unlock() return } @@ -98,6 +106,7 @@ func newManager(queue string, job jobFunc, concurrency int, mids ...Action) *man job, concurrency, make([]*worker, concurrency), + &sync.Mutex{}, make(chan *Msg), make(chan bool), make(chan bool), diff --git a/scheduled.go b/scheduled.go index 53ea5aa..cc6953c 100644 --- a/scheduled.go +++ b/scheduled.go @@ -9,15 +9,18 @@ import ( type scheduled struct { keys []string - closed bool + closed chan bool exit chan bool } func (s *scheduled) start() { go (func() { for { - if s.closed { + + select { + case <-s.closed: return + default: } s.poll() @@ -28,7 +31,7 @@ func (s *scheduled) start() { } func (s *scheduled) quit() { - s.closed = true + close(s.closed) } func (s *scheduled) poll() { @@ -60,5 +63,5 @@ func (s *scheduled) poll() { } func newScheduled(keys ...string) *scheduled { - return &scheduled{keys, false, make(chan bool)} + return &scheduled{keys, make(chan bool), make(chan bool)} } diff --git a/worker.go b/worker.go index 27f439e..4c4bf6c 100644 --- a/worker.go +++ b/worker.go @@ -22,6 +22,7 @@ func (w *worker) quit() { } func (w *worker) work(messages chan *Msg) { +Lwork: for { select { case message := <-messages: @@ -39,9 +40,10 @@ func (w *worker) work(messages chan *Msg) { // ready to accept a message case <-w.stop: w.exit <- true - break + break Lwork } } + } func (w *worker) process(message *Msg) (acknowledge bool) { diff --git a/workers.go b/workers.go index 77a1bc6..0a4d341 100644 --- a/workers.go +++ b/workers.go @@ -68,8 +68,7 @@ func quitManagers() { } func waitForExit() { - for queue, manager := range managers { + for _, manager := range managers { manager.Wait() - delete(managers, queue) } }