diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..cb756e9 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +vendor +.idea \ No newline at end of file diff --git a/.travis.yml b/.travis.yml index 141ea53..203b4d4 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,7 @@ language: go go: - - 1.7 + - "1.19" script: - go get github.com/customerio/gospec diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..4f239fc --- /dev/null +++ b/Makefile @@ -0,0 +1,18 @@ + + +export COMPOSE_PROJECT_NAME := go-workers + +.PHONY: deps/start +deps/start: + echo "${COMPOSE_PROJECT_NAME}" + docker-compose -p '${COMPOSE_PROJECT_NAME}' up -d + +.PHONY: deps/stop +deps/stop: + docker-compose -p '${COMPOSE_PROJECT_NAME}' down + +test: + @make deps/start + go get github.com/customerio/gospec + go test -v + @make deps/stop diff --git a/README.md b/README.md index 0fffa36..0a070d9 100644 --- a/README.md +++ b/README.md @@ -18,55 +18,69 @@ Example usage: package main import ( - "github.com/jrallison/go-workers" + "github.com/topfreegames/go-workers" + workers "go-workers" ) func myJob(message *workers.Msg) { - // do something with your message - // message.Jid() - // message.Args() is a wrapper around go-simplejson (http://godoc.org/github.com/bitly/go-simplejson) + // do something with your message + // message.Jid() + // message.Args() is a wrapper around go-simplejson (http://godoc.org/github.com/bitly/go-simplejson) } type myMiddleware struct{} func (r *myMiddleware) Call(queue string, message *workers.Msg, next func() bool) (acknowledge bool) { - // do something before each message is processed - acknowledge = next() - // do something after each message is processed - return -} + // do something before each message is processed + acknowledge = next() + // do something after each message is processed + return +} func main() { - workers.Configure(map[string]string{ - // location of redis instance - "server": "localhost:6379", - // instance of the database - "database": "0", - // number of connections to keep open with redis - "pool": "30", - // unique process id for this instance of workers (for proper recovery of inprogress jobs on crash) - "process": "1", - }) - - workers.Middleware.Append(&myMiddleware{}) - - // pull messages from "myqueue" with concurrency of 10 - workers.Process("myqueue", myJob, 10) - - // pull messages from "myqueue2" with concurrency of 20 - workers.Process("myqueue2", myJob, 20) - - // Add a job to a queue - workers.Enqueue("myqueue3", "Add", []int{1, 2}) - - // Add a job to a queue with retry - workers.EnqueueWithOptions("myqueue3", "Add", []int{1, 2}, workers.EnqueueOptions{Retry: true}) - - // stats will be available at http://localhost:8080/stats - go workers.StatsServer(8080) - - // Blocks until process is told to exit via unix signal - workers.Run() + workers.Configure(workers.Options{ + // location of redis instance + Address: "localhost:6379", + // instance of the database + Database: "0", + // number of connections to keep open with redis + PoolSize: "30", + // unique process id for this instance of workers (for proper recovery of inprogress jobs on crash) + ProcessID: "1", + }) + + workers.Middleware.Append(&myMiddleware{}) + + // pull messages from "myqueue" with concurrency of 10 + workers.Process("myqueue", myJob, 10) + + // pull messages from "myqueue2" with concurrency of 20 + workers.Process("myqueue2", myJob, 20) + + // Add a job to a queue + workers.Enqueue("myqueue3", "Add", []int{1, 2}) + + // Add a job to a queue with retry + workers.EnqueueWithOptions("myqueue3", "Add", []int{1, 2}, workers.EnqueueOptions{Retry: true}) + + // Add a job to a queue in a different redis instance + workers.EnqueueWithOptions("myqueue4", "Add", []int{1, 2}, + workers.EnqueueOptions{ + Retry: true, + ConnectionOptions: workers.Options{ + Address: "localhost:6378", + Database: "my-database", + PoolSize: 10, + Password: "pass", + }, + }, + ) + + // stats will be available at http://localhost:8080/stats + go workers.StatsServer(8080) + + // Blocks until process is told to exit via unix signal + workers.Run() } ``` diff --git a/all_specs_test.go b/all_specs_test.go index 699c44a..be1bee6 100644 --- a/all_specs_test.go +++ b/all_specs_test.go @@ -18,16 +18,22 @@ func TestAllSpecs(t *testing.T) { r.Parallel = false r.BeforeEach = func() { - Configure(map[string]string{ - "server": "localhost:6379", - "process": "1", - "database": "15", - "pool": "1", + Configure(Options{ + Address: "localhost:6379", + ProcessID: "1", + Database: "15", + PoolSize: 1, }) conn := Config.Pool.Get() - conn.Do("flushdb") - conn.Close() + _, err := conn.Do("flushdb") + if err != nil { + panic("failed to flush db: " + err.Error()) + } + err = conn.Close() + if err != nil { + panic("failed close connection: " + err.Error()) + } } // List all specs here diff --git a/config.go b/config.go index f93ac94..850ad45 100644 --- a/config.go +++ b/config.go @@ -1,80 +1,92 @@ package workers import ( - "strconv" + "fmt" "time" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" ) -type config struct { +type Options struct { + Address string + Password string + Database string + ProcessID string + Namespace string + PoolSize int + PoolInterval int + DialOptions []redis.DialOption +} + +type WorkerConfig struct { processId string Namespace string - PollInterval int + PoolInterval int Pool *redis.Pool Fetch func(queue string) Fetcher } -var Config *config +var Config *WorkerConfig -func Configure(options map[string]string) { - var poolSize int +func Configure(options Options) { var namespace string - var pollInterval int - if options["server"] == "" { - panic("Configure requires a 'server' option, which identifies a Redis instance") + if options.Address == "" { + panic("Configure requires a 'Address' option, which identifies a Redis instance") } - if options["process"] == "" { - panic("Configure requires a 'process' option, which uniquely identifies this instance") + if options.ProcessID == "" { + panic("Configure requires a 'ProcessID' option, which uniquely identifies this instance") } - if options["pool"] == "" { - options["pool"] = "1" + if options.PoolSize <= 0 { + options.PoolSize = 1 } - if options["namespace"] != "" { - namespace = options["namespace"] + ":" + if options.Namespace != "" { + namespace = options.Namespace + ":" } - if seconds, err := strconv.Atoi(options["poll_interval"]); err == nil { - pollInterval = seconds - } else { - pollInterval = 15 + if options.PoolInterval <= 0 { + options.PoolInterval = 15 } - poolSize, _ = strconv.Atoi(options["pool"]) - - Config = &config{ - options["process"], + Config = &WorkerConfig{ + options.ProcessID, namespace, - pollInterval, - &redis.Pool{ - MaxIdle: poolSize, - IdleTimeout: 240 * time.Second, - Dial: func() (redis.Conn, error) { - c, err := redis.Dial("tcp", options["server"]) - if err != nil { - return nil, err - } - if options["password"] != "" { - if _, err := c.Do("AUTH", options["password"]); err != nil { - c.Close() - return nil, err + options.PoolInterval, + GetConnectionPool(options), + func(queue string) Fetcher { + return NewFetch(queue, make(chan *Msg), make(chan bool)) + }, + } +} + +func GetConnectionPool(options Options) *redis.Pool { + return &redis.Pool{ + MaxIdle: options.PoolSize, + IdleTimeout: 240 * time.Second, + Dial: func() (redis.Conn, error) { + c, err := redis.Dial("tcp", options.Address, options.DialOptions...) + if err != nil { + return nil, err + } + if options.Password != "" { + if _, err := c.Do("AUTH", options.Password); err != nil { + if errClose := c.Close(); errClose != nil { + return nil, fmt.Errorf("%w. failed to close connection: %s", err, errClose.Error()) } + return nil, err } - if options["database"] != "" { - if _, err := c.Do("SELECT", options["database"]); err != nil { - c.Close() - return nil, err + } + if options.Database != "" { + if _, err := c.Do("SELECT", options.Database); err != nil { + if errClose := c.Close(); errClose != nil { + return nil, fmt.Errorf("%w. failed to close connection: %s", err, errClose.Error()) } } - return c, err - }, - TestOnBorrow: func(c redis.Conn, t time.Time) error { - _, err := c.Do("PING") - return err - }, + } + return c, err }, - func(queue string) Fetcher { - return NewFetch(queue, make(chan *Msg), make(chan bool)) + TestOnBorrow: func(c redis.Conn, t time.Time) error { + _, err := c.Do("PING") + return err }, } } diff --git a/config_test.go b/config_test.go index afde76d..1033d13 100644 --- a/config_test.go +++ b/config_test.go @@ -21,10 +21,10 @@ func ConfigSpec(c gospec.Context) { c.Specify("sets redis pool size which defaults to 1", func() { c.Expect(Config.Pool.MaxIdle, Equals, 1) - Configure(map[string]string{ - "server": "localhost:6379", - "process": "1", - "pool": "20", + Configure(Options{ + Address: "localhost:6379", + ProcessID: "1", + PoolSize: 20, }) c.Expect(Config.Pool.MaxIdle, Equals, 20) @@ -33,9 +33,9 @@ func ConfigSpec(c gospec.Context) { c.Specify("can specify custom process", func() { c.Expect(Config.processId, Equals, "1") - Configure(map[string]string{ - "server": "localhost:6379", - "process": "2", + Configure(Options{ + Address: "localhost:6379", + ProcessID: "2", }) c.Expect(Config.processId, Equals, "2") @@ -43,48 +43,48 @@ func ConfigSpec(c gospec.Context) { c.Specify("requires a server parameter", func() { err := recoverOnPanic(func() { - Configure(map[string]string{"process": "2"}) + Configure(Options{ProcessID: "2"}) }) - c.Expect(err, Equals, "Configure requires a 'server' option, which identifies a Redis instance") + c.Expect(err, Equals, "Configure requires a 'Address' option, which identifies a Redis instance") }) c.Specify("requires a process parameter", func() { err := recoverOnPanic(func() { - Configure(map[string]string{"server": "localhost:6379"}) + Configure(Options{Address: "localhost:6379"}) }) - c.Expect(err, Equals, "Configure requires a 'process' option, which uniquely identifies this instance") + c.Expect(err, Equals, "Configure requires a 'ProcessID' option, which uniquely identifies this instance") }) c.Specify("adds ':' to the end of the namespace", func() { c.Expect(Config.Namespace, Equals, "") - Configure(map[string]string{ - "server": "localhost:6379", - "process": "1", - "namespace": "prod", + Configure(Options{ + Address: "localhost:6379", + ProcessID: "1", + Namespace: "prod", }) c.Expect(Config.Namespace, Equals, "prod:") }) c.Specify("defaults poll interval to 15 seconds", func() { - Configure(map[string]string{ - "server": "localhost:6379", - "process": "1", + Configure(Options{ + Address: "localhost:6379", + ProcessID: "1", }) - c.Expect(Config.PollInterval, Equals, 15) + c.Expect(Config.PoolInterval, Equals, 15) }) c.Specify("allows customization of poll interval", func() { - Configure(map[string]string{ - "server": "localhost:6379", - "process": "1", - "poll_interval": "1", + Configure(Options{ + Address: "localhost:6379", + ProcessID: "1", + PoolInterval: 1, }) - c.Expect(Config.PollInterval, Equals, 1) + c.Expect(Config.PoolInterval, Equals, 1) }) } diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..6413de0 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,19 @@ +version: '2.2' + +services: + redis: + image: redis:5-alpine + ports: + - 6379:6379 + healthcheck: + test: [ "CMD", "redis-cli", "ping" ] + interval: 3s + timeout: 3s + retries: 30 + networks: + - go-workers + +networks: + go-workers: + driver: bridge + name: go-workers \ No newline at end of file diff --git a/enqueue.go b/enqueue.go index f9af7a8..2754a60 100644 --- a/enqueue.go +++ b/enqueue.go @@ -6,6 +6,8 @@ import ( "fmt" "io" "time" + + "github.com/gomodule/redigo/redis" ) const ( @@ -22,9 +24,19 @@ type EnqueueData struct { } type EnqueueOptions struct { - RetryCount int `json:"retry_count,omitempty"` - Retry bool `json:"retry,omitempty"` - At float64 `json:"at,omitempty"` + RetryCount int `json:"retry_count,omitempty"` + Retry bool `json:"retry,omitempty"` + RetryMax int `json:"retry_max,omitempty"` + At float64 `json:"at,omitempty"` + RetryOptions RetryOptions `json:"retry_options,omitempty"` + ConnectionOptions Options `json:"connection_options,omitempty"` +} + +type RetryOptions struct { + Exp int `json:"exp"` + MinDelay int `json:"min_delay"` + MaxDelay int `json:"max_delay"` + MaxRand int `json:"max_rand"` } func generateJid() string { @@ -70,15 +82,26 @@ func EnqueueWithOptions(queue, class string, args interface{}, opts EnqueueOptio return data.Jid, err } - conn := Config.Pool.Get() - defer conn.Close() + var conn redis.Conn + if opts.ConnectionOptions.Address == "" { + Logger.Debug("missing redis Address in EnqueueWithOptions. using default pool") + conn = Config.Pool.Get() + } else { + conn = GetConnectionPool(opts.ConnectionOptions).Get() + } + defer func(conn redis.Conn) { + err := conn.Close() + if err != nil { + Logger.Errorf("failed to close Redis connection in EnqueueWithOptions: %w", err) + } + }(conn) _, err = conn.Do("sadd", Config.Namespace+"queues", queue) if err != nil { return "", err } queue = Config.Namespace + "queue:" + queue - _, err = conn.Do("rpush", queue, bytes) + _, err = conn.Do("lpush", queue, bytes) if err != nil { return "", err } diff --git a/enqueue_test.go b/enqueue_test.go index e3a2707..c06cbfa 100644 --- a/enqueue_test.go +++ b/enqueue_test.go @@ -5,7 +5,8 @@ import ( "github.com/customerio/gospec" . "github.com/customerio/gospec" - "github.com/garyburd/redigo/redis" + + "github.com/gomodule/redigo/redis" ) func EnqueueSpec(c gospec.Context) { @@ -17,8 +18,12 @@ func EnqueueSpec(c gospec.Context) { defer conn.Close() c.Specify("makes the queue available", func() { - Enqueue("enqueue1", "Add", []int{1, 2}) + enqueue, err := Enqueue("enqueue1", "Add", []int{1, 2}) + if err != nil { + panic(err) + } + c.Expect(enqueue, Not(Equals), "") found, _ := redis.Bool(conn.Do("sismember", "prod:queues", "enqueue1")) c.Expect(found, IsTrue) }) @@ -72,8 +77,8 @@ func EnqueueSpec(c gospec.Context) { c.Expect(ea, IsWithin(0.1), nowToSecondsWithNanoPrecision()) }) - c.Specify("has retry and retry_count when set", func() { - EnqueueWithOptions("enqueue6", "Compare", []string{"foo", "bar"}, EnqueueOptions{RetryCount: 13, Retry: true}) + c.Specify("has retry and retry_max when set", func() { + EnqueueWithOptions("enqueue6", "Compare", []string{"foo", "bar"}, EnqueueOptions{RetryMax: 13, Retry: true}) bytes, _ := redis.Bytes(conn.Do("lpop", "prod:queue:enqueue6")) var result map[string]interface{} @@ -83,8 +88,38 @@ func EnqueueSpec(c gospec.Context) { retry := result["retry"].(bool) c.Expect(retry, Equals, true) - retryCount := int(result["retry_count"].(float64)) - c.Expect(retryCount, Equals, 13) + retryMax := int(result["retry_max"].(float64)) + c.Expect(retryMax, Equals, 13) + }) + + c.Specify("has retry_options when set", func() { + EnqueueWithOptions( + "enqueue7", "Compare", []string{"foo", "bar"}, + EnqueueOptions{ + RetryMax: 13, + Retry: true, + RetryOptions: RetryOptions{ + Exp: 2, + MinDelay: 0, + MaxDelay: 60, + MaxRand: 30, + }, + }) + + bytes, _ := redis.Bytes(conn.Do("lpop", "prod:queue:enqueue7")) + var result map[string]interface{} + err := json.Unmarshal(bytes, &result) + if err != nil { + panic(err) + } + c.Expect(result["class"], Equals, "Compare") + + retryOptions := result["retry_options"].(map[string]interface{}) + c.Expect(len(retryOptions), Equals, 4) + c.Expect(retryOptions["exp"].(float64), Equals, float64(2)) + c.Expect(retryOptions["min_delay"].(float64), Equals, float64(0)) + c.Expect(retryOptions["max_delay"].(float64), Equals, float64(60)) + c.Expect(retryOptions["max_rand"].(float64), Equals, float64(30)) }) }) diff --git a/fetch_test.go b/fetch_test.go index 37e3fcc..3a355f2 100644 --- a/fetch_test.go +++ b/fetch_test.go @@ -3,7 +3,7 @@ package workers import ( "github.com/customerio/gospec" . "github.com/customerio/gospec" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" ) func buildFetch(queue string) Fetcher { diff --git a/fetcher.go b/fetcher.go index 2745a80..cbbf93b 100644 --- a/fetcher.go +++ b/fetcher.go @@ -4,7 +4,7 @@ import ( "fmt" "time" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" ) type Fetcher interface { @@ -88,7 +88,7 @@ func (f *fetch) tryFetchMessage() { if err != nil { // If redis returns null, the queue is empty. Just ignore the error. if err.Error() != "redigo: nil returned" { - Logger.Println("ERR: ", err) + Logger.Errorln("failed to fetch message", err) time.Sleep(1 * time.Second) } } else { @@ -100,7 +100,7 @@ func (f *fetch) sendMessage(message string) { msg, err := NewMsg(message) if err != nil { - Logger.Println("ERR: Couldn't create message from", message, ":", err) + Logger.Errorln("failed to create message from", message, ":", err) return } @@ -145,7 +145,7 @@ func (f *fetch) inprogressMessages() []string { messages, err := redis.Strings(conn.Do("lrange", f.inprogressQueue(), 0, -1)) if err != nil { - Logger.Println("ERR: ", err) + Logger.Errorln("failed to fetch messages in progress", err) } return messages diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..15c60c2 --- /dev/null +++ b/go.mod @@ -0,0 +1,15 @@ +module github.com/topfreegames/go-workers + +go 1.19 + +require ( + github.com/bitly/go-simplejson v0.5.1 + github.com/customerio/gospec v0.0.0-20130710230057-a5cc0e48aa39 + github.com/gomodule/redigo v1.9.2 + github.com/sirupsen/logrus v1.9.3 +) + +require ( + github.com/orfjackal/nanospec.go v0.0.0-20120727230329-de4694c1d701 // indirect + golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..15904ff --- /dev/null +++ b/go.sum @@ -0,0 +1,25 @@ +github.com/bitly/go-simplejson v0.5.1 h1:xgwPbetQScXt1gh9BmoJ6j9JMr3TElvuIyjR8pgdoow= +github.com/bitly/go-simplejson v0.5.1/go.mod h1:YOPVLzCfwK14b4Sff3oP1AmGhI9T9Vsg84etUnlyp+Q= +github.com/customerio/gospec v0.0.0-20130710230057-a5cc0e48aa39 h1:O0YTztXI3XeJXlFhSo4wNb0VBVqSgT+hi/CjNWKvMnY= +github.com/customerio/gospec v0.0.0-20130710230057-a5cc0e48aa39/go.mod h1:OzYUFhPuL2JbjwFwrv6CZs23uBawekc6OZs+g19F0mY= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/garyburd/redigo v1.6.4 h1:LFu2R3+ZOPgSMWMOL+saa/zXRjw0ID2G8FepO53BGlg= +github.com/garyburd/redigo v1.6.4/go.mod h1:rTb6epsqigu3kYKBnaF028A7Tf/Aw5s0cqA47doKKqw= +github.com/gomodule/redigo v1.9.2 h1:HrutZBLhSIU8abiSfW8pj8mPhOyMYjZT/wcA4/L9L9s= +github.com/gomodule/redigo v1.9.2/go.mod h1:KsU3hiK/Ay8U42qpaJk+kuNa3C+spxapWpM+ywhcgtw= +github.com/orfjackal/nanospec.go v0.0.0-20120727230329-de4694c1d701 h1:yOXfzNV7qkZ3nf2NPqy4BMzlCmnQzIEbI1vuqKb2FkQ= +github.com/orfjackal/nanospec.go v0.0.0-20120727230329-de4694c1d701/go.mod h1:VtBIF1XX0c1nKkeAPk8i4aXkYopqQgfDqolHUIHPwNI= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/manager.go b/manager.go index 0e4ee6f..8533908 100644 --- a/manager.go +++ b/manager.go @@ -32,7 +32,7 @@ func (m *manager) prepare() { } func (m *manager) quit() { - Logger.Println("quitting queue", m.queueName(), "(waiting for", m.processing(), "/", len(m.workers), "workers).") + Logger.Infoln("quitting queue", m.queueName(), "(waiting for", m.processing(), "/", len(m.workers), "workers).") m.prepare() m.workersM.Lock() @@ -50,7 +50,7 @@ func (m *manager) quit() { } func (m *manager) manage() { - Logger.Println("processing queue", m.queueName(), "with", m.concurrency, "workers.") + Logger.Infoln("processing queue", m.queueName(), "with", m.concurrency, "workers.") go m.fetch.Fetch() @@ -77,7 +77,7 @@ func (m *manager) loadWorkers() { func (m *manager) processing() (count int) { m.workersM.Lock() for _, worker := range m.workers { - if worker.processing() { + if worker != nil && worker.processing() { count++ } } diff --git a/manager_test.go b/manager_test.go index c90550e..38f1863 100644 --- a/manager_test.go +++ b/manager_test.go @@ -7,7 +7,7 @@ import ( "github.com/customerio/gospec" . "github.com/customerio/gospec" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" ) type customMid struct { @@ -37,6 +37,7 @@ func (m *customMid) Trace() []string { } func ManagerSpec(c gospec.Context) { + const queueName = "queue-manager" processed := make(chan *Args) testJob := (func(message *Msg) { @@ -48,28 +49,28 @@ func ManagerSpec(c gospec.Context) { c.Specify("newManager", func() { c.Specify("sets queue with namespace", func() { - manager := newManager("myqueue", testJob, 10) - c.Expect(manager.queue, Equals, "prod:queue:myqueue") + manager := newManager(queueName, testJob, 10) + c.Expect(manager.queue, Equals, fmt.Sprintf("prod:queue:%s", queueName)) }) c.Specify("sets job function", func() { - manager := newManager("myqueue", testJob, 10) - c.Expect(fmt.Sprint(manager.job), Equals, fmt.Sprint(testJob)) + manager := newManager(queueName, testJob, 10) + c.Expect(fmt.Sprintf("%p", manager.job), Equals, fmt.Sprintf("%p", testJob)) }) c.Specify("sets worker concurrency", func() { - manager := newManager("myqueue", testJob, 10) + manager := newManager(queueName, testJob, 10) c.Expect(manager.concurrency, Equals, 10) }) c.Specify("no per-manager middleware means 'use global Middleware object'", func() { - manager := newManager("myqueue", testJob, 10) + manager := newManager(queueName, testJob, 10) c.Expect(manager.mids, Equals, Middleware) }) c.Specify("per-manager middlewares create separate middleware chains", func() { mid1 := customMid{Base: "0"} - manager := newManager("myqueue", testJob, 10, &mid1) + manager := newManager(queueName, testJob, 10, &mid1) c.Expect(manager.mids, Not(Equals), Middleware) c.Expect(len(manager.mids.actions), Equals, len(Middleware.actions)+1) }) diff --git a/middleware_logging.go b/middleware_logging.go index 11eaa13..a4aa062 100644 --- a/middleware_logging.go +++ b/middleware_logging.go @@ -12,16 +12,16 @@ func (l *MiddlewareLogging) Call(queue string, message *Msg, next func() bool) ( prefix := fmt.Sprint(queue, " JID-", message.Jid()) start := time.Now() - Logger.Println(prefix, "start") - Logger.Println(prefix, "args:", message.Args().ToJson()) + Logger.Debug(prefix, "start") + Logger.Debug(prefix, "args:", message.Args().ToJson()) defer func() { if e := recover(); e != nil { - Logger.Println(prefix, "fail:", time.Since(start)) + Logger.Debug(prefix, "fail:", time.Since(start)) buf := make([]byte, 4096) buf = buf[:runtime.Stack(buf, false)] - Logger.Printf("%s error: %v\n%s", prefix, e, buf) + Logger.Errorf("%s error: %v\n%s", prefix, e, buf) panic(e) } @@ -29,7 +29,7 @@ func (l *MiddlewareLogging) Call(queue string, message *Msg, next func() bool) ( acknowledge = next() - Logger.Println(prefix, "done:", time.Since(start)) + Logger.Debug(prefix, "done:", time.Since(start)) return } diff --git a/middleware_retry.go b/middleware_retry.go index 1bc1ccf..b36d21c 100644 --- a/middleware_retry.go +++ b/middleware_retry.go @@ -1,6 +1,7 @@ package workers import ( + "encoding/json" "fmt" "math" "math/rand" @@ -19,15 +20,15 @@ func (r *MiddlewareRetry) Call(queue string, message *Msg, next func() bool) (ac if e := recover(); e != nil { conn := Config.Pool.Get() defer conn.Close() - if retry(message) { message.Set("queue", queue) message.Set("error_message", fmt.Sprintf("%v", e)) retryCount := incrementRetry(message) + retryOptions, _ := message.Get("retry_options").Map() waitDuration := durationToSecondsWithNanoPrecision( time.Duration( - secondsToDelay(retryCount), + secondsToDelay(retryCount, retryOptions), ) * time.Second, ) @@ -61,11 +62,15 @@ func retry(message *Msg) bool { if param, err := message.Get("retry").Bool(); err == nil { retry = param - } else if param, err := message.Get("retry").Int(); err == nil { + } else if param, err := message.Get("retry").Int(); err == nil { // compatible with sidekiq max = param retry = true } + if param, err := message.Get("retry_max").Int(); err == nil { + max = param + } + count, _ := message.Get("retry_count").Int() return retry && count < max @@ -86,7 +91,38 @@ func incrementRetry(message *Msg) (retryCount int) { return } -func secondsToDelay(count int) int { - power := math.Pow(float64(count), 4) - return int(power) + 15 + (rand.Intn(30) * (count + 1)) +func secondsToDelay(count int, retryOptions map[string]interface{}) int { + exp := float64(4) + minDelay := float64(15) + maxDelay := math.Inf(1) + maxRand := float64(30) + if retryOptions != nil { + if v, ok := retryOptions["exp"].(json.Number); ok { + if v2, err := v.Float64(); err == nil { + exp = v2 + } + } + if v, ok := retryOptions["min_delay"].(json.Number); ok { + if v2, err := v.Float64(); err == nil { + minDelay = v2 + } + } + if v, ok := retryOptions["max_delay"].(json.Number); ok { + if v2, err := v.Float64(); err == nil { + maxDelay = v2 + } + } + if v, ok := retryOptions["max_rand"].(json.Number); ok { + if v2, err := v.Float64(); err == nil { + maxRand = v2 + } + } + } + + power := math.Pow(float64(count), exp) + randN := 0 + if maxRand > 0 { + randN = rand.Intn(int(maxRand)) + } + return int(math.Min(power+minDelay+float64(randN*(count+1)), maxDelay)) } diff --git a/middleware_retry_test.go b/middleware_retry_test.go index a17cf7a..5970937 100644 --- a/middleware_retry_test.go +++ b/middleware_retry_test.go @@ -1,13 +1,16 @@ package workers import ( + "time" + "github.com/customerio/gospec" . "github.com/customerio/gospec" - "github.com/garyburd/redigo/redis" - "time" + "github.com/gomodule/redigo/redis" ) func MiddlewareRetrySpec(c gospec.Context) { + const queueName = "queue-middleware_retry" + var panicingJob = (func(message *Msg) { panic("AHHHH") }) @@ -17,7 +20,7 @@ func MiddlewareRetrySpec(c gospec.Context) { ) layout := "2006-01-02 15:04:05 MST" - manager := newManager("myqueue", panicingJob, 1) + manager := newManager(queueName, panicingJob, 1) worker := newWorker(manager) was := Config.Namespace @@ -26,7 +29,7 @@ func MiddlewareRetrySpec(c gospec.Context) { c.Specify("puts messages in retry queue when they fail", func() { message, _ := NewMsg("{\"jid\":\"2\",\"retry\":true}") - wares.call("myqueue", message, func() { + wares.call(queueName, message, func() { worker.process(message) }) @@ -40,7 +43,7 @@ func MiddlewareRetrySpec(c gospec.Context) { c.Specify("allows disabling retries", func() { message, _ := NewMsg("{\"jid\":\"2\",\"retry\":false}") - wares.call("myqueue", message, func() { + wares.call(queueName, message, func() { worker.process(message) }) @@ -54,7 +57,7 @@ func MiddlewareRetrySpec(c gospec.Context) { c.Specify("doesn't retry by default", func() { message, _ := NewMsg("{\"jid\":\"2\"}") - wares.call("myqueue", message, func() { + wares.call(queueName, message, func() { worker.process(message) }) @@ -68,7 +71,7 @@ func MiddlewareRetrySpec(c gospec.Context) { c.Specify("allows numeric retries", func() { message, _ := NewMsg("{\"jid\":\"2\",\"retry\":5}") - wares.call("myqueue", message, func() { + wares.call(queueName, message, func() { worker.process(message) }) @@ -82,7 +85,7 @@ func MiddlewareRetrySpec(c gospec.Context) { c.Specify("handles new failed message", func() { message, _ := NewMsg("{\"jid\":\"2\",\"retry\":true}") - wares.call("myqueue", message, func() { + wares.call(queueName, message, func() { worker.process(message) }) @@ -99,7 +102,7 @@ func MiddlewareRetrySpec(c gospec.Context) { error_backtrace, _ := message.Get("error_backtrace").String() failed_at, _ := message.Get("failed_at").String() - c.Expect(queue, Equals, "myqueue") + c.Expect(queue, Equals, queueName) c.Expect(error_message, Equals, "AHHHH") c.Expect(error_class, Equals, "") c.Expect(retry_count, Equals, 0) @@ -110,7 +113,7 @@ func MiddlewareRetrySpec(c gospec.Context) { c.Specify("handles recurring failed message", func() { message, _ := NewMsg("{\"jid\":\"2\",\"retry\":true,\"queue\":\"default\",\"error_message\":\"bam\",\"failed_at\":\"2013-07-20 14:03:42 UTC\",\"retry_count\":10}") - wares.call("myqueue", message, func() { + wares.call(queueName, message, func() { worker.process(message) }) @@ -126,7 +129,7 @@ func MiddlewareRetrySpec(c gospec.Context) { failed_at, _ := message.Get("failed_at").String() retried_at, _ := message.Get("retried_at").String() - c.Expect(queue, Equals, "myqueue") + c.Expect(queue, Equals, queueName) c.Expect(error_message, Equals, "AHHHH") c.Expect(retry_count, Equals, 11) c.Expect(failed_at, Equals, "2013-07-20 14:03:42 UTC") @@ -134,9 +137,9 @@ func MiddlewareRetrySpec(c gospec.Context) { }) c.Specify("handles recurring failed message with customized max", func() { - message, _ := NewMsg("{\"jid\":\"2\",\"retry\":10,\"queue\":\"default\",\"error_message\":\"bam\",\"failed_at\":\"2013-07-20 14:03:42 UTC\",\"retry_count\":8}") + message, _ := NewMsg("{\"jid\":\"2\",\"retry\":true,\"queue\":\"default\",\"error_message\":\"bam\",\"failed_at\":\"2013-07-20 14:03:42 UTC\",\"retry_count\":8,\"retry_max\":10}") - wares.call("myqueue", message, func() { + wares.call(queueName, message, func() { worker.process(message) }) @@ -152,7 +155,7 @@ func MiddlewareRetrySpec(c gospec.Context) { failed_at, _ := message.Get("failed_at").String() retried_at, _ := message.Get("retried_at").String() - c.Expect(queue, Equals, "myqueue") + c.Expect(queue, Equals, queueName) c.Expect(error_message, Equals, "AHHHH") c.Expect(retry_count, Equals, 9) c.Expect(failed_at, Equals, "2013-07-20 14:03:42 UTC") @@ -162,7 +165,7 @@ func MiddlewareRetrySpec(c gospec.Context) { c.Specify("doesn't retry after default number of retries", func() { message, _ := NewMsg("{\"jid\":\"2\",\"retry\":true,\"retry_count\":25}") - wares.call("myqueue", message, func() { + wares.call(queueName, message, func() { worker.process(message) }) @@ -174,9 +177,9 @@ func MiddlewareRetrySpec(c gospec.Context) { }) c.Specify("doesn't retry after customized number of retries", func() { - message, _ := NewMsg("{\"jid\":\"2\",\"retry\":3,\"retry_count\":3}") + message, _ := NewMsg("{\"jid\":\"2\",\"retry\":true,\"retry_max\":3,\"retry_count\":3}") - wares.call("myqueue", message, func() { + wares.call(queueName, message, func() { worker.process(message) }) @@ -187,5 +190,90 @@ func MiddlewareRetrySpec(c gospec.Context) { c.Expect(count, Equals, 0) }) + c.Specify("use retry_options when provided - min_delay", func() { + message, _ := NewMsg("{\"jid\":\"2\",\"retry\":true,\"retry_options\":{\"exp\":2,\"min_delay\":1200,\"max_rand\":0}}") + var now int + wares.call(queueName, message, func() { + worker.process(message) + now = int(time.Now().Unix()) + }) + + conn := Config.Pool.Get() + defer conn.Close() + + count, _ := redis.Int(conn.Do("zcard", "prod:"+RETRY_KEY)) + c.Expect(count, Equals, 1) + values, _ := redis.Values(conn.Do("ZRANGE", "prod:"+RETRY_KEY, 0, -1, "WITHSCORES")) + + msg := "" + nextAt := -1.0 + redis.Scan(values, &msg, &nextAt) + c.Expect(int(nextAt), Equals, now+1200) + }) + + c.Specify("use retry_options when provided - exp", func() { + message, _ := NewMsg("{\"jid\":\"2\",\"retry\":true,\"retry_count\":2,\"retry_options\":{\"exp\":2,\"min_delay\":0,\"max_rand\":0}}") + var now int + wares.call(queueName, message, func() { + worker.process(message) + now = int(time.Now().Unix()) + }) + + conn := Config.Pool.Get() + defer conn.Close() + + count, _ := redis.Int(conn.Do("zcard", "prod:"+RETRY_KEY)) + c.Expect(count, Equals, 1) + values, _ := redis.Values(conn.Do("ZRANGE", "prod:"+RETRY_KEY, 0, -1, "WITHSCORES")) + + msg := "" + nextAt := -1.0 + redis.Scan(values, &msg, &nextAt) + c.Expect(int(nextAt), Equals, now+9) + }) + + c.Specify("use retry_options when provided - max_delay", func() { + message, _ := NewMsg("{\"jid\":\"2\",\"retry\":true,\"retry_options\":{\"exp\":20,\"min_delay\":600,\"max_delay\":10,\"max_rand\":10000}}") + var now int + wares.call(queueName, message, func() { + worker.process(message) + now = int(time.Now().Unix()) + }) + + conn := Config.Pool.Get() + defer conn.Close() + + count, _ := redis.Int(conn.Do("zcard", "prod:"+RETRY_KEY)) + c.Expect(count, Equals, 1) + values, _ := redis.Values(conn.Do("ZRANGE", "prod:"+RETRY_KEY, 0, -1, "WITHSCORES")) + + msg := "" + nextAt := -1.0 + redis.Scan(values, &msg, &nextAt) + c.Expect(int(nextAt), Equals, now+10) + }) + + c.Specify("use retry_options when provided - max_rand", func() { + message, _ := NewMsg("{\"jid\":\"2\",\"retry\":true,\"retry_options\":{\"exp\":2,\"min_delay\":0,\"max_rand\":100}}") + var now int + wares.call(queueName, message, func() { + worker.process(message) + now = int(time.Now().Unix()) + }) + + conn := Config.Pool.Get() + defer conn.Close() + + count, _ := redis.Int(conn.Do("zcard", "prod:"+RETRY_KEY)) + c.Expect(count, Equals, 1) + values, _ := redis.Values(conn.Do("ZRANGE", "prod:"+RETRY_KEY, 0, -1, "WITHSCORES")) + + msg := "" + nextAt := -1.0 + redis.Scan(values, &msg, &nextAt) + c.Expect(nextAt, Satisfies, nextAt <= float64(now+100)) + c.Expect(nextAt, Satisfies, nextAt >= float64(now+0)) + }) + Config.Namespace = was } diff --git a/middleware_stats.go b/middleware_stats.go index 62a7663..c3bcc63 100644 --- a/middleware_stats.go +++ b/middleware_stats.go @@ -32,6 +32,6 @@ func incrementStats(metric string) { conn.Send("incr", Config.Namespace+"stat:"+metric+":"+today) if _, err := conn.Do("exec"); err != nil { - Logger.Println("couldn't save stats:", err) + Logger.Errorln("failed to save stats:", err) } } diff --git a/middleware_stats_test.go b/middleware_stats_test.go index 46b53a8..2de2962 100644 --- a/middleware_stats_test.go +++ b/middleware_stats_test.go @@ -3,17 +3,19 @@ package workers import ( "github.com/customerio/gospec" . "github.com/customerio/gospec" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" "time" ) func MiddlewareStatsSpec(c gospec.Context) { + const queueName = "queue-middleware_stats" + var job = (func(message *Msg) { // noop }) layout := "2006-01-02" - manager := newManager("myqueue", job, 1) + manager := newManager(queueName, job, 1) worker := newWorker(manager) message, _ := NewMsg("{\"jid\":\"2\",\"retry\":true}") @@ -44,7 +46,7 @@ func MiddlewareStatsSpec(c gospec.Context) { panic("AHHHH") }) - manager := newManager("myqueue", job, 1) + manager := newManager(queueName, job, 1) worker := newWorker(manager) c.Specify("increments failed stats", func() { diff --git a/msg.go b/msg.go index f922fa0..2a47c36 100644 --- a/msg.go +++ b/msg.go @@ -1,8 +1,9 @@ package workers import ( - "github.com/bitly/go-simplejson" "reflect" + + "github.com/bitly/go-simplejson" ) type data struct { @@ -39,7 +40,7 @@ func (d *data) ToJson() string { json, err := d.Encode() if err != nil { - Logger.Println("ERR: Couldn't generate json from", d, ":", err) + Logger.Errorln("failed to generate json from", d, ":", err) } return string(json) diff --git a/scheduled.go b/scheduled.go index a7b08fa..91bda94 100644 --- a/scheduled.go +++ b/scheduled.go @@ -4,7 +4,7 @@ import ( "strings" "time" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" ) type scheduled struct { @@ -24,7 +24,7 @@ func (s *scheduled) start() { s.poll() - time.Sleep(time.Duration(Config.PollInterval) * time.Second) + time.Sleep(time.Duration(Config.PoolInterval) * time.Second) } })() } diff --git a/scheduled_test.go b/scheduled_test.go index d292a77..af591f4 100644 --- a/scheduled_test.go +++ b/scheduled_test.go @@ -3,7 +3,7 @@ package workers import ( "github.com/customerio/gospec" . "github.com/customerio/gospec" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" ) func ScheduledSpec(c gospec.Context) { diff --git a/stats.go b/stats.go index d62864e..e9248a7 100644 --- a/stats.go +++ b/stats.go @@ -15,10 +15,42 @@ type stats struct { Retries int64 `json:"retries"` } +// Stats writes stats on response writer func Stats(w http.ResponseWriter, req *http.Request) { w.Header().Set("Content-Type", "application/json; charset=utf-8") w.Header().Set("Access-Control-Allow-Origin", "*") + stats := getStats() + + body, _ := json.MarshalIndent(stats, "", " ") + fmt.Fprintln(w, string(body)) +} + +// WorkerStats holds workers stats +type WorkerStats struct { + Processed int `json:"processed"` + Failed int `json:"failed"` + Enqueued map[string]string `json:"enqueued"` + Retries int64 `json:"retries"` +} + +// GetStats returns workers stats +func GetStats() *WorkerStats { + stats := getStats() + enqueued := map[string]string{} + if statsEnqueued, ok := stats.Enqueued.(map[string]string); ok { + enqueued = statsEnqueued + } + + return &WorkerStats{ + Processed: stats.Processed, + Failed: stats.Failed, + Retries: stats.Retries, + Enqueued: enqueued, + } +} + +func getStats() stats { jobs := make(map[string][]*map[string]interface{}) enqueued := make(map[string]string) @@ -55,14 +87,14 @@ func Stats(w http.ResponseWriter, req *http.Request) { conn.Send("get", Config.Namespace+"stat:failed") conn.Send("zcard", Config.Namespace+RETRY_KEY) - for key, _ := range enqueued { + for key := range enqueued { conn.Send("llen", fmt.Sprintf("%squeue:%s", Config.Namespace, key)) } r, err := conn.Do("exec") if err != nil { - Logger.Println("couldn't retrieve stats:", err) + Logger.Errorln("failed to retrieve stats:", err) } results := r.([]interface{}) @@ -83,7 +115,7 @@ func Stats(w http.ResponseWriter, req *http.Request) { } queueIndex := 0 - for key, _ := range enqueued { + for key := range enqueued { if queueIndex == (index - 3) { enqueued[key] = fmt.Sprintf("%d", result.(int64)) } @@ -92,6 +124,5 @@ func Stats(w http.ResponseWriter, req *http.Request) { } } - body, _ := json.MarshalIndent(stats, "", " ") - fmt.Fprintln(w, string(body)) + return stats } diff --git a/worker_test.go b/worker_test.go index 1097070..d9e7b41 100644 --- a/worker_test.go +++ b/worker_test.go @@ -37,13 +37,15 @@ func confirm(manager *manager) (msg *Msg) { } func WorkerSpec(c gospec.Context) { + const queueName = "queue-worker_test" + var processed = make(chan *Args) var testJob = (func(message *Msg) { processed <- message.Args() }) - manager := newManager("myqueue", testJob, 1) + manager := newManager(queueName, testJob, 1) c.Specify("newWorker", func() { c.Specify("it returns an instance of worker with connection to manager", func() { @@ -125,7 +127,7 @@ func WorkerSpec(c gospec.Context) { panic("AHHHHHHHHH") }) - manager := newManager("myqueue", panicJob, 1) + manager := newManager(queueName, panicJob, 1) worker := newWorker(manager) go worker.work(messages) diff --git a/workers.go b/workers.go index fc39be6..1ba2658 100644 --- a/workers.go +++ b/workers.go @@ -3,9 +3,7 @@ package workers import ( "errors" "fmt" - "log" "net/http" - "os" "sync" ) @@ -14,8 +12,6 @@ const ( SCHEDULED_JOBS_KEY = "schedule" ) -var Logger WorkersLogger = log.New(os.Stdout, "workers: ", log.Ldate|log.Lmicroseconds) - var managers = make(map[string]*manager) var schedule *scheduled var control = make(map[string]chan string) @@ -88,10 +84,10 @@ func Quit() { func StatsServer(port int) { http.HandleFunc("/stats", Stats) - Logger.Println("Stats are available at", fmt.Sprint("http://localhost:", port, "/stats")) + Logger.Infoln("Stats are available at", fmt.Sprint("http://localhost:", port, "/stats")) if err := http.ListenAndServe(fmt.Sprint(":", port), nil); err != nil { - Logger.Println(err) + Logger.Error("failed to start stats server", err) } } diff --git a/workers_logger.go b/workers_logger.go index cab91b1..4ac7005 100644 --- a/workers_logger.go +++ b/workers_logger.go @@ -1,6 +1,43 @@ package workers +import "github.com/sirupsen/logrus" + +// WorkersLogger represents the log interface type WorkersLogger interface { - Println(...interface{}) - Printf(string, ...interface{}) + Fatal(format ...interface{}) + Fatalf(format string, args ...interface{}) + Fatalln(args ...interface{}) + + Debug(args ...interface{}) + Debugf(format string, args ...interface{}) + Debugln(args ...interface{}) + + Error(args ...interface{}) + Errorf(format string, args ...interface{}) + Errorln(args ...interface{}) + + Info(args ...interface{}) + Infof(format string, args ...interface{}) + Infoln(args ...interface{}) + + Warn(args ...interface{}) + Warnf(format string, args ...interface{}) + Warnln(args ...interface{}) +} + +// Logger is the default logger +var Logger = initLogger() + +func initLogger() WorkersLogger { + plog := logrus.New() + plog.Formatter = new(logrus.TextFormatter) + plog.Level = logrus.InfoLevel + return plog.WithFields(logrus.Fields{"source": "go-workers"}) +} + +// SetLogger rewrites the default logger +func SetLogger(l WorkersLogger) { + if l != nil { + Logger = l + } } diff --git a/workers_test.go b/workers_test.go index 4bcd8b6..a8d74f2 100644 --- a/workers_test.go +++ b/workers_test.go @@ -9,20 +9,25 @@ import ( var called chan bool -func myJob(message *Msg) { +func myJob(_ *Msg) { called <- true } func WorkersSpec(c gospec.Context) { + const queueName = "queue-workers" + c.Specify("Workers", func() { c.Specify("allows running in tests", func() { called = make(chan bool) - Process("myqueue", myJob, 10) + Process(queueName, myJob, 10) Start() - Enqueue("myqueue", "Add", []int{1, 2}) + _, err := Enqueue(queueName, "Add", []int{1, 2}) + if err != nil { + panic(err) + } <-called Quit() @@ -32,14 +37,14 @@ func WorkersSpec(c gospec.Context) { //c.Specify("allows starting and stopping multiple times", func() { // called = make(chan bool) - // Process("myqueue", myJob, 10) + // Process(queueName, myJob, 10) // Start() // Quit() // Start() - // Enqueue("myqueue", "Add", []int{1, 2}) + // Enqueue(queueName, "Add", []int{1, 2}) // <-called // Quit()