From df84997cab6ef8dcc0f1e47fb1543b27efde8bd3 Mon Sep 17 00:00:00 2001 From: LYY Date: Fri, 23 Oct 2015 09:55:48 +0800 Subject: [PATCH 01/14] exit worker goroutine on stop --- worker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/worker.go b/worker.go index 27f439e..9043576 100644 --- a/worker.go +++ b/worker.go @@ -39,7 +39,7 @@ func (w *worker) work(messages chan *Msg) { // ready to accept a message case <-w.stop: w.exit <- true - break + return } } } From 4417037696d286d5fe3c411a475123322ab10e3e Mon Sep 17 00:00:00 2001 From: LYY Date: Wed, 28 Oct 2015 10:36:28 +0800 Subject: [PATCH 02/14] fix break "for" "select" case to exit --- fetcher.go | 6 ++++-- manager.go | 3 ++- worker.go | 4 +++- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/fetcher.go b/fetcher.go index f6b5cdd..ad2858f 100644 --- a/fetcher.go +++ b/fetcher.go @@ -56,9 +56,10 @@ func (f *fetch) Fetch() { f.processOldMessages() go (func(c chan string) { + Lfetch: for { if f.Closed() { - break + break Lfetch } <-f.Ready() @@ -82,6 +83,7 @@ func (f *fetch) Fetch() { } })(messages) +Lmessage: for { select { case message := <-messages: @@ -89,7 +91,7 @@ func (f *fetch) Fetch() { case <-f.stop: f.closed = true f.exit <- true - break + break Lmessage } } } diff --git a/manager.go b/manager.go index c639cdd..9aded3e 100644 --- a/manager.go +++ b/manager.go @@ -50,13 +50,14 @@ func (m *manager) manage() { go m.fetch.Fetch() +Lmanage: for { select { case message := <-m.confirm: m.fetch.Acknowledge(message) case <-m.stop: m.exit <- true - break + break Lmanage } } } diff --git a/worker.go b/worker.go index 9043576..4c4bf6c 100644 --- a/worker.go +++ b/worker.go @@ -22,6 +22,7 @@ func (w *worker) quit() { } func (w *worker) work(messages chan *Msg) { +Lwork: for { select { case message := <-messages: @@ -39,9 +40,10 @@ func (w *worker) work(messages chan *Msg) { // ready to accept a message case <-w.stop: w.exit <- true - return + break Lwork } } + } func (w *worker) process(message *Msg) (acknowledge bool) { From 60f3ea69de6629152ca96be35c6abf46de05af99 Mon Sep 17 00:00:00 2001 From: LYY Date: Wed, 28 Oct 2015 15:33:40 +0800 Subject: [PATCH 03/14] change break label name --- fetcher.go | 3 +-- manager.go | 4 ++-- worker.go | 4 ++-- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/fetcher.go b/fetcher.go index ad2858f..15716ea 100644 --- a/fetcher.go +++ b/fetcher.go @@ -56,10 +56,9 @@ func (f *fetch) Fetch() { f.processOldMessages() go (func(c chan string) { - Lfetch: for { if f.Closed() { - break Lfetch + break } <-f.Ready() diff --git a/manager.go b/manager.go index 9aded3e..2bc2666 100644 --- a/manager.go +++ b/manager.go @@ -50,14 +50,14 @@ func (m *manager) manage() { go m.fetch.Fetch() -Lmanage: +Lmanager: for { select { case message := <-m.confirm: m.fetch.Acknowledge(message) case <-m.stop: m.exit <- true - break Lmanage + break Lmanager } } } diff --git a/worker.go b/worker.go index 4c4bf6c..213e5b6 100644 --- a/worker.go +++ b/worker.go @@ -22,7 +22,7 @@ func (w *worker) quit() { } func (w *worker) work(messages chan *Msg) { -Lwork: +Lworker: for { select { case message := <-messages: @@ -40,7 +40,7 @@ Lwork: // ready to accept a message case <-w.stop: w.exit <- true - break Lwork + break Lworker } } From e3bf0e29d6d9a49f52265dcfbcb5c38dec9a9786 Mon Sep 17 00:00:00 2001 From: LYY Date: Wed, 28 Oct 2015 15:37:02 +0800 Subject: [PATCH 04/14] change break label name --- manager.go | 4 ++-- worker.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/manager.go b/manager.go index 2bc2666..9aded3e 100644 --- a/manager.go +++ b/manager.go @@ -50,14 +50,14 @@ func (m *manager) manage() { go m.fetch.Fetch() -Lmanager: +Lmanage: for { select { case message := <-m.confirm: m.fetch.Acknowledge(message) case <-m.stop: m.exit <- true - break Lmanager + break Lmanage } } } diff --git a/worker.go b/worker.go index 213e5b6..4c4bf6c 100644 --- a/worker.go +++ b/worker.go @@ -22,7 +22,7 @@ func (w *worker) quit() { } func (w *worker) work(messages chan *Msg) { -Lworker: +Lwork: for { select { case message := <-messages: @@ -40,7 +40,7 @@ Lworker: // ready to accept a message case <-w.stop: w.exit <- true - break Lworker + break Lwork } } From 0181299b2be9e862a9ab12be394835754bf720fb Mon Sep 17 00:00:00 2001 From: LYY Date: Tue, 3 Nov 2015 10:15:46 +0800 Subject: [PATCH 05/14] quit test --- manager.go | 2 ++ workers.go | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/manager.go b/manager.go index 9aded3e..6db83cc 100644 --- a/manager.go +++ b/manager.go @@ -37,9 +37,11 @@ func (m *manager) quit() { worker.quit() } + Logger.Println("quitting queue", m.queueName(), " stoped all workers.") m.stop <- true <-m.exit + Logger.Println("quitting queue", m.queueName(), " before down.") m.Done() } diff --git a/workers.go b/workers.go index 77a1bc6..fb25503 100644 --- a/workers.go +++ b/workers.go @@ -42,7 +42,7 @@ func Start() { func Quit() { quitManagers() schedule.quit() - waitForExit() + // waitForExit() } func StatsServer(port int) { From 1cc4ae8a0b97b2088d268f2e9d5753a1d9078a47 Mon Sep 17 00:00:00 2001 From: LYY Date: Mon, 9 Nov 2015 12:23:03 +0800 Subject: [PATCH 06/14] remove test log --- manager.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/manager.go b/manager.go index 6db83cc..9aded3e 100644 --- a/manager.go +++ b/manager.go @@ -37,11 +37,9 @@ func (m *manager) quit() { worker.quit() } - Logger.Println("quitting queue", m.queueName(), " stoped all workers.") m.stop <- true <-m.exit - Logger.Println("quitting queue", m.queueName(), " before down.") m.Done() } From 619983fd90c95ddea651b6ef09a516a3b568af27 Mon Sep 17 00:00:00 2001 From: LYY Date: Mon, 9 Nov 2015 12:24:28 +0800 Subject: [PATCH 07/14] fix quit --- workers.go | 1 - 1 file changed, 1 deletion(-) diff --git a/workers.go b/workers.go index fb25503..570ae43 100644 --- a/workers.go +++ b/workers.go @@ -42,7 +42,6 @@ func Start() { func Quit() { quitManagers() schedule.quit() - // waitForExit() } func StatsServer(port int) { From f747a9ed523bdea6aeb447515471e8e886b82bd7 Mon Sep 17 00:00:00 2001 From: LYY Date: Thu, 24 Dec 2015 15:14:14 +0800 Subject: [PATCH 08/14] fix data race --- fetcher.go | 65 ++++++++++++++++++++++++++++++---------------------- manager.go | 8 +++++++ scheduled.go | 11 +++++---- workers.go | 4 ++-- 4 files changed, 55 insertions(+), 33 deletions(-) diff --git a/fetcher.go b/fetcher.go index 15716ea..88ae897 100644 --- a/fetcher.go +++ b/fetcher.go @@ -23,7 +23,7 @@ type fetch struct { messages chan *Msg stop chan bool exit chan bool - closed bool + closed chan bool } func NewFetch(queue string, messages chan *Msg, ready chan bool) Fetcher { @@ -33,7 +33,7 @@ func NewFetch(queue string, messages chan *Msg, ready chan bool) Fetcher { messages, make(chan bool), make(chan bool), - false, + make(chan bool), } } @@ -55,46 +55,52 @@ func (f *fetch) Fetch() { f.processOldMessages() - go (func(c chan string) { + go func(c chan string) { for { + // f.Close() has been called if f.Closed() { break } - <-f.Ready() - - (func() { - conn := Config.Pool.Get() - defer conn.Close() - - message, err := redis.String(conn.Do("brpoplpush", f.queue, f.inprogressQueue(), 1)) - - if err != nil { - // If redis returns null, the queue is empty. Just ignore the error. - if err.Error() != "redigo: nil returned" { - Logger.Println("ERR: ", err) - time.Sleep(1 * time.Second) - } - } else { - c <- message - } - })() + f.tryFetchMessage(c) } - })(messages) + }(messages) + + f.handleMessages(messages) +} -Lmessage: +func (f *fetch) handleMessages(messages chan string) { for { select { case message := <-messages: f.sendMessage(message) case <-f.stop: - f.closed = true - f.exit <- true - break Lmessage + // Stop the redis-polling goroutine + close(f.closed) + // Signal to Close() that the fetcher has stopped + close(f.exit) + return } } } +func (f *fetch) tryFetchMessage(messages chan string) { + conn := Config.Pool.Get() + defer conn.Close() + + message, err := redis.String(conn.Do("brpoplpush", f.queue, f.inprogressQueue(), 1)) + + if err != nil { + // If redis returns null, the queue is empty. Just ignore the error. + if err.Error() != "redigo: nil returned" { + Logger.Println("ERR: ", err) + time.Sleep(1 * time.Second) + } + } else { + messages <- message + } +} + func (f *fetch) sendMessage(message string) { msg, err := NewMsg(message) @@ -126,7 +132,12 @@ func (f *fetch) Close() { } func (f *fetch) Closed() bool { - return f.closed + select { + case <-f.closed: + return true + default: + return false + } } func (f *fetch) inprogressMessages() []string { diff --git a/manager.go b/manager.go index 9aded3e..1c06c4c 100644 --- a/manager.go +++ b/manager.go @@ -11,6 +11,7 @@ type manager struct { job jobFunc concurrency int workers []*worker + workersM *sync.Mutex confirm chan *Msg stop chan bool exit chan bool @@ -33,9 +34,11 @@ func (m *manager) quit() { Logger.Println("quitting queue", m.queueName(), "(waiting for", m.processing(), "/", len(m.workers), "workers).") m.prepare() + m.workersM.Lock() for _, worker := range m.workers { worker.quit() } + m.workersM.Unlock() m.stop <- true <-m.exit @@ -63,18 +66,22 @@ Lmanage: } func (m *manager) loadWorkers() { + m.workersM.Lock() for i := 0; i < m.concurrency; i++ { m.workers[i] = newWorker(m) m.workers[i].start() } + m.workersM.Unlock() } func (m *manager) processing() (count int) { + m.workersM.Lock() for _, worker := range m.workers { if worker.processing() { count++ } } + m.workersM.Unlock() return } @@ -99,6 +106,7 @@ func newManager(queue string, job jobFunc, concurrency int, mids ...Action) *man job, concurrency, make([]*worker, concurrency), + &sync.Mutex{}, make(chan *Msg), make(chan bool), make(chan bool), diff --git a/scheduled.go b/scheduled.go index 53ea5aa..cc6953c 100644 --- a/scheduled.go +++ b/scheduled.go @@ -9,15 +9,18 @@ import ( type scheduled struct { keys []string - closed bool + closed chan bool exit chan bool } func (s *scheduled) start() { go (func() { for { - if s.closed { + + select { + case <-s.closed: return + default: } s.poll() @@ -28,7 +31,7 @@ func (s *scheduled) start() { } func (s *scheduled) quit() { - s.closed = true + close(s.closed) } func (s *scheduled) poll() { @@ -60,5 +63,5 @@ func (s *scheduled) poll() { } func newScheduled(keys ...string) *scheduled { - return &scheduled{keys, false, make(chan bool)} + return &scheduled{keys, make(chan bool), make(chan bool)} } diff --git a/workers.go b/workers.go index 570ae43..0a4d341 100644 --- a/workers.go +++ b/workers.go @@ -42,6 +42,7 @@ func Start() { func Quit() { quitManagers() schedule.quit() + waitForExit() } func StatsServer(port int) { @@ -67,8 +68,7 @@ func quitManagers() { } func waitForExit() { - for queue, manager := range managers { + for _, manager := range managers { manager.Wait() - delete(managers, queue) } } From 69df03fbb3a7eb067ad871cbffd84f54a84e6f4b Mon Sep 17 00:00:00 2001 From: LYY Date: Thu, 24 Dec 2015 17:17:32 +0800 Subject: [PATCH 09/14] add go1.5 travis --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index ed720f0..ad390c9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,6 +3,7 @@ language: go go: - 1.1 - 1.4 + - 1.5 script: - go get github.com/customerio/gospec From a7cafdf1e21931fb025208b6571bdaf7845325d0 Mon Sep 17 00:00:00 2001 From: LYY Date: Fri, 25 Dec 2015 14:15:27 +0800 Subject: [PATCH 10/14] replace to own --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 9caf153..33d362c 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ -[![Build Status](https://travis-ci.org/jrallison/go-workers.png)](https://travis-ci.org/jrallison/go-workers) -[![GoDoc](https://godoc.org/github.com/jrallison/go-workers?status.png)](https://godoc.org/github.com/jrallison/go-workers) +[![Build Status](https://travis-ci.org/LYY/go-workers.png)](https://travis-ci.org/LYY/go-workers) +[![GoDoc](https://godoc.org/github.com/LYY/go-workers?status.png)](https://godoc.org/github.com/LYY/go-workers) [Sidekiq](http://sidekiq.org/) compatible background workers in [golang](http://golang.org/). @@ -18,7 +18,7 @@ Example usage: package main import ( - "github.com/jrallison/go-workers" + "github.com/LYY/go-workers" ) func myJob(message *workers.Msg) { From efa77fe5571039190a27e012b7ec5908f2c29af3 Mon Sep 17 00:00:00 2001 From: LYY Date: Fri, 25 Dec 2015 17:29:44 +0800 Subject: [PATCH 11/14] travis ci --- .travis.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index ad390c9..37e6818 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,8 +5,10 @@ go: - 1.4 - 1.5 -script: +install: - go get github.com/customerio/gospec + +script: - go test -v services: From 10229312750151ddb9ef64150d53e9f631c5c457 Mon Sep 17 00:00:00 2001 From: LYY Date: Fri, 25 Dec 2015 17:32:06 +0800 Subject: [PATCH 12/14] travis ci --- .travis.yml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index 37e6818..ad390c9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,10 +5,8 @@ go: - 1.4 - 1.5 -install: - - go get github.com/customerio/gospec - script: + - go get github.com/customerio/gospec - go test -v services: From 7151f9c1b9c91af96f6912d7d3e775cc2a549bbb Mon Sep 17 00:00:00 2001 From: LYY Date: Fri, 25 Dec 2015 17:35:40 +0800 Subject: [PATCH 13/14] travis ci status --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 33d362c..9c39bc4 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -[![Build Status](https://travis-ci.org/LYY/go-workers.png)](https://travis-ci.org/LYY/go-workers) +[![Build Status](https://travis-ci.org/LYY/go-workers.svg?branch=master)](https://travis-ci.org/LYY/go-workers) [![GoDoc](https://godoc.org/github.com/LYY/go-workers?status.png)](https://godoc.org/github.com/LYY/go-workers) [Sidekiq](http://sidekiq.org/) compatible From 41258eaa448a2b96542231b6341eeeaab4c03116 Mon Sep 17 00:00:00 2001 From: LYY Date: Mon, 22 Feb 2016 11:28:03 +0800 Subject: [PATCH 14/14] restore readme to jrallision --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 9c39bc4..dc69946 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ -[![Build Status](https://travis-ci.org/LYY/go-workers.svg?branch=master)](https://travis-ci.org/LYY/go-workers) -[![GoDoc](https://godoc.org/github.com/LYY/go-workers?status.png)](https://godoc.org/github.com/LYY/go-workers) +[![Build Status](https://travis-ci.org/jrallison/go-workers.png)](https://travis-ci.org/jrallison/go-workers) +[![GoDoc](https://godoc.org/github.com/jrallison/go-workers?status.png)](https://godoc.org/github.com/jrallison/go-workers) [Sidekiq](http://sidekiq.org/) compatible background workers in [golang](http://golang.org/). @@ -18,7 +18,7 @@ Example usage: package main import ( - "github.com/LYY/go-workers" + "github.com/jrallison/go-workers" ) func myJob(message *workers.Msg) {