From 76e36123c4a1d48e4f33d9254928d06b911388f5 Mon Sep 17 00:00:00 2001 From: cscatolini Date: Sun, 29 Oct 2017 10:10:27 -0200 Subject: [PATCH 01/11] Add dep. --- .gitignore | 1 + Gopkg.lock | 27 +++++++++++++++++++++++++++ Gopkg.toml | 34 ++++++++++++++++++++++++++++++++++ 3 files changed, 62 insertions(+) create mode 100644 .gitignore create mode 100644 Gopkg.lock create mode 100644 Gopkg.toml diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5657f6e --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +vendor \ No newline at end of file diff --git a/Gopkg.lock b/Gopkg.lock new file mode 100644 index 0000000..e07ef50 --- /dev/null +++ b/Gopkg.lock @@ -0,0 +1,27 @@ +# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'. + + +[[projects]] + name = "github.com/bitly/go-simplejson" + packages = ["."] + revision = "aabad6e819789e569bd6aabf444c935aa9ba1e44" + version = "v0.5.0" + +[[projects]] + branch = "master" + name = "github.com/customerio/gospec" + packages = ["."] + revision = "a5cc0e48aa393147b3fc264d3b9582b69ea2733d" + +[[projects]] + name = "github.com/garyburd/redigo" + packages = ["internal","redis"] + revision = "34a326de1fea52965fa5ad664d3fc7163dd4b0a1" + version = "v1.2.0" + +[solve-meta] + analyzer-name = "dep" + analyzer-version = 1 + inputs-digest = "d05967babd4b751e99afcefd6ce85ffd6299ad787cc90986f38aeff21e5172dc" + solver-name = "gps-cdcl" + solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml new file mode 100644 index 0000000..466d307 --- /dev/null +++ b/Gopkg.toml @@ -0,0 +1,34 @@ + +# Gopkg.toml example +# +# Refer to https://github.com/golang/dep/blob/master/docs/Gopkg.toml.md +# for detailed Gopkg.toml documentation. +# +# required = ["github.com/user/thing/cmd/thing"] +# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"] +# +# [[constraint]] +# name = "github.com/user/project" +# version = "1.0.0" +# +# [[constraint]] +# name = "github.com/user/project2" +# branch = "dev" +# source = "github.com/myfork/project2" +# +# [[override]] +# name = "github.com/x/y" +# version = "2.4.0" + + +[[constraint]] + name = "github.com/bitly/go-simplejson" + version = "0.5.0" + +[[constraint]] + branch = "master" + name = "github.com/customerio/gospec" + +[[constraint]] + name = "github.com/garyburd/redigo" + version = "1.2.0" From 220b32682cf58cddde07dd2a30361b21f9d0bff0 Mon Sep 17 00:00:00 2001 From: cscatolini Date: Sun, 29 Oct 2017 12:20:57 -0200 Subject: [PATCH 02/11] Add retry options for parametrizing retry exponential backoff function. --- enqueue.go | 14 +++++-- enqueue_test.go | 27 ++++++++++++ middleware_retry.go | 42 ++++++++++++++++--- middleware_retry_test.go | 88 +++++++++++++++++++++++++++++++++++++++- 4 files changed, 162 insertions(+), 9 deletions(-) diff --git a/enqueue.go b/enqueue.go index f9af7a8..1fde003 100644 --- a/enqueue.go +++ b/enqueue.go @@ -22,9 +22,17 @@ type EnqueueData struct { } type EnqueueOptions struct { - RetryCount int `json:"retry_count,omitempty"` - Retry bool `json:"retry,omitempty"` - At float64 `json:"at,omitempty"` + RetryCount int `json:"retry_count,omitempty"` + Retry bool `json:"retry,omitempty"` + At float64 `json:"at,omitempty"` + RetryOptions RetryOptions `json:"retry_options,omitempty"` +} + +type RetryOptions struct { + Exp int `json:"exp"` + MinDelay int `json:"min_delay"` + MaxDelay int `json:"max_delay"` + MaxRand int `json:"max_rand"` } func generateJid() string { diff --git a/enqueue_test.go b/enqueue_test.go index e3a2707..02638c7 100644 --- a/enqueue_test.go +++ b/enqueue_test.go @@ -86,6 +86,33 @@ func EnqueueSpec(c gospec.Context) { retryCount := int(result["retry_count"].(float64)) c.Expect(retryCount, Equals, 13) }) + + c.Specify("has retry_options when set", func() { + EnqueueWithOptions( + "enqueue7", "Compare", []string{"foo", "bar"}, + EnqueueOptions{ + RetryCount: 13, + Retry: true, + RetryOptions: RetryOptions{ + Exp: 2, + MinDelay: 0, + MaxDelay: 60, + MaxRand: 30, + }, + }) + + bytes, _ := redis.Bytes(conn.Do("lpop", "prod:queue:enqueue7")) + var result map[string]interface{} + json.Unmarshal(bytes, &result) + c.Expect(result["class"], Equals, "Compare") + + retryOptions := result["retry_options"].(map[string]interface{}) + c.Expect(len(retryOptions), Equals, 4) + c.Expect(retryOptions["exp"].(float64), Equals, float64(2)) + c.Expect(retryOptions["min_delay"].(float64), Equals, float64(0)) + c.Expect(retryOptions["max_delay"].(float64), Equals, float64(60)) + c.Expect(retryOptions["max_rand"].(float64), Equals, float64(30)) + }) }) c.Specify("EnqueueIn", func() { diff --git a/middleware_retry.go b/middleware_retry.go index 1bc1ccf..17bf2ef 100644 --- a/middleware_retry.go +++ b/middleware_retry.go @@ -1,6 +1,7 @@ package workers import ( + "encoding/json" "fmt" "math" "math/rand" @@ -19,15 +20,15 @@ func (r *MiddlewareRetry) Call(queue string, message *Msg, next func() bool) (ac if e := recover(); e != nil { conn := Config.Pool.Get() defer conn.Close() - if retry(message) { message.Set("queue", queue) message.Set("error_message", fmt.Sprintf("%v", e)) retryCount := incrementRetry(message) + retryOptions, _ := message.Get("retry_options").Map() waitDuration := durationToSecondsWithNanoPrecision( time.Duration( - secondsToDelay(retryCount), + secondsToDelay(retryCount, retryOptions), ) * time.Second, ) @@ -86,7 +87,38 @@ func incrementRetry(message *Msg) (retryCount int) { return } -func secondsToDelay(count int) int { - power := math.Pow(float64(count), 4) - return int(power) + 15 + (rand.Intn(30) * (count + 1)) +func secondsToDelay(count int, retryOptions map[string]interface{}) int { + exp := float64(4) + minDelay := float64(15) + maxDelay := math.Inf(1) + maxRand := float64(30) + if retryOptions != nil { + if v, ok := retryOptions["exp"].(json.Number); ok { + if v2, err := v.Float64(); err == nil { + exp = v2 + } + } + if v, ok := retryOptions["min_delay"].(json.Number); ok { + if v2, err := v.Float64(); err == nil { + minDelay = v2 + } + } + if v, ok := retryOptions["max_delay"].(json.Number); ok { + if v2, err := v.Float64(); err == nil { + maxDelay = v2 + } + } + if v, ok := retryOptions["max_rand"].(json.Number); ok { + if v2, err := v.Float64(); err == nil { + maxRand = v2 + } + } + } + + power := math.Pow(float64(count), exp) + randN := 0 + if maxRand > 0 { + randN = rand.Intn(int(maxRand)) + } + return int(math.Min(power+minDelay+float64(randN*(count+1)), maxDelay)) } diff --git a/middleware_retry_test.go b/middleware_retry_test.go index a17cf7a..2c85485 100644 --- a/middleware_retry_test.go +++ b/middleware_retry_test.go @@ -1,10 +1,11 @@ package workers import ( + "time" + "github.com/customerio/gospec" . "github.com/customerio/gospec" "github.com/garyburd/redigo/redis" - "time" ) func MiddlewareRetrySpec(c gospec.Context) { @@ -187,5 +188,90 @@ func MiddlewareRetrySpec(c gospec.Context) { c.Expect(count, Equals, 0) }) + c.Specify("use retry_options when provided - min_delay", func() { + message, _ := NewMsg("{\"jid\":\"2\",\"retry\":true,\"retry_options\":{\"exp\":2,\"min_delay\":1200,\"max_rand\":0}}") + var now int + wares.call("myqueue", message, func() { + worker.process(message) + now = int(time.Now().Unix()) + }) + + conn := Config.Pool.Get() + defer conn.Close() + + count, _ := redis.Int(conn.Do("zcard", "prod:"+RETRY_KEY)) + c.Expect(count, Equals, 1) + values, _ := redis.Values(conn.Do("ZRANGE", "prod:"+RETRY_KEY, 0, -1, "WITHSCORES")) + + msg := "" + nextAt := -1.0 + redis.Scan(values, &msg, &nextAt) + c.Expect(int(nextAt), Equals, now+1200) + }) + + c.Specify("use retry_options when provided - exp", func() { + message, _ := NewMsg("{\"jid\":\"2\",\"retry\":true,\"retry_count\":2,\"retry_options\":{\"exp\":2,\"min_delay\":0,\"max_rand\":0}}") + var now int + wares.call("myqueue", message, func() { + worker.process(message) + now = int(time.Now().Unix()) + }) + + conn := Config.Pool.Get() + defer conn.Close() + + count, _ := redis.Int(conn.Do("zcard", "prod:"+RETRY_KEY)) + c.Expect(count, Equals, 1) + values, _ := redis.Values(conn.Do("ZRANGE", "prod:"+RETRY_KEY, 0, -1, "WITHSCORES")) + + msg := "" + nextAt := -1.0 + redis.Scan(values, &msg, &nextAt) + c.Expect(int(nextAt), Equals, now+9) + }) + + c.Specify("use retry_options when provided - max_delay", func() { + message, _ := NewMsg("{\"jid\":\"2\",\"retry\":true,\"retry_options\":{\"exp\":20,\"min_delay\":600,\"max_delay\":10,\"max_rand\":10000}}") + var now int + wares.call("myqueue", message, func() { + worker.process(message) + now = int(time.Now().Unix()) + }) + + conn := Config.Pool.Get() + defer conn.Close() + + count, _ := redis.Int(conn.Do("zcard", "prod:"+RETRY_KEY)) + c.Expect(count, Equals, 1) + values, _ := redis.Values(conn.Do("ZRANGE", "prod:"+RETRY_KEY, 0, -1, "WITHSCORES")) + + msg := "" + nextAt := -1.0 + redis.Scan(values, &msg, &nextAt) + c.Expect(int(nextAt), Equals, now+10) + }) + + c.Specify("use retry_options when provided - max_rand", func() { + message, _ := NewMsg("{\"jid\":\"2\",\"retry\":true,\"retry_options\":{\"exp\":2,\"min_delay\":0,\"max_rand\":100}}") + var now int + wares.call("myqueue", message, func() { + worker.process(message) + now = int(time.Now().Unix()) + }) + + conn := Config.Pool.Get() + defer conn.Close() + + count, _ := redis.Int(conn.Do("zcard", "prod:"+RETRY_KEY)) + c.Expect(count, Equals, 1) + values, _ := redis.Values(conn.Do("ZRANGE", "prod:"+RETRY_KEY, 0, -1, "WITHSCORES")) + + msg := "" + nextAt := -1.0 + redis.Scan(values, &msg, &nextAt) + c.Expect(nextAt, Satisfies, nextAt <= float64(now+100)) + c.Expect(nextAt, Satisfies, nextAt >= float64(now+0)) + }) + Config.Namespace = was } From e3afd6e9a5f4853c546943c418cbc05a395938ad Mon Sep 17 00:00:00 2001 From: cscatolini Date: Sun, 29 Oct 2017 13:52:59 -0200 Subject: [PATCH 03/11] Allow setting max retries. --- enqueue.go | 1 + enqueue_test.go | 12 ++++++------ middleware_retry.go | 6 +++++- middleware_retry_test.go | 4 ++-- 4 files changed, 14 insertions(+), 9 deletions(-) diff --git a/enqueue.go b/enqueue.go index 1fde003..5bbac57 100644 --- a/enqueue.go +++ b/enqueue.go @@ -24,6 +24,7 @@ type EnqueueData struct { type EnqueueOptions struct { RetryCount int `json:"retry_count,omitempty"` Retry bool `json:"retry,omitempty"` + RetryMax int `json:"retry_max,omitempty"` At float64 `json:"at,omitempty"` RetryOptions RetryOptions `json:"retry_options,omitempty"` } diff --git a/enqueue_test.go b/enqueue_test.go index 02638c7..a38c9c7 100644 --- a/enqueue_test.go +++ b/enqueue_test.go @@ -72,8 +72,8 @@ func EnqueueSpec(c gospec.Context) { c.Expect(ea, IsWithin(0.1), nowToSecondsWithNanoPrecision()) }) - c.Specify("has retry and retry_count when set", func() { - EnqueueWithOptions("enqueue6", "Compare", []string{"foo", "bar"}, EnqueueOptions{RetryCount: 13, Retry: true}) + c.Specify("has retry and retry_max when set", func() { + EnqueueWithOptions("enqueue6", "Compare", []string{"foo", "bar"}, EnqueueOptions{RetryMax: 13, Retry: true}) bytes, _ := redis.Bytes(conn.Do("lpop", "prod:queue:enqueue6")) var result map[string]interface{} @@ -83,16 +83,16 @@ func EnqueueSpec(c gospec.Context) { retry := result["retry"].(bool) c.Expect(retry, Equals, true) - retryCount := int(result["retry_count"].(float64)) - c.Expect(retryCount, Equals, 13) + retryMax := int(result["retry_max"].(float64)) + c.Expect(retryMax, Equals, 13) }) c.Specify("has retry_options when set", func() { EnqueueWithOptions( "enqueue7", "Compare", []string{"foo", "bar"}, EnqueueOptions{ - RetryCount: 13, - Retry: true, + RetryMax: 13, + Retry: true, RetryOptions: RetryOptions{ Exp: 2, MinDelay: 0, diff --git a/middleware_retry.go b/middleware_retry.go index 17bf2ef..b36d21c 100644 --- a/middleware_retry.go +++ b/middleware_retry.go @@ -62,11 +62,15 @@ func retry(message *Msg) bool { if param, err := message.Get("retry").Bool(); err == nil { retry = param - } else if param, err := message.Get("retry").Int(); err == nil { + } else if param, err := message.Get("retry").Int(); err == nil { // compatible with sidekiq max = param retry = true } + if param, err := message.Get("retry_max").Int(); err == nil { + max = param + } + count, _ := message.Get("retry_count").Int() return retry && count < max diff --git a/middleware_retry_test.go b/middleware_retry_test.go index 2c85485..8564e86 100644 --- a/middleware_retry_test.go +++ b/middleware_retry_test.go @@ -135,7 +135,7 @@ func MiddlewareRetrySpec(c gospec.Context) { }) c.Specify("handles recurring failed message with customized max", func() { - message, _ := NewMsg("{\"jid\":\"2\",\"retry\":10,\"queue\":\"default\",\"error_message\":\"bam\",\"failed_at\":\"2013-07-20 14:03:42 UTC\",\"retry_count\":8}") + message, _ := NewMsg("{\"jid\":\"2\",\"retry\":true,\"queue\":\"default\",\"error_message\":\"bam\",\"failed_at\":\"2013-07-20 14:03:42 UTC\",\"retry_count\":8,\"retry_max\":10}") wares.call("myqueue", message, func() { worker.process(message) @@ -175,7 +175,7 @@ func MiddlewareRetrySpec(c gospec.Context) { }) c.Specify("doesn't retry after customized number of retries", func() { - message, _ := NewMsg("{\"jid\":\"2\",\"retry\":3,\"retry_count\":3}") + message, _ := NewMsg("{\"jid\":\"2\",\"retry\":true,\"retry_max\":3,\"retry_count\":3}") wares.call("myqueue", message, func() { worker.process(message) From 7680f06e855f55db7cba1fb616e537f14d8189b9 Mon Sep 17 00:00:00 2001 From: cscatolini Date: Wed, 8 Nov 2017 11:09:06 -0200 Subject: [PATCH 04/11] Avoid crashing when worker is nil. --- manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/manager.go b/manager.go index 0e4ee6f..afeaeb4 100644 --- a/manager.go +++ b/manager.go @@ -77,7 +77,7 @@ func (m *manager) loadWorkers() { func (m *manager) processing() (count int) { m.workersM.Lock() for _, worker := range m.workers { - if worker.processing() { + if worker != nil && worker.processing() { count++ } } From c48500dfa6f6e3e6932ecf5bea6d09b4f9b0dcf0 Mon Sep 17 00:00:00 2001 From: Luiz Felipe Takakura Date: Fri, 13 Jul 2018 17:16:07 -0300 Subject: [PATCH 05/11] Implement connection options to set a different redis connection to use on EnqueueWithOptions --- config.go | 60 +++++++++++++++++++++++++++++------------------------- enqueue.go | 20 ++++++++++++------ 2 files changed, 46 insertions(+), 34 deletions(-) diff --git a/config.go b/config.go index f93ac94..30c0377 100644 --- a/config.go +++ b/config.go @@ -18,7 +18,7 @@ type config struct { var Config *config func Configure(options map[string]string) { - var poolSize int + var namespace string var pollInterval int @@ -40,41 +40,45 @@ func Configure(options map[string]string) { pollInterval = 15 } - poolSize, _ = strconv.Atoi(options["pool"]) - Config = &config{ options["process"], namespace, pollInterval, - &redis.Pool{ - MaxIdle: poolSize, - IdleTimeout: 240 * time.Second, - Dial: func() (redis.Conn, error) { - c, err := redis.Dial("tcp", options["server"]) - if err != nil { + GetConnectionPool(options), + func(queue string) Fetcher { + return NewFetch(queue, make(chan *Msg), make(chan bool)) + }, + } +} + +func GetConnectionPool(options map[string]string) *redis.Pool { + poolSize, _ := strconv.Atoi(options["pool"]) + + return &redis.Pool{ + MaxIdle: poolSize, + IdleTimeout: 240 * time.Second, + Dial: func() (redis.Conn, error) { + c, err := redis.Dial("tcp", options["server"]) + if err != nil { + return nil, err + } + if options["password"] != "" { + if _, err := c.Do("AUTH", options["password"]); err != nil { + c.Close() return nil, err } - if options["password"] != "" { - if _, err := c.Do("AUTH", options["password"]); err != nil { - c.Close() - return nil, err - } - } - if options["database"] != "" { - if _, err := c.Do("SELECT", options["database"]); err != nil { - c.Close() - return nil, err - } + } + if options["database"] != "" { + if _, err := c.Do("SELECT", options["database"]); err != nil { + c.Close() + return nil, err } - return c, err - }, - TestOnBorrow: func(c redis.Conn, t time.Time) error { - _, err := c.Do("PING") - return err - }, + } + return c, err }, - func(queue string) Fetcher { - return NewFetch(queue, make(chan *Msg), make(chan bool)) + TestOnBorrow: func(c redis.Conn, t time.Time) error { + _, err := c.Do("PING") + return err }, } } diff --git a/enqueue.go b/enqueue.go index 5bbac57..a04019c 100644 --- a/enqueue.go +++ b/enqueue.go @@ -6,6 +6,8 @@ import ( "fmt" "io" "time" + + "github.com/garyburd/redigo/redis" ) const ( @@ -22,11 +24,12 @@ type EnqueueData struct { } type EnqueueOptions struct { - RetryCount int `json:"retry_count,omitempty"` - Retry bool `json:"retry,omitempty"` - RetryMax int `json:"retry_max,omitempty"` - At float64 `json:"at,omitempty"` - RetryOptions RetryOptions `json:"retry_options,omitempty"` + RetryCount int `json:"retry_count,omitempty"` + Retry bool `json:"retry,omitempty"` + RetryMax int `json:"retry_max,omitempty"` + At float64 `json:"at,omitempty"` + RetryOptions RetryOptions `json:"retry_options,omitempty"` + ConnectionOptions map[string]string `json:"connection_options,omitempty"` } type RetryOptions struct { @@ -79,7 +82,12 @@ func EnqueueWithOptions(queue, class string, args interface{}, opts EnqueueOptio return data.Jid, err } - conn := Config.Pool.Get() + var conn redis.Conn + if len(opts.ConnectionOptions) == 0 { + conn = Config.Pool.Get() + } else { + conn = GetConnectionPool(opts.ConnectionOptions).Get() + } defer conn.Close() _, err = conn.Do("sadd", Config.Namespace+"queues", queue) From 212c17df9a3cac1c17671b1c863f52a15a4963ff Mon Sep 17 00:00:00 2001 From: Luiz Felipe Takakura Date: Fri, 13 Jul 2018 17:23:14 -0300 Subject: [PATCH 06/11] Update README --- README.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/README.md b/README.md index 0fffa36..fc64226 100644 --- a/README.md +++ b/README.md @@ -62,6 +62,18 @@ func main() { // Add a job to a queue with retry workers.EnqueueWithOptions("myqueue3", "Add", []int{1, 2}, workers.EnqueueOptions{Retry: true}) + // Add a job to a queue in a different redis instance + workers.EnqueueWithOptions("myqueue4", "Add", []int{1, 2}, + workers.EnqueueOptions{ + Retry: true, + ConnectionOptions: map[string]string{ + "server": "localhost:6378", + "database": "my-database", + "pool": "10", + "password": "pass", + }}, + ) + // stats will be available at http://localhost:8080/stats go workers.StatsServer(8080) From 7be6e40164fa97f6144bbfbea2fbf17be7bc4551 Mon Sep 17 00:00:00 2001 From: Henrique Rodrigues Date: Fri, 21 Sep 2018 14:48:55 -0300 Subject: [PATCH 07/11] stats refactor to return struct --- stats.go | 39 +++++++++++++++++++++++++++++++++++---- 1 file changed, 35 insertions(+), 4 deletions(-) diff --git a/stats.go b/stats.go index d62864e..ba06a41 100644 --- a/stats.go +++ b/stats.go @@ -15,10 +15,42 @@ type stats struct { Retries int64 `json:"retries"` } +// Stats writes stats on response writer func Stats(w http.ResponseWriter, req *http.Request) { w.Header().Set("Content-Type", "application/json; charset=utf-8") w.Header().Set("Access-Control-Allow-Origin", "*") + stats := getStats() + + body, _ := json.MarshalIndent(stats, "", " ") + fmt.Fprintln(w, string(body)) +} + +// WorkerStats holds workers stats +type WorkerStats struct { + Processed int `json:"processed"` + Failed int `json:"failed"` + Enqueued map[string]string `json:"enqueued"` + Retries int64 `json:"retries"` +} + +// GetStats returns workers stats +func GetStats() *WorkerStats { + stats := getStats() + enqueued := map[string]string{} + if statsEnqueued, ok := stats.Enqueued.(map[string]string); ok { + enqueued = statsEnqueued + } + + return &WorkerStats{ + Processed: stats.Processed, + Failed: stats.Failed, + Retries: stats.Retries, + Enqueued: enqueued, + } +} + +func getStats() stats { jobs := make(map[string][]*map[string]interface{}) enqueued := make(map[string]string) @@ -55,7 +87,7 @@ func Stats(w http.ResponseWriter, req *http.Request) { conn.Send("get", Config.Namespace+"stat:failed") conn.Send("zcard", Config.Namespace+RETRY_KEY) - for key, _ := range enqueued { + for key := range enqueued { conn.Send("llen", fmt.Sprintf("%squeue:%s", Config.Namespace, key)) } @@ -83,7 +115,7 @@ func Stats(w http.ResponseWriter, req *http.Request) { } queueIndex := 0 - for key, _ := range enqueued { + for key := range enqueued { if queueIndex == (index - 3) { enqueued[key] = fmt.Sprintf("%d", result.(int64)) } @@ -92,6 +124,5 @@ func Stats(w http.ResponseWriter, req *http.Request) { } } - body, _ := json.MarshalIndent(stats, "", " ") - fmt.Fprintln(w, string(body)) + return stats } From d1be24994f2aef33257256035eaba70a112bf57f Mon Sep 17 00:00:00 2001 From: cscatolini Date: Wed, 27 Feb 2019 13:59:30 -0300 Subject: [PATCH 08/11] Improve logging. - Use log levels (fatal, debug, error, info and warn) - Allow structured logging (uses logrus by default) - Change logging middleware logs to debug - A few tweaks in some log messages, specially errors --- Gopkg.lock | 53 +++++++++++++++++++++++++++++++++++++++++-- fetcher.go | 6 ++--- manager.go | 4 ++-- middleware_logging.go | 10 ++++---- middleware_stats.go | 2 +- msg.go | 5 ++-- stats.go | 2 +- workers.go | 8 ++----- workers_logger.go | 41 +++++++++++++++++++++++++++++++-- 9 files changed, 107 insertions(+), 24 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index e07ef50..c7bd1b0 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -2,26 +2,75 @@ [[projects]] + digest = "1:512883404c2a99156e410e9880e3bb35ecccc0c07c1159eb204b5f3ef3c431b3" name = "github.com/bitly/go-simplejson" packages = ["."] + pruneopts = "" revision = "aabad6e819789e569bd6aabf444c935aa9ba1e44" version = "v0.5.0" [[projects]] branch = "master" + digest = "1:c752e7d9b6498f4f0e244d61ddb366a9d06053347988f5397352b5dbb5795eef" name = "github.com/customerio/gospec" packages = ["."] + pruneopts = "" revision = "a5cc0e48aa393147b3fc264d3b9582b69ea2733d" [[projects]] + digest = "1:a3e3741c2ca1cb33f5988d197bff6e172dbb2b9f4380378efbe29bca5f0204d2" name = "github.com/garyburd/redigo" - packages = ["internal","redis"] + packages = [ + "internal", + "redis", + ] + pruneopts = "" revision = "34a326de1fea52965fa5ad664d3fc7163dd4b0a1" version = "v1.2.0" +[[projects]] + digest = "1:0f51cee70b0d254dbc93c22666ea2abf211af81c1701a96d04e2284b408621db" + name = "github.com/konsorten/go-windows-terminal-sequences" + packages = ["."] + pruneopts = "" + revision = "f55edac94c9bbba5d6182a4be46d86a2c9b5b50e" + version = "v1.0.2" + +[[projects]] + digest = "1:9a3c631555e0351fdc4e696577bb63afd90c399d782a8462dba9d100d7021db3" + name = "github.com/sirupsen/logrus" + packages = ["."] + pruneopts = "" + revision = "e1e72e9de974bd926e5c56f83753fba2df402ce5" + version = "v1.3.0" + +[[projects]] + branch = "master" + digest = "1:10f068b4b7d1a60a6d8a877c8f1d72b7e745cf1c32aaf9eaf11e2259938f0dfb" + name = "golang.org/x/crypto" + packages = ["ssh/terminal"] + pruneopts = "" + revision = "7f87c0fbb88b590338857bcb720678c2583d4dea" + +[[projects]] + branch = "master" + digest = "1:0f39f8ca1c492b3b59020cb0f95e6635d5051bf7048a32ac5f9c6bf1adae6ec8" + name = "golang.org/x/sys" + packages = [ + "unix", + "windows", + ] + pruneopts = "" + revision = "775f8194d0f9e65c46913c7be783d3d95a29333c" + [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "d05967babd4b751e99afcefd6ce85ffd6299ad787cc90986f38aeff21e5172dc" + input-imports = [ + "github.com/bitly/go-simplejson", + "github.com/customerio/gospec", + "github.com/garyburd/redigo/redis", + "github.com/sirupsen/logrus", + ] solver-name = "gps-cdcl" solver-version = 1 diff --git a/fetcher.go b/fetcher.go index 2745a80..8cee8cc 100644 --- a/fetcher.go +++ b/fetcher.go @@ -88,7 +88,7 @@ func (f *fetch) tryFetchMessage() { if err != nil { // If redis returns null, the queue is empty. Just ignore the error. if err.Error() != "redigo: nil returned" { - Logger.Println("ERR: ", err) + Logger.Errorln("failed to fetch message", err) time.Sleep(1 * time.Second) } } else { @@ -100,7 +100,7 @@ func (f *fetch) sendMessage(message string) { msg, err := NewMsg(message) if err != nil { - Logger.Println("ERR: Couldn't create message from", message, ":", err) + Logger.Errorln("failed to create message from", message, ":", err) return } @@ -145,7 +145,7 @@ func (f *fetch) inprogressMessages() []string { messages, err := redis.Strings(conn.Do("lrange", f.inprogressQueue(), 0, -1)) if err != nil { - Logger.Println("ERR: ", err) + Logger.Errorln("failed to fetch messages in progress", err) } return messages diff --git a/manager.go b/manager.go index afeaeb4..8533908 100644 --- a/manager.go +++ b/manager.go @@ -32,7 +32,7 @@ func (m *manager) prepare() { } func (m *manager) quit() { - Logger.Println("quitting queue", m.queueName(), "(waiting for", m.processing(), "/", len(m.workers), "workers).") + Logger.Infoln("quitting queue", m.queueName(), "(waiting for", m.processing(), "/", len(m.workers), "workers).") m.prepare() m.workersM.Lock() @@ -50,7 +50,7 @@ func (m *manager) quit() { } func (m *manager) manage() { - Logger.Println("processing queue", m.queueName(), "with", m.concurrency, "workers.") + Logger.Infoln("processing queue", m.queueName(), "with", m.concurrency, "workers.") go m.fetch.Fetch() diff --git a/middleware_logging.go b/middleware_logging.go index 11eaa13..a4aa062 100644 --- a/middleware_logging.go +++ b/middleware_logging.go @@ -12,16 +12,16 @@ func (l *MiddlewareLogging) Call(queue string, message *Msg, next func() bool) ( prefix := fmt.Sprint(queue, " JID-", message.Jid()) start := time.Now() - Logger.Println(prefix, "start") - Logger.Println(prefix, "args:", message.Args().ToJson()) + Logger.Debug(prefix, "start") + Logger.Debug(prefix, "args:", message.Args().ToJson()) defer func() { if e := recover(); e != nil { - Logger.Println(prefix, "fail:", time.Since(start)) + Logger.Debug(prefix, "fail:", time.Since(start)) buf := make([]byte, 4096) buf = buf[:runtime.Stack(buf, false)] - Logger.Printf("%s error: %v\n%s", prefix, e, buf) + Logger.Errorf("%s error: %v\n%s", prefix, e, buf) panic(e) } @@ -29,7 +29,7 @@ func (l *MiddlewareLogging) Call(queue string, message *Msg, next func() bool) ( acknowledge = next() - Logger.Println(prefix, "done:", time.Since(start)) + Logger.Debug(prefix, "done:", time.Since(start)) return } diff --git a/middleware_stats.go b/middleware_stats.go index 62a7663..c3bcc63 100644 --- a/middleware_stats.go +++ b/middleware_stats.go @@ -32,6 +32,6 @@ func incrementStats(metric string) { conn.Send("incr", Config.Namespace+"stat:"+metric+":"+today) if _, err := conn.Do("exec"); err != nil { - Logger.Println("couldn't save stats:", err) + Logger.Errorln("failed to save stats:", err) } } diff --git a/msg.go b/msg.go index f922fa0..2a47c36 100644 --- a/msg.go +++ b/msg.go @@ -1,8 +1,9 @@ package workers import ( - "github.com/bitly/go-simplejson" "reflect" + + "github.com/bitly/go-simplejson" ) type data struct { @@ -39,7 +40,7 @@ func (d *data) ToJson() string { json, err := d.Encode() if err != nil { - Logger.Println("ERR: Couldn't generate json from", d, ":", err) + Logger.Errorln("failed to generate json from", d, ":", err) } return string(json) diff --git a/stats.go b/stats.go index ba06a41..e9248a7 100644 --- a/stats.go +++ b/stats.go @@ -94,7 +94,7 @@ func getStats() stats { r, err := conn.Do("exec") if err != nil { - Logger.Println("couldn't retrieve stats:", err) + Logger.Errorln("failed to retrieve stats:", err) } results := r.([]interface{}) diff --git a/workers.go b/workers.go index fc39be6..1ba2658 100644 --- a/workers.go +++ b/workers.go @@ -3,9 +3,7 @@ package workers import ( "errors" "fmt" - "log" "net/http" - "os" "sync" ) @@ -14,8 +12,6 @@ const ( SCHEDULED_JOBS_KEY = "schedule" ) -var Logger WorkersLogger = log.New(os.Stdout, "workers: ", log.Ldate|log.Lmicroseconds) - var managers = make(map[string]*manager) var schedule *scheduled var control = make(map[string]chan string) @@ -88,10 +84,10 @@ func Quit() { func StatsServer(port int) { http.HandleFunc("/stats", Stats) - Logger.Println("Stats are available at", fmt.Sprint("http://localhost:", port, "/stats")) + Logger.Infoln("Stats are available at", fmt.Sprint("http://localhost:", port, "/stats")) if err := http.ListenAndServe(fmt.Sprint(":", port), nil); err != nil { - Logger.Println(err) + Logger.Error("failed to start stats server", err) } } diff --git a/workers_logger.go b/workers_logger.go index cab91b1..4ac7005 100644 --- a/workers_logger.go +++ b/workers_logger.go @@ -1,6 +1,43 @@ package workers +import "github.com/sirupsen/logrus" + +// WorkersLogger represents the log interface type WorkersLogger interface { - Println(...interface{}) - Printf(string, ...interface{}) + Fatal(format ...interface{}) + Fatalf(format string, args ...interface{}) + Fatalln(args ...interface{}) + + Debug(args ...interface{}) + Debugf(format string, args ...interface{}) + Debugln(args ...interface{}) + + Error(args ...interface{}) + Errorf(format string, args ...interface{}) + Errorln(args ...interface{}) + + Info(args ...interface{}) + Infof(format string, args ...interface{}) + Infoln(args ...interface{}) + + Warn(args ...interface{}) + Warnf(format string, args ...interface{}) + Warnln(args ...interface{}) +} + +// Logger is the default logger +var Logger = initLogger() + +func initLogger() WorkersLogger { + plog := logrus.New() + plog.Formatter = new(logrus.TextFormatter) + plog.Level = logrus.InfoLevel + return plog.WithFields(logrus.Fields{"source": "go-workers"}) +} + +// SetLogger rewrites the default logger +func SetLogger(l WorkersLogger) { + if l != nil { + Logger = l + } } From 9e624b6bf258c4b78954b3e53e0d6311a8032b42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Hahn?= Date: Wed, 21 Aug 2019 17:09:32 -0300 Subject: [PATCH 09/11] Changing queue behavior to be fifo --- enqueue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/enqueue.go b/enqueue.go index a04019c..95c3ef9 100644 --- a/enqueue.go +++ b/enqueue.go @@ -95,7 +95,7 @@ func EnqueueWithOptions(queue, class string, args interface{}, opts EnqueueOptio return "", err } queue = Config.Namespace + "queue:" + queue - _, err = conn.Do("rpush", queue, bytes) + _, err = conn.Do("lpush", queue, bytes) if err != nil { return "", err } From 52bf4f3bb31d5ad8c8a7949e023da753e510329d Mon Sep 17 00:00:00 2001 From: Tobias Ulrich Date: Mon, 1 Apr 2024 10:24:50 -0300 Subject: [PATCH 10/11] Add support for Reds dialing options and general project updates (#4) * Update project setup * Update deprecated Redis package to the new one * Change the worker configuration options * Adjust test to improve logs traceability --- .gitignore | 3 +- .travis.yml | 2 +- Gopkg.lock | 76 ------------------------------------- Gopkg.toml | 34 ----------------- Makefile | 18 +++++++++ README.md | 82 ++++++++++++++++++++-------------------- all_specs_test.go | 20 ++++++---- config.go | 78 +++++++++++++++++++++----------------- config_test.go | 48 +++++++++++------------ docker-compose.yml | 19 ++++++++++ enqueue.go | 24 +++++++----- enqueue_test.go | 14 +++++-- fetch_test.go | 2 +- fetcher.go | 2 +- go.mod | 15 ++++++++ go.sum | 25 ++++++++++++ manager_test.go | 17 +++++---- middleware_retry_test.go | 38 ++++++++++--------- middleware_stats_test.go | 8 ++-- scheduled.go | 4 +- scheduled_test.go | 2 +- worker_test.go | 6 ++- workers_test.go | 15 +++++--- 23 files changed, 281 insertions(+), 271 deletions(-) delete mode 100644 Gopkg.lock delete mode 100644 Gopkg.toml create mode 100644 Makefile create mode 100644 docker-compose.yml create mode 100644 go.mod create mode 100644 go.sum diff --git a/.gitignore b/.gitignore index 5657f6e..cb756e9 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ -vendor \ No newline at end of file +vendor +.idea \ No newline at end of file diff --git a/.travis.yml b/.travis.yml index 141ea53..203b4d4 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,7 @@ language: go go: - - 1.7 + - "1.19" script: - go get github.com/customerio/gospec diff --git a/Gopkg.lock b/Gopkg.lock deleted file mode 100644 index c7bd1b0..0000000 --- a/Gopkg.lock +++ /dev/null @@ -1,76 +0,0 @@ -# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'. - - -[[projects]] - digest = "1:512883404c2a99156e410e9880e3bb35ecccc0c07c1159eb204b5f3ef3c431b3" - name = "github.com/bitly/go-simplejson" - packages = ["."] - pruneopts = "" - revision = "aabad6e819789e569bd6aabf444c935aa9ba1e44" - version = "v0.5.0" - -[[projects]] - branch = "master" - digest = "1:c752e7d9b6498f4f0e244d61ddb366a9d06053347988f5397352b5dbb5795eef" - name = "github.com/customerio/gospec" - packages = ["."] - pruneopts = "" - revision = "a5cc0e48aa393147b3fc264d3b9582b69ea2733d" - -[[projects]] - digest = "1:a3e3741c2ca1cb33f5988d197bff6e172dbb2b9f4380378efbe29bca5f0204d2" - name = "github.com/garyburd/redigo" - packages = [ - "internal", - "redis", - ] - pruneopts = "" - revision = "34a326de1fea52965fa5ad664d3fc7163dd4b0a1" - version = "v1.2.0" - -[[projects]] - digest = "1:0f51cee70b0d254dbc93c22666ea2abf211af81c1701a96d04e2284b408621db" - name = "github.com/konsorten/go-windows-terminal-sequences" - packages = ["."] - pruneopts = "" - revision = "f55edac94c9bbba5d6182a4be46d86a2c9b5b50e" - version = "v1.0.2" - -[[projects]] - digest = "1:9a3c631555e0351fdc4e696577bb63afd90c399d782a8462dba9d100d7021db3" - name = "github.com/sirupsen/logrus" - packages = ["."] - pruneopts = "" - revision = "e1e72e9de974bd926e5c56f83753fba2df402ce5" - version = "v1.3.0" - -[[projects]] - branch = "master" - digest = "1:10f068b4b7d1a60a6d8a877c8f1d72b7e745cf1c32aaf9eaf11e2259938f0dfb" - name = "golang.org/x/crypto" - packages = ["ssh/terminal"] - pruneopts = "" - revision = "7f87c0fbb88b590338857bcb720678c2583d4dea" - -[[projects]] - branch = "master" - digest = "1:0f39f8ca1c492b3b59020cb0f95e6635d5051bf7048a32ac5f9c6bf1adae6ec8" - name = "golang.org/x/sys" - packages = [ - "unix", - "windows", - ] - pruneopts = "" - revision = "775f8194d0f9e65c46913c7be783d3d95a29333c" - -[solve-meta] - analyzer-name = "dep" - analyzer-version = 1 - input-imports = [ - "github.com/bitly/go-simplejson", - "github.com/customerio/gospec", - "github.com/garyburd/redigo/redis", - "github.com/sirupsen/logrus", - ] - solver-name = "gps-cdcl" - solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml deleted file mode 100644 index 466d307..0000000 --- a/Gopkg.toml +++ /dev/null @@ -1,34 +0,0 @@ - -# Gopkg.toml example -# -# Refer to https://github.com/golang/dep/blob/master/docs/Gopkg.toml.md -# for detailed Gopkg.toml documentation. -# -# required = ["github.com/user/thing/cmd/thing"] -# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"] -# -# [[constraint]] -# name = "github.com/user/project" -# version = "1.0.0" -# -# [[constraint]] -# name = "github.com/user/project2" -# branch = "dev" -# source = "github.com/myfork/project2" -# -# [[override]] -# name = "github.com/x/y" -# version = "2.4.0" - - -[[constraint]] - name = "github.com/bitly/go-simplejson" - version = "0.5.0" - -[[constraint]] - branch = "master" - name = "github.com/customerio/gospec" - -[[constraint]] - name = "github.com/garyburd/redigo" - version = "1.2.0" diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..4f239fc --- /dev/null +++ b/Makefile @@ -0,0 +1,18 @@ + + +export COMPOSE_PROJECT_NAME := go-workers + +.PHONY: deps/start +deps/start: + echo "${COMPOSE_PROJECT_NAME}" + docker-compose -p '${COMPOSE_PROJECT_NAME}' up -d + +.PHONY: deps/stop +deps/stop: + docker-compose -p '${COMPOSE_PROJECT_NAME}' down + +test: + @make deps/start + go get github.com/customerio/gospec + go test -v + @make deps/stop diff --git a/README.md b/README.md index fc64226..0a070d9 100644 --- a/README.md +++ b/README.md @@ -18,67 +18,69 @@ Example usage: package main import ( - "github.com/jrallison/go-workers" + "github.com/topfreegames/go-workers" + workers "go-workers" ) func myJob(message *workers.Msg) { - // do something with your message - // message.Jid() - // message.Args() is a wrapper around go-simplejson (http://godoc.org/github.com/bitly/go-simplejson) + // do something with your message + // message.Jid() + // message.Args() is a wrapper around go-simplejson (http://godoc.org/github.com/bitly/go-simplejson) } type myMiddleware struct{} func (r *myMiddleware) Call(queue string, message *workers.Msg, next func() bool) (acknowledge bool) { - // do something before each message is processed - acknowledge = next() - // do something after each message is processed - return -} + // do something before each message is processed + acknowledge = next() + // do something after each message is processed + return +} func main() { - workers.Configure(map[string]string{ - // location of redis instance - "server": "localhost:6379", - // instance of the database - "database": "0", - // number of connections to keep open with redis - "pool": "30", - // unique process id for this instance of workers (for proper recovery of inprogress jobs on crash) - "process": "1", - }) + workers.Configure(workers.Options{ + // location of redis instance + Address: "localhost:6379", + // instance of the database + Database: "0", + // number of connections to keep open with redis + PoolSize: "30", + // unique process id for this instance of workers (for proper recovery of inprogress jobs on crash) + ProcessID: "1", + }) - workers.Middleware.Append(&myMiddleware{}) + workers.Middleware.Append(&myMiddleware{}) - // pull messages from "myqueue" with concurrency of 10 - workers.Process("myqueue", myJob, 10) + // pull messages from "myqueue" with concurrency of 10 + workers.Process("myqueue", myJob, 10) - // pull messages from "myqueue2" with concurrency of 20 - workers.Process("myqueue2", myJob, 20) + // pull messages from "myqueue2" with concurrency of 20 + workers.Process("myqueue2", myJob, 20) - // Add a job to a queue - workers.Enqueue("myqueue3", "Add", []int{1, 2}) + // Add a job to a queue + workers.Enqueue("myqueue3", "Add", []int{1, 2}) - // Add a job to a queue with retry - workers.EnqueueWithOptions("myqueue3", "Add", []int{1, 2}, workers.EnqueueOptions{Retry: true}) + // Add a job to a queue with retry + workers.EnqueueWithOptions("myqueue3", "Add", []int{1, 2}, workers.EnqueueOptions{Retry: true}) - // Add a job to a queue in a different redis instance - workers.EnqueueWithOptions("myqueue4", "Add", []int{1, 2}, + // Add a job to a queue in a different redis instance + workers.EnqueueWithOptions("myqueue4", "Add", []int{1, 2}, workers.EnqueueOptions{ Retry: true, - ConnectionOptions: map[string]string{ - "server": "localhost:6378", - "database": "my-database", - "pool": "10", - "password": "pass", - }}, + ConnectionOptions: workers.Options{ + Address: "localhost:6378", + Database: "my-database", + PoolSize: 10, + Password: "pass", + }, + }, ) - // stats will be available at http://localhost:8080/stats - go workers.StatsServer(8080) + // stats will be available at http://localhost:8080/stats + go workers.StatsServer(8080) - // Blocks until process is told to exit via unix signal - workers.Run() + // Blocks until process is told to exit via unix signal + workers.Run() } ``` diff --git a/all_specs_test.go b/all_specs_test.go index 699c44a..be1bee6 100644 --- a/all_specs_test.go +++ b/all_specs_test.go @@ -18,16 +18,22 @@ func TestAllSpecs(t *testing.T) { r.Parallel = false r.BeforeEach = func() { - Configure(map[string]string{ - "server": "localhost:6379", - "process": "1", - "database": "15", - "pool": "1", + Configure(Options{ + Address: "localhost:6379", + ProcessID: "1", + Database: "15", + PoolSize: 1, }) conn := Config.Pool.Get() - conn.Do("flushdb") - conn.Close() + _, err := conn.Do("flushdb") + if err != nil { + panic("failed to flush db: " + err.Error()) + } + err = conn.Close() + if err != nil { + panic("failed close connection: " + err.Error()) + } } // List all specs here diff --git a/config.go b/config.go index 30c0377..850ad45 100644 --- a/config.go +++ b/config.go @@ -1,49 +1,56 @@ package workers import ( - "strconv" + "fmt" "time" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" ) -type config struct { +type Options struct { + Address string + Password string + Database string + ProcessID string + Namespace string + PoolSize int + PoolInterval int + DialOptions []redis.DialOption +} + +type WorkerConfig struct { processId string Namespace string - PollInterval int + PoolInterval int Pool *redis.Pool Fetch func(queue string) Fetcher } -var Config *config - -func Configure(options map[string]string) { +var Config *WorkerConfig +func Configure(options Options) { var namespace string - var pollInterval int - if options["server"] == "" { - panic("Configure requires a 'server' option, which identifies a Redis instance") + if options.Address == "" { + panic("Configure requires a 'Address' option, which identifies a Redis instance") } - if options["process"] == "" { - panic("Configure requires a 'process' option, which uniquely identifies this instance") + if options.ProcessID == "" { + panic("Configure requires a 'ProcessID' option, which uniquely identifies this instance") } - if options["pool"] == "" { - options["pool"] = "1" + if options.PoolSize <= 0 { + options.PoolSize = 1 } - if options["namespace"] != "" { - namespace = options["namespace"] + ":" + if options.Namespace != "" { + namespace = options.Namespace + ":" } - if seconds, err := strconv.Atoi(options["poll_interval"]); err == nil { - pollInterval = seconds - } else { - pollInterval = 15 + if options.PoolInterval <= 0 { + options.PoolInterval = 15 } - Config = &config{ - options["process"], + Config = &WorkerConfig{ + options.ProcessID, namespace, - pollInterval, + options.PoolInterval, GetConnectionPool(options), func(queue string) Fetcher { return NewFetch(queue, make(chan *Msg), make(chan bool)) @@ -51,27 +58,28 @@ func Configure(options map[string]string) { } } -func GetConnectionPool(options map[string]string) *redis.Pool { - poolSize, _ := strconv.Atoi(options["pool"]) - +func GetConnectionPool(options Options) *redis.Pool { return &redis.Pool{ - MaxIdle: poolSize, + MaxIdle: options.PoolSize, IdleTimeout: 240 * time.Second, Dial: func() (redis.Conn, error) { - c, err := redis.Dial("tcp", options["server"]) + c, err := redis.Dial("tcp", options.Address, options.DialOptions...) if err != nil { return nil, err } - if options["password"] != "" { - if _, err := c.Do("AUTH", options["password"]); err != nil { - c.Close() + if options.Password != "" { + if _, err := c.Do("AUTH", options.Password); err != nil { + if errClose := c.Close(); errClose != nil { + return nil, fmt.Errorf("%w. failed to close connection: %s", err, errClose.Error()) + } return nil, err } } - if options["database"] != "" { - if _, err := c.Do("SELECT", options["database"]); err != nil { - c.Close() - return nil, err + if options.Database != "" { + if _, err := c.Do("SELECT", options.Database); err != nil { + if errClose := c.Close(); errClose != nil { + return nil, fmt.Errorf("%w. failed to close connection: %s", err, errClose.Error()) + } } } return c, err diff --git a/config_test.go b/config_test.go index afde76d..1033d13 100644 --- a/config_test.go +++ b/config_test.go @@ -21,10 +21,10 @@ func ConfigSpec(c gospec.Context) { c.Specify("sets redis pool size which defaults to 1", func() { c.Expect(Config.Pool.MaxIdle, Equals, 1) - Configure(map[string]string{ - "server": "localhost:6379", - "process": "1", - "pool": "20", + Configure(Options{ + Address: "localhost:6379", + ProcessID: "1", + PoolSize: 20, }) c.Expect(Config.Pool.MaxIdle, Equals, 20) @@ -33,9 +33,9 @@ func ConfigSpec(c gospec.Context) { c.Specify("can specify custom process", func() { c.Expect(Config.processId, Equals, "1") - Configure(map[string]string{ - "server": "localhost:6379", - "process": "2", + Configure(Options{ + Address: "localhost:6379", + ProcessID: "2", }) c.Expect(Config.processId, Equals, "2") @@ -43,48 +43,48 @@ func ConfigSpec(c gospec.Context) { c.Specify("requires a server parameter", func() { err := recoverOnPanic(func() { - Configure(map[string]string{"process": "2"}) + Configure(Options{ProcessID: "2"}) }) - c.Expect(err, Equals, "Configure requires a 'server' option, which identifies a Redis instance") + c.Expect(err, Equals, "Configure requires a 'Address' option, which identifies a Redis instance") }) c.Specify("requires a process parameter", func() { err := recoverOnPanic(func() { - Configure(map[string]string{"server": "localhost:6379"}) + Configure(Options{Address: "localhost:6379"}) }) - c.Expect(err, Equals, "Configure requires a 'process' option, which uniquely identifies this instance") + c.Expect(err, Equals, "Configure requires a 'ProcessID' option, which uniquely identifies this instance") }) c.Specify("adds ':' to the end of the namespace", func() { c.Expect(Config.Namespace, Equals, "") - Configure(map[string]string{ - "server": "localhost:6379", - "process": "1", - "namespace": "prod", + Configure(Options{ + Address: "localhost:6379", + ProcessID: "1", + Namespace: "prod", }) c.Expect(Config.Namespace, Equals, "prod:") }) c.Specify("defaults poll interval to 15 seconds", func() { - Configure(map[string]string{ - "server": "localhost:6379", - "process": "1", + Configure(Options{ + Address: "localhost:6379", + ProcessID: "1", }) - c.Expect(Config.PollInterval, Equals, 15) + c.Expect(Config.PoolInterval, Equals, 15) }) c.Specify("allows customization of poll interval", func() { - Configure(map[string]string{ - "server": "localhost:6379", - "process": "1", - "poll_interval": "1", + Configure(Options{ + Address: "localhost:6379", + ProcessID: "1", + PoolInterval: 1, }) - c.Expect(Config.PollInterval, Equals, 1) + c.Expect(Config.PoolInterval, Equals, 1) }) } diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..6413de0 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,19 @@ +version: '2.2' + +services: + redis: + image: redis:5-alpine + ports: + - 6379:6379 + healthcheck: + test: [ "CMD", "redis-cli", "ping" ] + interval: 3s + timeout: 3s + retries: 30 + networks: + - go-workers + +networks: + go-workers: + driver: bridge + name: go-workers \ No newline at end of file diff --git a/enqueue.go b/enqueue.go index 95c3ef9..2754a60 100644 --- a/enqueue.go +++ b/enqueue.go @@ -7,7 +7,7 @@ import ( "io" "time" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" ) const ( @@ -24,12 +24,12 @@ type EnqueueData struct { } type EnqueueOptions struct { - RetryCount int `json:"retry_count,omitempty"` - Retry bool `json:"retry,omitempty"` - RetryMax int `json:"retry_max,omitempty"` - At float64 `json:"at,omitempty"` - RetryOptions RetryOptions `json:"retry_options,omitempty"` - ConnectionOptions map[string]string `json:"connection_options,omitempty"` + RetryCount int `json:"retry_count,omitempty"` + Retry bool `json:"retry,omitempty"` + RetryMax int `json:"retry_max,omitempty"` + At float64 `json:"at,omitempty"` + RetryOptions RetryOptions `json:"retry_options,omitempty"` + ConnectionOptions Options `json:"connection_options,omitempty"` } type RetryOptions struct { @@ -83,12 +83,18 @@ func EnqueueWithOptions(queue, class string, args interface{}, opts EnqueueOptio } var conn redis.Conn - if len(opts.ConnectionOptions) == 0 { + if opts.ConnectionOptions.Address == "" { + Logger.Debug("missing redis Address in EnqueueWithOptions. using default pool") conn = Config.Pool.Get() } else { conn = GetConnectionPool(opts.ConnectionOptions).Get() } - defer conn.Close() + defer func(conn redis.Conn) { + err := conn.Close() + if err != nil { + Logger.Errorf("failed to close Redis connection in EnqueueWithOptions: %w", err) + } + }(conn) _, err = conn.Do("sadd", Config.Namespace+"queues", queue) if err != nil { diff --git a/enqueue_test.go b/enqueue_test.go index a38c9c7..c06cbfa 100644 --- a/enqueue_test.go +++ b/enqueue_test.go @@ -5,7 +5,8 @@ import ( "github.com/customerio/gospec" . "github.com/customerio/gospec" - "github.com/garyburd/redigo/redis" + + "github.com/gomodule/redigo/redis" ) func EnqueueSpec(c gospec.Context) { @@ -17,8 +18,12 @@ func EnqueueSpec(c gospec.Context) { defer conn.Close() c.Specify("makes the queue available", func() { - Enqueue("enqueue1", "Add", []int{1, 2}) + enqueue, err := Enqueue("enqueue1", "Add", []int{1, 2}) + if err != nil { + panic(err) + } + c.Expect(enqueue, Not(Equals), "") found, _ := redis.Bool(conn.Do("sismember", "prod:queues", "enqueue1")) c.Expect(found, IsTrue) }) @@ -103,7 +108,10 @@ func EnqueueSpec(c gospec.Context) { bytes, _ := redis.Bytes(conn.Do("lpop", "prod:queue:enqueue7")) var result map[string]interface{} - json.Unmarshal(bytes, &result) + err := json.Unmarshal(bytes, &result) + if err != nil { + panic(err) + } c.Expect(result["class"], Equals, "Compare") retryOptions := result["retry_options"].(map[string]interface{}) diff --git a/fetch_test.go b/fetch_test.go index 37e3fcc..3a355f2 100644 --- a/fetch_test.go +++ b/fetch_test.go @@ -3,7 +3,7 @@ package workers import ( "github.com/customerio/gospec" . "github.com/customerio/gospec" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" ) func buildFetch(queue string) Fetcher { diff --git a/fetcher.go b/fetcher.go index 8cee8cc..cbbf93b 100644 --- a/fetcher.go +++ b/fetcher.go @@ -4,7 +4,7 @@ import ( "fmt" "time" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" ) type Fetcher interface { diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..4124dd9 --- /dev/null +++ b/go.mod @@ -0,0 +1,15 @@ +module go-workers + +go 1.19 + +require ( + github.com/bitly/go-simplejson v0.5.1 + github.com/customerio/gospec v0.0.0-20130710230057-a5cc0e48aa39 + github.com/gomodule/redigo v1.9.2 + github.com/sirupsen/logrus v1.9.3 +) + +require ( + github.com/orfjackal/nanospec.go v0.0.0-20120727230329-de4694c1d701 // indirect + golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..15904ff --- /dev/null +++ b/go.sum @@ -0,0 +1,25 @@ +github.com/bitly/go-simplejson v0.5.1 h1:xgwPbetQScXt1gh9BmoJ6j9JMr3TElvuIyjR8pgdoow= +github.com/bitly/go-simplejson v0.5.1/go.mod h1:YOPVLzCfwK14b4Sff3oP1AmGhI9T9Vsg84etUnlyp+Q= +github.com/customerio/gospec v0.0.0-20130710230057-a5cc0e48aa39 h1:O0YTztXI3XeJXlFhSo4wNb0VBVqSgT+hi/CjNWKvMnY= +github.com/customerio/gospec v0.0.0-20130710230057-a5cc0e48aa39/go.mod h1:OzYUFhPuL2JbjwFwrv6CZs23uBawekc6OZs+g19F0mY= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/garyburd/redigo v1.6.4 h1:LFu2R3+ZOPgSMWMOL+saa/zXRjw0ID2G8FepO53BGlg= +github.com/garyburd/redigo v1.6.4/go.mod h1:rTb6epsqigu3kYKBnaF028A7Tf/Aw5s0cqA47doKKqw= +github.com/gomodule/redigo v1.9.2 h1:HrutZBLhSIU8abiSfW8pj8mPhOyMYjZT/wcA4/L9L9s= +github.com/gomodule/redigo v1.9.2/go.mod h1:KsU3hiK/Ay8U42qpaJk+kuNa3C+spxapWpM+ywhcgtw= +github.com/orfjackal/nanospec.go v0.0.0-20120727230329-de4694c1d701 h1:yOXfzNV7qkZ3nf2NPqy4BMzlCmnQzIEbI1vuqKb2FkQ= +github.com/orfjackal/nanospec.go v0.0.0-20120727230329-de4694c1d701/go.mod h1:VtBIF1XX0c1nKkeAPk8i4aXkYopqQgfDqolHUIHPwNI= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/manager_test.go b/manager_test.go index c90550e..38f1863 100644 --- a/manager_test.go +++ b/manager_test.go @@ -7,7 +7,7 @@ import ( "github.com/customerio/gospec" . "github.com/customerio/gospec" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" ) type customMid struct { @@ -37,6 +37,7 @@ func (m *customMid) Trace() []string { } func ManagerSpec(c gospec.Context) { + const queueName = "queue-manager" processed := make(chan *Args) testJob := (func(message *Msg) { @@ -48,28 +49,28 @@ func ManagerSpec(c gospec.Context) { c.Specify("newManager", func() { c.Specify("sets queue with namespace", func() { - manager := newManager("myqueue", testJob, 10) - c.Expect(manager.queue, Equals, "prod:queue:myqueue") + manager := newManager(queueName, testJob, 10) + c.Expect(manager.queue, Equals, fmt.Sprintf("prod:queue:%s", queueName)) }) c.Specify("sets job function", func() { - manager := newManager("myqueue", testJob, 10) - c.Expect(fmt.Sprint(manager.job), Equals, fmt.Sprint(testJob)) + manager := newManager(queueName, testJob, 10) + c.Expect(fmt.Sprintf("%p", manager.job), Equals, fmt.Sprintf("%p", testJob)) }) c.Specify("sets worker concurrency", func() { - manager := newManager("myqueue", testJob, 10) + manager := newManager(queueName, testJob, 10) c.Expect(manager.concurrency, Equals, 10) }) c.Specify("no per-manager middleware means 'use global Middleware object'", func() { - manager := newManager("myqueue", testJob, 10) + manager := newManager(queueName, testJob, 10) c.Expect(manager.mids, Equals, Middleware) }) c.Specify("per-manager middlewares create separate middleware chains", func() { mid1 := customMid{Base: "0"} - manager := newManager("myqueue", testJob, 10, &mid1) + manager := newManager(queueName, testJob, 10, &mid1) c.Expect(manager.mids, Not(Equals), Middleware) c.Expect(len(manager.mids.actions), Equals, len(Middleware.actions)+1) }) diff --git a/middleware_retry_test.go b/middleware_retry_test.go index 8564e86..5970937 100644 --- a/middleware_retry_test.go +++ b/middleware_retry_test.go @@ -5,10 +5,12 @@ import ( "github.com/customerio/gospec" . "github.com/customerio/gospec" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" ) func MiddlewareRetrySpec(c gospec.Context) { + const queueName = "queue-middleware_retry" + var panicingJob = (func(message *Msg) { panic("AHHHH") }) @@ -18,7 +20,7 @@ func MiddlewareRetrySpec(c gospec.Context) { ) layout := "2006-01-02 15:04:05 MST" - manager := newManager("myqueue", panicingJob, 1) + manager := newManager(queueName, panicingJob, 1) worker := newWorker(manager) was := Config.Namespace @@ -27,7 +29,7 @@ func MiddlewareRetrySpec(c gospec.Context) { c.Specify("puts messages in retry queue when they fail", func() { message, _ := NewMsg("{\"jid\":\"2\",\"retry\":true}") - wares.call("myqueue", message, func() { + wares.call(queueName, message, func() { worker.process(message) }) @@ -41,7 +43,7 @@ func MiddlewareRetrySpec(c gospec.Context) { c.Specify("allows disabling retries", func() { message, _ := NewMsg("{\"jid\":\"2\",\"retry\":false}") - wares.call("myqueue", message, func() { + wares.call(queueName, message, func() { worker.process(message) }) @@ -55,7 +57,7 @@ func MiddlewareRetrySpec(c gospec.Context) { c.Specify("doesn't retry by default", func() { message, _ := NewMsg("{\"jid\":\"2\"}") - wares.call("myqueue", message, func() { + wares.call(queueName, message, func() { worker.process(message) }) @@ -69,7 +71,7 @@ func MiddlewareRetrySpec(c gospec.Context) { c.Specify("allows numeric retries", func() { message, _ := NewMsg("{\"jid\":\"2\",\"retry\":5}") - wares.call("myqueue", message, func() { + wares.call(queueName, message, func() { worker.process(message) }) @@ -83,7 +85,7 @@ func MiddlewareRetrySpec(c gospec.Context) { c.Specify("handles new failed message", func() { message, _ := NewMsg("{\"jid\":\"2\",\"retry\":true}") - wares.call("myqueue", message, func() { + wares.call(queueName, message, func() { worker.process(message) }) @@ -100,7 +102,7 @@ func MiddlewareRetrySpec(c gospec.Context) { error_backtrace, _ := message.Get("error_backtrace").String() failed_at, _ := message.Get("failed_at").String() - c.Expect(queue, Equals, "myqueue") + c.Expect(queue, Equals, queueName) c.Expect(error_message, Equals, "AHHHH") c.Expect(error_class, Equals, "") c.Expect(retry_count, Equals, 0) @@ -111,7 +113,7 @@ func MiddlewareRetrySpec(c gospec.Context) { c.Specify("handles recurring failed message", func() { message, _ := NewMsg("{\"jid\":\"2\",\"retry\":true,\"queue\":\"default\",\"error_message\":\"bam\",\"failed_at\":\"2013-07-20 14:03:42 UTC\",\"retry_count\":10}") - wares.call("myqueue", message, func() { + wares.call(queueName, message, func() { worker.process(message) }) @@ -127,7 +129,7 @@ func MiddlewareRetrySpec(c gospec.Context) { failed_at, _ := message.Get("failed_at").String() retried_at, _ := message.Get("retried_at").String() - c.Expect(queue, Equals, "myqueue") + c.Expect(queue, Equals, queueName) c.Expect(error_message, Equals, "AHHHH") c.Expect(retry_count, Equals, 11) c.Expect(failed_at, Equals, "2013-07-20 14:03:42 UTC") @@ -137,7 +139,7 @@ func MiddlewareRetrySpec(c gospec.Context) { c.Specify("handles recurring failed message with customized max", func() { message, _ := NewMsg("{\"jid\":\"2\",\"retry\":true,\"queue\":\"default\",\"error_message\":\"bam\",\"failed_at\":\"2013-07-20 14:03:42 UTC\",\"retry_count\":8,\"retry_max\":10}") - wares.call("myqueue", message, func() { + wares.call(queueName, message, func() { worker.process(message) }) @@ -153,7 +155,7 @@ func MiddlewareRetrySpec(c gospec.Context) { failed_at, _ := message.Get("failed_at").String() retried_at, _ := message.Get("retried_at").String() - c.Expect(queue, Equals, "myqueue") + c.Expect(queue, Equals, queueName) c.Expect(error_message, Equals, "AHHHH") c.Expect(retry_count, Equals, 9) c.Expect(failed_at, Equals, "2013-07-20 14:03:42 UTC") @@ -163,7 +165,7 @@ func MiddlewareRetrySpec(c gospec.Context) { c.Specify("doesn't retry after default number of retries", func() { message, _ := NewMsg("{\"jid\":\"2\",\"retry\":true,\"retry_count\":25}") - wares.call("myqueue", message, func() { + wares.call(queueName, message, func() { worker.process(message) }) @@ -177,7 +179,7 @@ func MiddlewareRetrySpec(c gospec.Context) { c.Specify("doesn't retry after customized number of retries", func() { message, _ := NewMsg("{\"jid\":\"2\",\"retry\":true,\"retry_max\":3,\"retry_count\":3}") - wares.call("myqueue", message, func() { + wares.call(queueName, message, func() { worker.process(message) }) @@ -191,7 +193,7 @@ func MiddlewareRetrySpec(c gospec.Context) { c.Specify("use retry_options when provided - min_delay", func() { message, _ := NewMsg("{\"jid\":\"2\",\"retry\":true,\"retry_options\":{\"exp\":2,\"min_delay\":1200,\"max_rand\":0}}") var now int - wares.call("myqueue", message, func() { + wares.call(queueName, message, func() { worker.process(message) now = int(time.Now().Unix()) }) @@ -212,7 +214,7 @@ func MiddlewareRetrySpec(c gospec.Context) { c.Specify("use retry_options when provided - exp", func() { message, _ := NewMsg("{\"jid\":\"2\",\"retry\":true,\"retry_count\":2,\"retry_options\":{\"exp\":2,\"min_delay\":0,\"max_rand\":0}}") var now int - wares.call("myqueue", message, func() { + wares.call(queueName, message, func() { worker.process(message) now = int(time.Now().Unix()) }) @@ -233,7 +235,7 @@ func MiddlewareRetrySpec(c gospec.Context) { c.Specify("use retry_options when provided - max_delay", func() { message, _ := NewMsg("{\"jid\":\"2\",\"retry\":true,\"retry_options\":{\"exp\":20,\"min_delay\":600,\"max_delay\":10,\"max_rand\":10000}}") var now int - wares.call("myqueue", message, func() { + wares.call(queueName, message, func() { worker.process(message) now = int(time.Now().Unix()) }) @@ -254,7 +256,7 @@ func MiddlewareRetrySpec(c gospec.Context) { c.Specify("use retry_options when provided - max_rand", func() { message, _ := NewMsg("{\"jid\":\"2\",\"retry\":true,\"retry_options\":{\"exp\":2,\"min_delay\":0,\"max_rand\":100}}") var now int - wares.call("myqueue", message, func() { + wares.call(queueName, message, func() { worker.process(message) now = int(time.Now().Unix()) }) diff --git a/middleware_stats_test.go b/middleware_stats_test.go index 46b53a8..2de2962 100644 --- a/middleware_stats_test.go +++ b/middleware_stats_test.go @@ -3,17 +3,19 @@ package workers import ( "github.com/customerio/gospec" . "github.com/customerio/gospec" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" "time" ) func MiddlewareStatsSpec(c gospec.Context) { + const queueName = "queue-middleware_stats" + var job = (func(message *Msg) { // noop }) layout := "2006-01-02" - manager := newManager("myqueue", job, 1) + manager := newManager(queueName, job, 1) worker := newWorker(manager) message, _ := NewMsg("{\"jid\":\"2\",\"retry\":true}") @@ -44,7 +46,7 @@ func MiddlewareStatsSpec(c gospec.Context) { panic("AHHHH") }) - manager := newManager("myqueue", job, 1) + manager := newManager(queueName, job, 1) worker := newWorker(manager) c.Specify("increments failed stats", func() { diff --git a/scheduled.go b/scheduled.go index a7b08fa..91bda94 100644 --- a/scheduled.go +++ b/scheduled.go @@ -4,7 +4,7 @@ import ( "strings" "time" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" ) type scheduled struct { @@ -24,7 +24,7 @@ func (s *scheduled) start() { s.poll() - time.Sleep(time.Duration(Config.PollInterval) * time.Second) + time.Sleep(time.Duration(Config.PoolInterval) * time.Second) } })() } diff --git a/scheduled_test.go b/scheduled_test.go index d292a77..af591f4 100644 --- a/scheduled_test.go +++ b/scheduled_test.go @@ -3,7 +3,7 @@ package workers import ( "github.com/customerio/gospec" . "github.com/customerio/gospec" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" ) func ScheduledSpec(c gospec.Context) { diff --git a/worker_test.go b/worker_test.go index 1097070..d9e7b41 100644 --- a/worker_test.go +++ b/worker_test.go @@ -37,13 +37,15 @@ func confirm(manager *manager) (msg *Msg) { } func WorkerSpec(c gospec.Context) { + const queueName = "queue-worker_test" + var processed = make(chan *Args) var testJob = (func(message *Msg) { processed <- message.Args() }) - manager := newManager("myqueue", testJob, 1) + manager := newManager(queueName, testJob, 1) c.Specify("newWorker", func() { c.Specify("it returns an instance of worker with connection to manager", func() { @@ -125,7 +127,7 @@ func WorkerSpec(c gospec.Context) { panic("AHHHHHHHHH") }) - manager := newManager("myqueue", panicJob, 1) + manager := newManager(queueName, panicJob, 1) worker := newWorker(manager) go worker.work(messages) diff --git a/workers_test.go b/workers_test.go index 4bcd8b6..a8d74f2 100644 --- a/workers_test.go +++ b/workers_test.go @@ -9,20 +9,25 @@ import ( var called chan bool -func myJob(message *Msg) { +func myJob(_ *Msg) { called <- true } func WorkersSpec(c gospec.Context) { + const queueName = "queue-workers" + c.Specify("Workers", func() { c.Specify("allows running in tests", func() { called = make(chan bool) - Process("myqueue", myJob, 10) + Process(queueName, myJob, 10) Start() - Enqueue("myqueue", "Add", []int{1, 2}) + _, err := Enqueue(queueName, "Add", []int{1, 2}) + if err != nil { + panic(err) + } <-called Quit() @@ -32,14 +37,14 @@ func WorkersSpec(c gospec.Context) { //c.Specify("allows starting and stopping multiple times", func() { // called = make(chan bool) - // Process("myqueue", myJob, 10) + // Process(queueName, myJob, 10) // Start() // Quit() // Start() - // Enqueue("myqueue", "Add", []int{1, 2}) + // Enqueue(queueName, "Add", []int{1, 2}) // <-called // Quit() From 59bf90808a1b206d95b101bea9efce144481e498 Mon Sep 17 00:00:00 2001 From: Tobias Ulrich Date: Mon, 1 Apr 2024 10:48:20 -0300 Subject: [PATCH 11/11] Fix go.mod module name (#5) --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 4124dd9..15c60c2 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module go-workers +module github.com/topfreegames/go-workers go 1.19