diff --git a/README.md b/README.md index 0fffa36..fc64226 100644 --- a/README.md +++ b/README.md @@ -62,6 +62,18 @@ func main() { // Add a job to a queue with retry workers.EnqueueWithOptions("myqueue3", "Add", []int{1, 2}, workers.EnqueueOptions{Retry: true}) + // Add a job to a queue in a different redis instance + workers.EnqueueWithOptions("myqueue4", "Add", []int{1, 2}, + workers.EnqueueOptions{ + Retry: true, + ConnectionOptions: map[string]string{ + "server": "localhost:6378", + "database": "my-database", + "pool": "10", + "password": "pass", + }}, + ) + // stats will be available at http://localhost:8080/stats go workers.StatsServer(8080) diff --git a/config.go b/config.go index d4dfc9a..2abc85e 100644 --- a/config.go +++ b/config.go @@ -25,7 +25,6 @@ type config struct { var Config *config func Configure(options map[string]string) { - var poolSize int var namespace string var pollInterval int var retryKey string @@ -55,7 +54,6 @@ func Configure(options map[string]string) { } scheduleKey = defaultScheduleJobsKey - poolSize, _ = strconv.Atoi(options["pool"]) Config = &config{ options["process"], @@ -63,34 +61,40 @@ func Configure(options map[string]string) { pollInterval, retryKey, scheduleKey, - &redis.Pool{ - MaxIdle: poolSize, - IdleTimeout: 240 * time.Second, - Dial: func() (redis.Conn, error) { - c, err := redis.Dial("tcp", options["server"]) - if err != nil { + GetConnectionPool(options), + DefaultFetch, + } +} + +func GetConnectionPool(options map[string]string) *redis.Pool { + poolSize, _ := strconv.Atoi(options["pool"]) + + return &redis.Pool{ + MaxIdle: poolSize, + IdleTimeout: 240 * time.Second, + Dial: func() (redis.Conn, error) { + c, err := redis.Dial("tcp", options["server"]) + if err != nil { + return nil, err + } + if options["password"] != "" { + if _, err := c.Do("AUTH", options["password"]); err != nil { + c.Close() return nil, err } - if options["password"] != "" { - if _, err := c.Do("AUTH", options["password"]); err != nil { - c.Close() - return nil, err - } - } - if options["database"] != "" { - if _, err := c.Do("SELECT", options["database"]); err != nil { - c.Close() - return nil, err - } + } + if options["database"] != "" { + if _, err := c.Do("SELECT", options["database"]); err != nil { + c.Close() + return nil, err } - return c, err - }, - TestOnBorrow: func(c redis.Conn, t time.Time) error { - _, err := c.Do("PING") - return err - }, + } + return c, err + }, + TestOnBorrow: func(c redis.Conn, t time.Time) error { + _, err := c.Do("PING") + return err }, - DefaultFetch, } } diff --git a/enqueue.go b/enqueue.go index 2e5a088..fffea92 100644 --- a/enqueue.go +++ b/enqueue.go @@ -6,6 +6,8 @@ import ( "fmt" "io" "time" + + "github.com/gomodule/redigo/redis" ) const ( @@ -44,9 +46,19 @@ func (e EnqueueData) MarshalJSON() ([]byte, error) { } type EnqueueOptions struct { - RetryCount int `json:"retry_count,omitempty"` - Retry bool `json:"retry,omitempty"` - At float64 `json:"at,omitempty"` + RetryCount int `json:"retry_count,omitempty"` + Retry bool `json:"retry,omitempty"` + RetryMax int `json:"retry_max,omitempty"` + At float64 `json:"at,omitempty"` + RetryOptions RetryOptions `json:"retry_options,omitempty"` + ConnectionOptions map[string]string `json:"connection_options,omitempty"` +} + +type RetryOptions struct { + Exp int `json:"exp"` + MinDelay int `json:"min_delay"` + MaxDelay int `json:"max_delay"` + MaxRand int `json:"max_rand"` } func generateJid() string { @@ -92,7 +104,12 @@ func EnqueueWithOptions(queue, class string, args interface{}, opts EnqueueOptio return data.Jid, err } - conn := Config.Pool.Get() + var conn redis.Conn + if len(opts.ConnectionOptions) == 0 { + conn = Config.Pool.Get() + } else { + conn = GetConnectionPool(opts.ConnectionOptions).Get() + } defer conn.Close() _, err = conn.Do("sadd", Config.Namespace+"queues", queue) @@ -100,7 +117,7 @@ func EnqueueWithOptions(queue, class string, args interface{}, opts EnqueueOptio return "", err } queue = Config.Namespace + "queue:" + queue - _, err = conn.Do("rpush", queue, bytes) + _, err = conn.Do("lpush", queue, bytes) if err != nil { return "", err } diff --git a/enqueue_test.go b/enqueue_test.go index c6d06e2..5f565bd 100644 --- a/enqueue_test.go +++ b/enqueue_test.go @@ -72,19 +72,19 @@ func EnqueueSpec(c gospec.Context) { c.Expect(ea, IsWithin(0.1), nowToSecondsWithNanoPrecision()) }) - c.Specify("sets retry count to `retry`", func() { - EnqueueWithOptions("enqueue6", "Compare", []string{"foo", "bar"}, EnqueueOptions{RetryCount: 13}) + c.Specify("has retry and retry_max when set", func() { + EnqueueWithOptions("enqueue6", "Compare", []string{"foo", "bar"}, EnqueueOptions{RetryMax: 13, Retry: true}) bytes, _ := redis.Bytes(conn.Do("lpop", "prod:queue:enqueue6")) var result map[string]interface{} json.Unmarshal(bytes, &result) c.Expect(result["class"], Equals, "Compare") - retry := result["retry"].(float64) - c.Expect(retry, Equals, float64(13)) + retry := result["retry"].(bool) + c.Expect(retry, Equals, true) - retryCount := result["retry_count"].(float64) - c.Expect(retryCount, Equals, float64(0)) + retryMax := int(result["retry_max"].(float64)) + c.Expect(retryMax, Equals, 13) }) c.Specify("sets Retry correctly when no count given", func() { @@ -98,6 +98,33 @@ func EnqueueSpec(c gospec.Context) { retry := result["retry"].(bool) c.Expect(retry, Equals, true) }) + + c.Specify("has retry_options when set", func() { + EnqueueWithOptions( + "enqueue7", "Compare", []string{"foo", "bar"}, + EnqueueOptions{ + RetryMax: 13, + Retry: true, + RetryOptions: RetryOptions{ + Exp: 2, + MinDelay: 0, + MaxDelay: 60, + MaxRand: 30, + }, + }) + + bytes, _ := redis.Bytes(conn.Do("lpop", "prod:queue:enqueue7")) + var result map[string]interface{} + json.Unmarshal(bytes, &result) + c.Expect(result["class"], Equals, "Compare") + + retryOptions := result["retry_options"].(map[string]interface{}) + c.Expect(len(retryOptions), Equals, 4) + c.Expect(retryOptions["exp"].(float64), Equals, float64(2)) + c.Expect(retryOptions["min_delay"].(float64), Equals, float64(0)) + c.Expect(retryOptions["max_delay"].(float64), Equals, float64(60)) + c.Expect(retryOptions["max_rand"].(float64), Equals, float64(30)) + }) }) c.Specify("EnqueueIn", func() { diff --git a/fetcher.go b/fetcher.go index 18f4c6d..62064d6 100644 --- a/fetcher.go +++ b/fetcher.go @@ -88,7 +88,7 @@ func (f *fetch) tryFetchMessage() { if err != nil { // If redis returns null, the queue is empty. Just ignore the error. if err.Error() != "redigo: nil returned" { - Logger.Println("ERR: ", err) + Logger.Errorln("failed to fetch message", err) time.Sleep(1 * time.Second) } } else { @@ -100,7 +100,7 @@ func (f *fetch) sendMessage(message string) { msg, err := NewMsg(message) if err != nil { - Logger.Println("ERR: Couldn't create message from", message, ":", err) + Logger.Errorln("failed to create message from", message, ":", err) return } @@ -145,7 +145,7 @@ func (f *fetch) inprogressMessages() []string { messages, err := redis.Strings(conn.Do("lrange", f.inprogressQueue(), 0, -1)) if err != nil { - Logger.Println("ERR: ", err) + Logger.Errorln("failed to fetch messages in progress", err) } return messages diff --git a/go.mod b/go.mod index 782bd86..d892251 100644 --- a/go.mod +++ b/go.mod @@ -9,4 +9,5 @@ require ( github.com/gomodule/redigo v1.8.2 github.com/kr/pretty v0.2.1 // indirect github.com/orfjackal/nanospec.go v0.0.0-20120727230329-de4694c1d701 // indirect + github.com/sirupsen/logrus v1.6.0 ) diff --git a/go.sum b/go.sum index d3ed65a..89273bc 100644 --- a/go.sum +++ b/go.sum @@ -6,8 +6,12 @@ github.com/customerio/gospec v0.0.0-20130710230057-a5cc0e48aa39 h1:O0YTztXI3XeJX github.com/customerio/gospec v0.0.0-20130710230057-a5cc0e48aa39/go.mod h1:OzYUFhPuL2JbjwFwrv6CZs23uBawekc6OZs+g19F0mY= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/gomodule/redigo v1.8.2 h1:H5XSIre1MB5NbPYFp+i1NBbb5qN1W8Y8YAQoAYbkm8k= github.com/gomodule/redigo v1.8.2/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUzlpkBYO0= +github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8= +github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -17,9 +21,14 @@ github.com/orfjackal/nanospec.go v0.0.0-20120727230329-de4694c1d701 h1:yOXfzNV7q github.com/orfjackal/nanospec.go v0.0.0-20120727230329-de4694c1d701/go.mod h1:VtBIF1XX0c1nKkeAPk8i4aXkYopqQgfDqolHUIHPwNI= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I= +github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= diff --git a/manager.go b/manager.go index 744c416..11ebbb8 100644 --- a/manager.go +++ b/manager.go @@ -32,7 +32,7 @@ func (m *manager) prepare() { } func (m *manager) quit() { - Logger.Println("quitting queue", m.queueName(), "(waiting for", m.processing(), "/", len(m.workers), "workers).") + Logger.Infoln("quitting queue", m.queueName(), "(waiting for", m.processing(), "/", len(m.workers), "workers).") m.prepare() m.workersM.Lock() @@ -50,7 +50,7 @@ func (m *manager) quit() { } func (m *manager) manage() { - Logger.Println("processing queue", m.queueName(), "with", m.concurrency, "workers.") + Logger.Infoln("processing queue", m.queueName(), "with", m.concurrency, "workers.") go m.fetch.Fetch() @@ -77,7 +77,7 @@ func (m *manager) loadWorkers() { func (m *manager) processing() (count int) { m.workersM.Lock() for _, worker := range m.workers { - if worker.processing() { + if worker != nil && worker.processing() { count++ } } diff --git a/middleware_logging.go b/middleware_logging.go index 11eaa13..a4aa062 100644 --- a/middleware_logging.go +++ b/middleware_logging.go @@ -12,16 +12,16 @@ func (l *MiddlewareLogging) Call(queue string, message *Msg, next func() bool) ( prefix := fmt.Sprint(queue, " JID-", message.Jid()) start := time.Now() - Logger.Println(prefix, "start") - Logger.Println(prefix, "args:", message.Args().ToJson()) + Logger.Debug(prefix, "start") + Logger.Debug(prefix, "args:", message.Args().ToJson()) defer func() { if e := recover(); e != nil { - Logger.Println(prefix, "fail:", time.Since(start)) + Logger.Debug(prefix, "fail:", time.Since(start)) buf := make([]byte, 4096) buf = buf[:runtime.Stack(buf, false)] - Logger.Printf("%s error: %v\n%s", prefix, e, buf) + Logger.Errorf("%s error: %v\n%s", prefix, e, buf) panic(e) } @@ -29,7 +29,7 @@ func (l *MiddlewareLogging) Call(queue string, message *Msg, next func() bool) ( acknowledge = next() - Logger.Println(prefix, "done:", time.Since(start)) + Logger.Debug(prefix, "done:", time.Since(start)) return } diff --git a/middleware_retry.go b/middleware_retry.go index 4de2cb0..25a6a5f 100644 --- a/middleware_retry.go +++ b/middleware_retry.go @@ -1,6 +1,7 @@ package workers import ( + "encoding/json" "fmt" "math" "math/rand" @@ -25,9 +26,10 @@ func (r *MiddlewareRetry) Call(queue string, message *Msg, next func() bool) (ac message.Set("error_message", fmt.Sprintf("%v", e)) retryCount := incrementRetry(message) + retryOptions, _ := message.Get("retry_options").Map() waitDuration := durationToSecondsWithNanoPrecision( time.Duration( - secondsToDelay(retryCount), + secondsToDelay(retryCount, retryOptions), ) * time.Second, ) @@ -61,11 +63,15 @@ func retry(message *Msg) bool { if param, err := message.Get("retry").Bool(); err == nil { retry = param - } else if param, err := message.Get("retry").Int(); err == nil { + } else if param, err := message.Get("retry").Int(); err == nil { // compatible with sidekiq max = param retry = true } + if param, err := message.Get("retry_max").Int(); err == nil { + max = param + } + count, _ := message.Get("retry_count").Int() return retry && count < max @@ -86,7 +92,38 @@ func incrementRetry(message *Msg) (retryCount int) { return } -func secondsToDelay(count int) int { - power := math.Pow(float64(count), 4) - return int(power) + 15 + (rand.Intn(30) * (count + 1)) +func secondsToDelay(count int, retryOptions map[string]interface{}) int { + exp := float64(4) + minDelay := float64(15) + maxDelay := math.Inf(1) + maxRand := float64(30) + if retryOptions != nil { + if v, ok := retryOptions["exp"].(json.Number); ok { + if v2, err := v.Float64(); err == nil { + exp = v2 + } + } + if v, ok := retryOptions["min_delay"].(json.Number); ok { + if v2, err := v.Float64(); err == nil { + minDelay = v2 + } + } + if v, ok := retryOptions["max_delay"].(json.Number); ok { + if v2, err := v.Float64(); err == nil { + maxDelay = v2 + } + } + if v, ok := retryOptions["max_rand"].(json.Number); ok { + if v2, err := v.Float64(); err == nil { + maxRand = v2 + } + } + } + + power := math.Pow(float64(count), exp) + randN := 0 + if maxRand > 0 { + randN = rand.Intn(int(maxRand)) + } + return int(math.Min(power+minDelay+float64(randN*(count+1)), maxDelay)) } diff --git a/middleware_retry_test.go b/middleware_retry_test.go index 01ecffd..d60c2cf 100644 --- a/middleware_retry_test.go +++ b/middleware_retry_test.go @@ -135,7 +135,7 @@ func MiddlewareRetrySpec(c gospec.Context) { }) c.Specify("handles recurring failed message with customized max", func() { - message, _ := NewMsg("{\"jid\":\"2\",\"retry\":10,\"queue\":\"default\",\"error_message\":\"bam\",\"failed_at\":\"2013-07-20 14:03:42 UTC\",\"retry_count\":8}") + message, _ := NewMsg("{\"jid\":\"2\",\"retry\":true,\"queue\":\"default\",\"error_message\":\"bam\",\"failed_at\":\"2013-07-20 14:03:42 UTC\",\"retry_count\":8,\"retry_max\":10}") wares.call("myqueue", message, func() { worker.process(message) @@ -175,7 +175,7 @@ func MiddlewareRetrySpec(c gospec.Context) { }) c.Specify("doesn't retry after customized number of retries", func() { - message, _ := NewMsg("{\"jid\":\"2\",\"retry\":3,\"retry_count\":3}") + message, _ := NewMsg("{\"jid\":\"2\",\"retry\":true,\"retry_max\":3,\"retry_count\":3}") wares.call("myqueue", message, func() { worker.process(message) @@ -188,5 +188,90 @@ func MiddlewareRetrySpec(c gospec.Context) { c.Expect(count, Equals, 0) }) + c.Specify("use retry_options when provided - min_delay", func() { + message, _ := NewMsg("{\"jid\":\"2\",\"retry\":true,\"retry_options\":{\"exp\":2,\"min_delay\":1200,\"max_rand\":0}}") + var now int + wares.call("myqueue", message, func() { + worker.process(message) + now = int(time.Now().Unix()) + }) + + conn := Config.Pool.Get() + defer conn.Close() + + count, _ := redis.Int(conn.Do("zcard", "prod:"+RETRY_KEY)) + c.Expect(count, Equals, 1) + values, _ := redis.Values(conn.Do("ZRANGE", "prod:"+RETRY_KEY, 0, -1, "WITHSCORES")) + + msg := "" + nextAt := -1.0 + redis.Scan(values, &msg, &nextAt) + c.Expect(int(nextAt), Equals, now+1200) + }) + + c.Specify("use retry_options when provided - exp", func() { + message, _ := NewMsg("{\"jid\":\"2\",\"retry\":true,\"retry_count\":2,\"retry_options\":{\"exp\":2,\"min_delay\":0,\"max_rand\":0}}") + var now int + wares.call("myqueue", message, func() { + worker.process(message) + now = int(time.Now().Unix()) + }) + + conn := Config.Pool.Get() + defer conn.Close() + + count, _ := redis.Int(conn.Do("zcard", "prod:"+RETRY_KEY)) + c.Expect(count, Equals, 1) + values, _ := redis.Values(conn.Do("ZRANGE", "prod:"+RETRY_KEY, 0, -1, "WITHSCORES")) + + msg := "" + nextAt := -1.0 + redis.Scan(values, &msg, &nextAt) + c.Expect(int(nextAt), Equals, now+9) + }) + + c.Specify("use retry_options when provided - max_delay", func() { + message, _ := NewMsg("{\"jid\":\"2\",\"retry\":true,\"retry_options\":{\"exp\":20,\"min_delay\":600,\"max_delay\":10,\"max_rand\":10000}}") + var now int + wares.call("myqueue", message, func() { + worker.process(message) + now = int(time.Now().Unix()) + }) + + conn := Config.Pool.Get() + defer conn.Close() + + count, _ := redis.Int(conn.Do("zcard", "prod:"+RETRY_KEY)) + c.Expect(count, Equals, 1) + values, _ := redis.Values(conn.Do("ZRANGE", "prod:"+RETRY_KEY, 0, -1, "WITHSCORES")) + + msg := "" + nextAt := -1.0 + redis.Scan(values, &msg, &nextAt) + c.Expect(int(nextAt), Equals, now+10) + }) + + c.Specify("use retry_options when provided - max_rand", func() { + message, _ := NewMsg("{\"jid\":\"2\",\"retry\":true,\"retry_options\":{\"exp\":2,\"min_delay\":0,\"max_rand\":100}}") + var now int + wares.call("myqueue", message, func() { + worker.process(message) + now = int(time.Now().Unix()) + }) + + conn := Config.Pool.Get() + defer conn.Close() + + count, _ := redis.Int(conn.Do("zcard", "prod:"+RETRY_KEY)) + c.Expect(count, Equals, 1) + values, _ := redis.Values(conn.Do("ZRANGE", "prod:"+RETRY_KEY, 0, -1, "WITHSCORES")) + + msg := "" + nextAt := -1.0 + redis.Scan(values, &msg, &nextAt) + c.Expect(nextAt, Satisfies, nextAt <= float64(now+100)) + c.Expect(nextAt, Satisfies, nextAt >= float64(now+0)) + }) + Config.Namespace = was } diff --git a/middleware_stats.go b/middleware_stats.go index 62a7663..c3bcc63 100644 --- a/middleware_stats.go +++ b/middleware_stats.go @@ -32,6 +32,6 @@ func incrementStats(metric string) { conn.Send("incr", Config.Namespace+"stat:"+metric+":"+today) if _, err := conn.Do("exec"); err != nil { - Logger.Println("couldn't save stats:", err) + Logger.Errorln("failed to save stats:", err) } } diff --git a/msg.go b/msg.go index f922fa0..2a47c36 100644 --- a/msg.go +++ b/msg.go @@ -1,8 +1,9 @@ package workers import ( - "github.com/bitly/go-simplejson" "reflect" + + "github.com/bitly/go-simplejson" ) type data struct { @@ -39,7 +40,7 @@ func (d *data) ToJson() string { json, err := d.Encode() if err != nil { - Logger.Println("ERR: Couldn't generate json from", d, ":", err) + Logger.Errorln("failed to generate json from", d, ":", err) } return string(json) diff --git a/stats.go b/stats.go index be542ce..998cd69 100644 --- a/stats.go +++ b/stats.go @@ -15,10 +15,42 @@ type stats struct { Retries int64 `json:"retries"` } +// Stats writes stats on response writer func Stats(w http.ResponseWriter, req *http.Request) { w.Header().Set("Content-Type", "application/json; charset=utf-8") w.Header().Set("Access-Control-Allow-Origin", "*") + stats := getStats() + + body, _ := json.MarshalIndent(stats, "", " ") + fmt.Fprintln(w, string(body)) +} + +// WorkerStats holds workers stats +type WorkerStats struct { + Processed int `json:"processed"` + Failed int `json:"failed"` + Enqueued map[string]string `json:"enqueued"` + Retries int64 `json:"retries"` +} + +// GetStats returns workers stats +func GetStats() *WorkerStats { + _stats := getStats() + enqueued := map[string]string{} + if statsEnqueued, ok := _stats.Enqueued.(map[string]string); ok { + enqueued = statsEnqueued + } + + return &WorkerStats{ + Processed: _stats.Processed, + Failed: _stats.Failed, + Retries: _stats.Retries, + Enqueued: enqueued, + } +} + +func getStats() stats { jobs := make(map[string][]*map[string]interface{}) enqueued := make(map[string]string) @@ -39,7 +71,7 @@ func Stats(w http.ResponseWriter, req *http.Request) { } } - stats := stats{ + _stats := stats{ 0, 0, jobs, @@ -53,37 +85,37 @@ func Stats(w http.ResponseWriter, req *http.Request) { conn.Send("multi") conn.Send("get", Config.Namespace+"stat:processed") conn.Send("get", Config.Namespace+"stat:failed") - conn.Send("zcard", Config.Namespace+Config.RetryKey) + conn.Send("zcard", Config.Namespace+RETRY_KEY) - for key, _ := range enqueued { + for key := range enqueued { conn.Send("llen", fmt.Sprintf("%squeue:%s", Config.Namespace, key)) } r, err := conn.Do("exec") if err != nil { - Logger.Println("couldn't retrieve stats:", err) + Logger.Errorln("failed to retrieve stats:", err) } results := r.([]interface{}) if len(results) == (3 + len(enqueued)) { for index, result := range results { if index == 0 && result != nil { - stats.Processed, _ = strconv.Atoi(string(result.([]byte))) + _stats.Processed, _ = strconv.Atoi(string(result.([]byte))) continue } if index == 1 && result != nil { - stats.Failed, _ = strconv.Atoi(string(result.([]byte))) + _stats.Failed, _ = strconv.Atoi(string(result.([]byte))) continue } if index == 2 && result != nil { - stats.Retries = result.(int64) + _stats.Retries = result.(int64) continue } queueIndex := 0 - for key, _ := range enqueued { + for key := range enqueued { if queueIndex == (index - 3) { enqueued[key] = fmt.Sprintf("%d", result.(int64)) } @@ -92,6 +124,5 @@ func Stats(w http.ResponseWriter, req *http.Request) { } } - body, _ := json.MarshalIndent(stats, "", " ") - fmt.Fprintln(w, string(body)) + return _stats } diff --git a/workers.go b/workers.go index 4d12777..a6622fb 100644 --- a/workers.go +++ b/workers.go @@ -3,9 +3,7 @@ package workers import ( "errors" "fmt" - "log" "net/http" - "os" "sync" ) @@ -14,8 +12,6 @@ const ( SCHEDULED_JOBS_KEY = "schedule" ) -var Logger WorkersLogger = log.New(os.Stdout, "workers: ", log.Ldate|log.Lmicroseconds) - var managers = make(map[string]*manager) var schedule *scheduled var control = make(map[string]chan string) @@ -88,10 +84,10 @@ func Quit() { func StatsServer(port int) { http.HandleFunc("/stats", Stats) - Logger.Println("Stats are available at", fmt.Sprint("http://localhost:", port, "/stats")) + Logger.Infoln("Stats are available at", fmt.Sprint("http://localhost:", port, "/stats")) if err := http.ListenAndServe(fmt.Sprint(":", port), nil); err != nil { - Logger.Println(err) + Logger.Error("failed to start stats server", err) } } diff --git a/workers_logger.go b/workers_logger.go index cab91b1..4ac7005 100644 --- a/workers_logger.go +++ b/workers_logger.go @@ -1,6 +1,43 @@ package workers +import "github.com/sirupsen/logrus" + +// WorkersLogger represents the log interface type WorkersLogger interface { - Println(...interface{}) - Printf(string, ...interface{}) + Fatal(format ...interface{}) + Fatalf(format string, args ...interface{}) + Fatalln(args ...interface{}) + + Debug(args ...interface{}) + Debugf(format string, args ...interface{}) + Debugln(args ...interface{}) + + Error(args ...interface{}) + Errorf(format string, args ...interface{}) + Errorln(args ...interface{}) + + Info(args ...interface{}) + Infof(format string, args ...interface{}) + Infoln(args ...interface{}) + + Warn(args ...interface{}) + Warnf(format string, args ...interface{}) + Warnln(args ...interface{}) +} + +// Logger is the default logger +var Logger = initLogger() + +func initLogger() WorkersLogger { + plog := logrus.New() + plog.Formatter = new(logrus.TextFormatter) + plog.Level = logrus.InfoLevel + return plog.WithFields(logrus.Fields{"source": "go-workers"}) +} + +// SetLogger rewrites the default logger +func SetLogger(l WorkersLogger) { + if l != nil { + Logger = l + } }