Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,17 @@ import (
"github.com/gomodule/redigo/redis"
)

const (
defaultRetryKey = "goretry"
defaultScheduleJobsKey = "schedule"
)

type config struct {
ProcessId string
Namespace string
PollInterval int
RetryKey string
ScheduleKey string
Pool *redis.Pool
Fetch func(queue string) Fetcher
}
Expand All @@ -21,6 +28,8 @@ func Configure(options map[string]string) {
var poolSize int
var namespace string
var pollInterval int
var retryKey string
var scheduleKey string

if options["server"] == "" {
panic("Configure requires a 'server' option, which identifies a Redis instance")
Expand All @@ -39,13 +48,21 @@ func Configure(options map[string]string) {
} else {
pollInterval = 15
}
if options["retry_key"] == "" {
retryKey = defaultRetryKey
} else {
retryKey = options["retry_key"]
}

scheduleKey = defaultScheduleJobsKey
poolSize, _ = strconv.Atoi(options["pool"])

Config = &config{
options["process"],
namespace,
pollInterval,
retryKey,
scheduleKey,
&redis.Pool{
MaxIdle: poolSize,
IdleTimeout: 240 * time.Second,
Expand Down
19 changes: 19 additions & 0 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,23 @@ func ConfigSpec(c gospec.Context) {

c.Expect(Config.PollInterval, Equals, 1)
})

c.Specify("defaults retry key to goretry", func() {
Configure(map[string]string{
"server": "localhost:6379",
"process": "1",
})

c.Expect(Config.RetryKey, Equals, "goretry")
})

c.Specify("add 'retry' to the retry key", func() {
Configure(map[string]string{
"server": "localhost:6379",
"process": "1",
"retry_key": "retry",
})

c.Expect(Config.RetryKey, Equals, "retry")
})
}
2 changes: 1 addition & 1 deletion enqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func enqueueAt(at float64, bytes []byte) error {

_, err := conn.Do(
"zadd",
Config.Namespace+SCHEDULED_JOBS_KEY, at, bytes,
Config.Namespace+Config.ScheduleKey, at, bytes,
)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion enqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func EnqueueSpec(c gospec.Context) {
})

c.Specify("EnqueueIn", func() {
scheduleQueue := "prod:" + SCHEDULED_JOBS_KEY
scheduleQueue := "prod:" + Config.ScheduleKey
conn := Config.Pool.Get()
defer conn.Close()

Expand Down
2 changes: 1 addition & 1 deletion middleware_retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (r *MiddlewareRetry) Call(queue string, message *Msg, next func() bool) (ac

_, err := conn.Do(
"zadd",
Config.Namespace+RETRY_KEY,
Config.Namespace+Config.RetryKey,
nowToSecondsWithNanoPrecision()+waitDuration,
message.ToJson(),
)
Expand Down
18 changes: 9 additions & 9 deletions middleware_retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func MiddlewareRetrySpec(c gospec.Context) {
conn := Config.Pool.Get()
defer conn.Close()

retries, _ := redis.Strings(conn.Do("zrange", "prod:"+RETRY_KEY, 0, 1))
retries, _ := redis.Strings(conn.Do("zrange", "prod:"+Config.RetryKey, 0, 1))
c.Expect(retries[0], Equals, message.ToJson())
})

Expand All @@ -48,7 +48,7 @@ func MiddlewareRetrySpec(c gospec.Context) {
conn := Config.Pool.Get()
defer conn.Close()

count, _ := redis.Int(conn.Do("zcard", "prod:"+RETRY_KEY))
count, _ := redis.Int(conn.Do("zcard", "prod:"+Config.RetryKey))
c.Expect(count, Equals, 0)
})

Expand All @@ -62,7 +62,7 @@ func MiddlewareRetrySpec(c gospec.Context) {
conn := Config.Pool.Get()
defer conn.Close()

count, _ := redis.Int(conn.Do("zcard", "prod:"+RETRY_KEY))
count, _ := redis.Int(conn.Do("zcard", "prod:"+Config.RetryKey))
c.Expect(count, Equals, 0)
})

Expand All @@ -76,7 +76,7 @@ func MiddlewareRetrySpec(c gospec.Context) {
conn := Config.Pool.Get()
defer conn.Close()

retries, _ := redis.Strings(conn.Do("zrange", "prod:"+RETRY_KEY, 0, 1))
retries, _ := redis.Strings(conn.Do("zrange", "prod:"+Config.RetryKey, 0, 1))
c.Expect(retries[0], Equals, message.ToJson())
})

Expand All @@ -90,7 +90,7 @@ func MiddlewareRetrySpec(c gospec.Context) {
conn := Config.Pool.Get()
defer conn.Close()

retries, _ := redis.Strings(conn.Do("zrange", "prod:"+RETRY_KEY, 0, 1))
retries, _ := redis.Strings(conn.Do("zrange", "prod:"+Config.RetryKey, 0, 1))
message, _ = NewMsg(retries[0])

queue, _ := message.Get("queue").String()
Expand Down Expand Up @@ -118,7 +118,7 @@ func MiddlewareRetrySpec(c gospec.Context) {
conn := Config.Pool.Get()
defer conn.Close()

retries, _ := redis.Strings(conn.Do("zrange", "prod:"+RETRY_KEY, 0, 1))
retries, _ := redis.Strings(conn.Do("zrange", "prod:"+Config.RetryKey, 0, 1))
message, _ = NewMsg(retries[0])

queue, _ := message.Get("queue").String()
Expand All @@ -144,7 +144,7 @@ func MiddlewareRetrySpec(c gospec.Context) {
conn := Config.Pool.Get()
defer conn.Close()

retries, _ := redis.Strings(conn.Do("zrange", "prod:"+RETRY_KEY, 0, 1))
retries, _ := redis.Strings(conn.Do("zrange", "prod:"+Config.RetryKey, 0, 1))
message, _ = NewMsg(retries[0])

queue, _ := message.Get("queue").String()
Expand All @@ -170,7 +170,7 @@ func MiddlewareRetrySpec(c gospec.Context) {
conn := Config.Pool.Get()
defer conn.Close()

count, _ := redis.Int(conn.Do("zcard", "prod:"+RETRY_KEY))
count, _ := redis.Int(conn.Do("zcard", "prod:"+Config.RetryKey))
c.Expect(count, Equals, 0)
})

Expand All @@ -184,7 +184,7 @@ func MiddlewareRetrySpec(c gospec.Context) {
conn := Config.Pool.Get()
defer conn.Close()

count, _ := redis.Int(conn.Do("zcard", "prod:"+RETRY_KEY))
count, _ := redis.Int(conn.Do("zcard", "prod:"+Config.RetryKey))
c.Expect(count, Equals, 0)
})

Expand Down
10 changes: 5 additions & 5 deletions scheduled_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

func ScheduledSpec(c gospec.Context) {
scheduled := newScheduled(RETRY_KEY)
scheduled := newScheduled(Config.RetryKey)

was := Config.Namespace
Config.Namespace = "prod:"
Expand All @@ -22,15 +22,15 @@ func ScheduledSpec(c gospec.Context) {
message2, _ := NewMsg("{\"queue\":\"myqueue\",\"foo\":\"bar2\"}")
message3, _ := NewMsg("{\"queue\":\"default\",\"foo\":\"bar3\"}")

conn.Do("zadd", "prod:"+RETRY_KEY, now-60.0, message1.ToJson())
conn.Do("zadd", "prod:"+RETRY_KEY, now-10.0, message2.ToJson())
conn.Do("zadd", "prod:"+RETRY_KEY, now+60.0, message3.ToJson())
conn.Do("zadd", "prod:"+Config.RetryKey, now-60.0, message1.ToJson())
conn.Do("zadd", "prod:"+Config.RetryKey, now-10.0, message2.ToJson())
conn.Do("zadd", "prod:"+Config.RetryKey, now+60.0, message3.ToJson())

scheduled.poll()

defaultCount, _ := redis.Int(conn.Do("llen", "prod:queue:default"))
myqueueCount, _ := redis.Int(conn.Do("llen", "prod:queue:myqueue"))
pending, _ := redis.Int(conn.Do("zcard", "prod:"+RETRY_KEY))
pending, _ := redis.Int(conn.Do("zcard", "prod:"+Config.RetryKey))

c.Expect(defaultCount, Equals, 1)
c.Expect(myqueueCount, Equals, 1)
Expand Down
2 changes: 1 addition & 1 deletion stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func Stats(w http.ResponseWriter, req *http.Request) {
conn.Send("multi")
conn.Send("get", Config.Namespace+"stat:processed")
conn.Send("get", Config.Namespace+"stat:failed")
conn.Send("zcard", Config.Namespace+RETRY_KEY)
conn.Send("zcard", Config.Namespace+Config.RetryKey)

for key, _ := range enqueued {
conn.Send("llen", fmt.Sprintf("%squeue:%s", Config.Namespace, key))
Expand Down
2 changes: 1 addition & 1 deletion workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func StatsServer(port int) {

func startSchedule() {
if schedule == nil {
schedule = newScheduled(RETRY_KEY, SCHEDULED_JOBS_KEY)
schedule = newScheduled(Config.RetryKey, Config.ScheduleKey)
}

schedule.start()
Expand Down