Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,18 @@ func main() {
// Add a job to a queue with retry
workers.EnqueueWithOptions("myqueue3", "Add", []int{1, 2}, workers.EnqueueOptions{Retry: true})

// Add a job to a queue in a different redis instance
workers.EnqueueWithOptions("myqueue4", "Add", []int{1, 2},
workers.EnqueueOptions{
Retry: true,
ConnectionOptions: map[string]string{
"server": "localhost:6378",
"database": "my-database",
"pool": "10",
"password": "pass",
}},
)

// stats will be available at http://localhost:8080/stats
go workers.StatsServer(8080)

Expand Down
56 changes: 30 additions & 26 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ type config struct {
var Config *config

func Configure(options map[string]string) {
var poolSize int
var namespace string
var pollInterval int
var retryKey string
Expand Down Expand Up @@ -55,42 +54,47 @@ func Configure(options map[string]string) {
}

scheduleKey = defaultScheduleJobsKey
poolSize, _ = strconv.Atoi(options["pool"])

Config = &config{
options["process"],
namespace,
pollInterval,
retryKey,
scheduleKey,
&redis.Pool{
MaxIdle: poolSize,
IdleTimeout: 240 * time.Second,
Dial: func() (redis.Conn, error) {
c, err := redis.Dial("tcp", options["server"])
if err != nil {
GetConnectionPool(options),
DefaultFetch,
}
}

func GetConnectionPool(options map[string]string) *redis.Pool {
poolSize, _ := strconv.Atoi(options["pool"])

return &redis.Pool{
MaxIdle: poolSize,
IdleTimeout: 240 * time.Second,
Dial: func() (redis.Conn, error) {
c, err := redis.Dial("tcp", options["server"])
if err != nil {
return nil, err
}
if options["password"] != "" {
if _, err := c.Do("AUTH", options["password"]); err != nil {
c.Close()
return nil, err
}
if options["password"] != "" {
if _, err := c.Do("AUTH", options["password"]); err != nil {
c.Close()
return nil, err
}
}
if options["database"] != "" {
if _, err := c.Do("SELECT", options["database"]); err != nil {
c.Close()
return nil, err
}
}
if options["database"] != "" {
if _, err := c.Do("SELECT", options["database"]); err != nil {
c.Close()
return nil, err
}
return c, err
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
_, err := c.Do("PING")
return err
},
}
return c, err
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
_, err := c.Do("PING")
return err
},
DefaultFetch,
}
}

Expand Down
27 changes: 22 additions & 5 deletions enqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"io"
"time"

"github.com/gomodule/redigo/redis"
)

const (
Expand Down Expand Up @@ -44,9 +46,19 @@ func (e EnqueueData) MarshalJSON() ([]byte, error) {
}

type EnqueueOptions struct {
RetryCount int `json:"retry_count,omitempty"`
Retry bool `json:"retry,omitempty"`
At float64 `json:"at,omitempty"`
RetryCount int `json:"retry_count,omitempty"`
Retry bool `json:"retry,omitempty"`
RetryMax int `json:"retry_max,omitempty"`
At float64 `json:"at,omitempty"`
RetryOptions RetryOptions `json:"retry_options,omitempty"`
ConnectionOptions map[string]string `json:"connection_options,omitempty"`
}

type RetryOptions struct {
Exp int `json:"exp"`
MinDelay int `json:"min_delay"`
MaxDelay int `json:"max_delay"`
MaxRand int `json:"max_rand"`
}

func generateJid() string {
Expand Down Expand Up @@ -92,15 +104,20 @@ func EnqueueWithOptions(queue, class string, args interface{}, opts EnqueueOptio
return data.Jid, err
}

conn := Config.Pool.Get()
var conn redis.Conn
if len(opts.ConnectionOptions) == 0 {
conn = Config.Pool.Get()
} else {
conn = GetConnectionPool(opts.ConnectionOptions).Get()
}
defer conn.Close()

_, err = conn.Do("sadd", Config.Namespace+"queues", queue)
if err != nil {
return "", err
}
queue = Config.Namespace + "queue:" + queue
_, err = conn.Do("rpush", queue, bytes)
_, err = conn.Do("lpush", queue, bytes)
if err != nil {
return "", err
}
Expand Down
39 changes: 33 additions & 6 deletions enqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,19 @@ func EnqueueSpec(c gospec.Context) {
c.Expect(ea, IsWithin(0.1), nowToSecondsWithNanoPrecision())
})

c.Specify("sets retry count to `retry`", func() {
EnqueueWithOptions("enqueue6", "Compare", []string{"foo", "bar"}, EnqueueOptions{RetryCount: 13})
c.Specify("has retry and retry_max when set", func() {
EnqueueWithOptions("enqueue6", "Compare", []string{"foo", "bar"}, EnqueueOptions{RetryMax: 13, Retry: true})

bytes, _ := redis.Bytes(conn.Do("lpop", "prod:queue:enqueue6"))
var result map[string]interface{}
json.Unmarshal(bytes, &result)
c.Expect(result["class"], Equals, "Compare")

retry := result["retry"].(float64)
c.Expect(retry, Equals, float64(13))
retry := result["retry"].(bool)
c.Expect(retry, Equals, true)

retryCount := result["retry_count"].(float64)
c.Expect(retryCount, Equals, float64(0))
retryMax := int(result["retry_max"].(float64))
c.Expect(retryMax, Equals, 13)
})

c.Specify("sets Retry correctly when no count given", func() {
Expand All @@ -98,6 +98,33 @@ func EnqueueSpec(c gospec.Context) {
retry := result["retry"].(bool)
c.Expect(retry, Equals, true)
})

c.Specify("has retry_options when set", func() {
EnqueueWithOptions(
"enqueue7", "Compare", []string{"foo", "bar"},
EnqueueOptions{
RetryMax: 13,
Retry: true,
RetryOptions: RetryOptions{
Exp: 2,
MinDelay: 0,
MaxDelay: 60,
MaxRand: 30,
},
})

bytes, _ := redis.Bytes(conn.Do("lpop", "prod:queue:enqueue7"))
var result map[string]interface{}
json.Unmarshal(bytes, &result)
c.Expect(result["class"], Equals, "Compare")

retryOptions := result["retry_options"].(map[string]interface{})
c.Expect(len(retryOptions), Equals, 4)
c.Expect(retryOptions["exp"].(float64), Equals, float64(2))
c.Expect(retryOptions["min_delay"].(float64), Equals, float64(0))
c.Expect(retryOptions["max_delay"].(float64), Equals, float64(60))
c.Expect(retryOptions["max_rand"].(float64), Equals, float64(30))
})
})

c.Specify("EnqueueIn", func() {
Expand Down
6 changes: 3 additions & 3 deletions fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (f *fetch) tryFetchMessage() {
if err != nil {
// If redis returns null, the queue is empty. Just ignore the error.
if err.Error() != "redigo: nil returned" {
Logger.Println("ERR: ", err)
Logger.Errorln("failed to fetch message", err)
time.Sleep(1 * time.Second)
}
} else {
Expand All @@ -100,7 +100,7 @@ func (f *fetch) sendMessage(message string) {
msg, err := NewMsg(message)

if err != nil {
Logger.Println("ERR: Couldn't create message from", message, ":", err)
Logger.Errorln("failed to create message from", message, ":", err)
return
}

Expand Down Expand Up @@ -145,7 +145,7 @@ func (f *fetch) inprogressMessages() []string {

messages, err := redis.Strings(conn.Do("lrange", f.inprogressQueue(), 0, -1))
if err != nil {
Logger.Println("ERR: ", err)
Logger.Errorln("failed to fetch messages in progress", err)
}

return messages
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ require (
github.com/gomodule/redigo v1.8.2
github.com/kr/pretty v0.2.1 // indirect
github.com/orfjackal/nanospec.go v0.0.0-20120727230329-de4694c1d701 // indirect
github.com/sirupsen/logrus v1.6.0
)
9 changes: 9 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@ github.com/customerio/gospec v0.0.0-20130710230057-a5cc0e48aa39 h1:O0YTztXI3XeJX
github.com/customerio/gospec v0.0.0-20130710230057-a5cc0e48aa39/go.mod h1:OzYUFhPuL2JbjwFwrv6CZs23uBawekc6OZs+g19F0mY=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gomodule/redigo v1.8.2 h1:H5XSIre1MB5NbPYFp+i1NBbb5qN1W8Y8YAQoAYbkm8k=
github.com/gomodule/redigo v1.8.2/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUzlpkBYO0=
github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
Expand All @@ -17,9 +21,14 @@ github.com/orfjackal/nanospec.go v0.0.0-20120727230329-de4694c1d701 h1:yOXfzNV7q
github.com/orfjackal/nanospec.go v0.0.0-20120727230329-de4694c1d701/go.mod h1:VtBIF1XX0c1nKkeAPk8i4aXkYopqQgfDqolHUIHPwNI=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
Expand Down
6 changes: 3 additions & 3 deletions manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (m *manager) prepare() {
}

func (m *manager) quit() {
Logger.Println("quitting queue", m.queueName(), "(waiting for", m.processing(), "/", len(m.workers), "workers).")
Logger.Infoln("quitting queue", m.queueName(), "(waiting for", m.processing(), "/", len(m.workers), "workers).")
m.prepare()

m.workersM.Lock()
Expand All @@ -50,7 +50,7 @@ func (m *manager) quit() {
}

func (m *manager) manage() {
Logger.Println("processing queue", m.queueName(), "with", m.concurrency, "workers.")
Logger.Infoln("processing queue", m.queueName(), "with", m.concurrency, "workers.")

go m.fetch.Fetch()

Expand All @@ -77,7 +77,7 @@ func (m *manager) loadWorkers() {
func (m *manager) processing() (count int) {
m.workersM.Lock()
for _, worker := range m.workers {
if worker.processing() {
if worker != nil && worker.processing() {
count++
}
}
Expand Down
10 changes: 5 additions & 5 deletions middleware_logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,24 @@ func (l *MiddlewareLogging) Call(queue string, message *Msg, next func() bool) (
prefix := fmt.Sprint(queue, " JID-", message.Jid())

start := time.Now()
Logger.Println(prefix, "start")
Logger.Println(prefix, "args:", message.Args().ToJson())
Logger.Debug(prefix, "start")
Logger.Debug(prefix, "args:", message.Args().ToJson())

defer func() {
if e := recover(); e != nil {
Logger.Println(prefix, "fail:", time.Since(start))
Logger.Debug(prefix, "fail:", time.Since(start))

buf := make([]byte, 4096)
buf = buf[:runtime.Stack(buf, false)]
Logger.Printf("%s error: %v\n%s", prefix, e, buf)
Logger.Errorf("%s error: %v\n%s", prefix, e, buf)

panic(e)
}
}()

acknowledge = next()

Logger.Println(prefix, "done:", time.Since(start))
Logger.Debug(prefix, "done:", time.Since(start))

return
}
Loading