diff --git a/config.go b/config.go index f93ac94..4c8572e 100644 --- a/config.go +++ b/config.go @@ -7,10 +7,17 @@ import ( "github.com/garyburd/redigo/redis" ) +const ( + defaultRetryKey = "goretry" + defaultScheduleJobsKey = "schedule" +) + type config struct { processId string Namespace string PollInterval int + RetryKey string + ScheduleKey string Pool *redis.Pool Fetch func(queue string) Fetcher } @@ -21,6 +28,8 @@ func Configure(options map[string]string) { var poolSize int var namespace string var pollInterval int + var retryKey string + var scheduleKey string if options["server"] == "" { panic("Configure requires a 'server' option, which identifies a Redis instance") @@ -39,13 +48,21 @@ func Configure(options map[string]string) { } else { pollInterval = 15 } + if options["retry_key"] == "" { + retryKey = defaultRetryKey + } else { + retryKey = options["retry_key"] + } + scheduleKey = defaultScheduleJobsKey poolSize, _ = strconv.Atoi(options["pool"]) Config = &config{ options["process"], namespace, pollInterval, + retryKey, + scheduleKey, &redis.Pool{ MaxIdle: poolSize, IdleTimeout: 240 * time.Second, diff --git a/config_test.go b/config_test.go index afde76d..5bf8dea 100644 --- a/config_test.go +++ b/config_test.go @@ -87,4 +87,24 @@ func ConfigSpec(c gospec.Context) { c.Expect(Config.PollInterval, Equals, 1) }) + + c.Specify("defaults retry key to goretry", func() { + Configure(map[string]string{ + "server": "localhost:6379", + "process": "1", + }) + + c.Expect(Config.RetryKey, Equals, "goretry") + }) + + c.Specify("add 'retry' to the retry key", func() { + Configure(map[string]string{ + "server": "localhost:6379", + "process": "1", + "retry_key": "retry", + }) + + c.Expect(Config.RetryKey, Equals, "retry") + }) + } diff --git a/enqueue.go b/enqueue.go index f9af7a8..66c7d34 100644 --- a/enqueue.go +++ b/enqueue.go @@ -92,7 +92,7 @@ func enqueueAt(at float64, bytes []byte) error { _, err := conn.Do( "zadd", - Config.Namespace+SCHEDULED_JOBS_KEY, at, bytes, + Config.Namespace+Config.ScheduleKey, at, bytes, ) if err != nil { return err diff --git a/enqueue_test.go b/enqueue_test.go index e3a2707..d0fa2eb 100644 --- a/enqueue_test.go +++ b/enqueue_test.go @@ -89,7 +89,7 @@ func EnqueueSpec(c gospec.Context) { }) c.Specify("EnqueueIn", func() { - scheduleQueue := "prod:" + SCHEDULED_JOBS_KEY + scheduleQueue := "prod:" + Config.ScheduleKey conn := Config.Pool.Get() defer conn.Close() diff --git a/middleware_retry.go b/middleware_retry.go index 1bc1ccf..4de2cb0 100644 --- a/middleware_retry.go +++ b/middleware_retry.go @@ -33,7 +33,7 @@ func (r *MiddlewareRetry) Call(queue string, message *Msg, next func() bool) (ac _, err := conn.Do( "zadd", - Config.Namespace+RETRY_KEY, + Config.Namespace+Config.RetryKey, nowToSecondsWithNanoPrecision()+waitDuration, message.ToJson(), ) diff --git a/middleware_retry_test.go b/middleware_retry_test.go index a17cf7a..aaa88c1 100644 --- a/middleware_retry_test.go +++ b/middleware_retry_test.go @@ -1,10 +1,11 @@ package workers import ( + "time" + "github.com/customerio/gospec" . "github.com/customerio/gospec" "github.com/garyburd/redigo/redis" - "time" ) func MiddlewareRetrySpec(c gospec.Context) { @@ -33,7 +34,7 @@ func MiddlewareRetrySpec(c gospec.Context) { conn := Config.Pool.Get() defer conn.Close() - retries, _ := redis.Strings(conn.Do("zrange", "prod:"+RETRY_KEY, 0, 1)) + retries, _ := redis.Strings(conn.Do("zrange", "prod:"+Config.RetryKey, 0, 1)) c.Expect(retries[0], Equals, message.ToJson()) }) @@ -47,7 +48,7 @@ func MiddlewareRetrySpec(c gospec.Context) { conn := Config.Pool.Get() defer conn.Close() - count, _ := redis.Int(conn.Do("zcard", "prod:"+RETRY_KEY)) + count, _ := redis.Int(conn.Do("zcard", "prod:"+Config.RetryKey)) c.Expect(count, Equals, 0) }) @@ -61,7 +62,7 @@ func MiddlewareRetrySpec(c gospec.Context) { conn := Config.Pool.Get() defer conn.Close() - count, _ := redis.Int(conn.Do("zcard", "prod:"+RETRY_KEY)) + count, _ := redis.Int(conn.Do("zcard", "prod:"+Config.RetryKey)) c.Expect(count, Equals, 0) }) @@ -75,7 +76,7 @@ func MiddlewareRetrySpec(c gospec.Context) { conn := Config.Pool.Get() defer conn.Close() - retries, _ := redis.Strings(conn.Do("zrange", "prod:"+RETRY_KEY, 0, 1)) + retries, _ := redis.Strings(conn.Do("zrange", "prod:"+Config.RetryKey, 0, 1)) c.Expect(retries[0], Equals, message.ToJson()) }) @@ -89,7 +90,7 @@ func MiddlewareRetrySpec(c gospec.Context) { conn := Config.Pool.Get() defer conn.Close() - retries, _ := redis.Strings(conn.Do("zrange", "prod:"+RETRY_KEY, 0, 1)) + retries, _ := redis.Strings(conn.Do("zrange", "prod:"+Config.RetryKey, 0, 1)) message, _ = NewMsg(retries[0]) queue, _ := message.Get("queue").String() @@ -117,7 +118,7 @@ func MiddlewareRetrySpec(c gospec.Context) { conn := Config.Pool.Get() defer conn.Close() - retries, _ := redis.Strings(conn.Do("zrange", "prod:"+RETRY_KEY, 0, 1)) + retries, _ := redis.Strings(conn.Do("zrange", "prod:"+Config.RetryKey, 0, 1)) message, _ = NewMsg(retries[0]) queue, _ := message.Get("queue").String() @@ -143,7 +144,7 @@ func MiddlewareRetrySpec(c gospec.Context) { conn := Config.Pool.Get() defer conn.Close() - retries, _ := redis.Strings(conn.Do("zrange", "prod:"+RETRY_KEY, 0, 1)) + retries, _ := redis.Strings(conn.Do("zrange", "prod:"+Config.RetryKey, 0, 1)) message, _ = NewMsg(retries[0]) queue, _ := message.Get("queue").String() @@ -169,7 +170,7 @@ func MiddlewareRetrySpec(c gospec.Context) { conn := Config.Pool.Get() defer conn.Close() - count, _ := redis.Int(conn.Do("zcard", "prod:"+RETRY_KEY)) + count, _ := redis.Int(conn.Do("zcard", "prod:"+Config.RetryKey)) c.Expect(count, Equals, 0) }) @@ -183,7 +184,7 @@ func MiddlewareRetrySpec(c gospec.Context) { conn := Config.Pool.Get() defer conn.Close() - count, _ := redis.Int(conn.Do("zcard", "prod:"+RETRY_KEY)) + count, _ := redis.Int(conn.Do("zcard", "prod:"+Config.RetryKey)) c.Expect(count, Equals, 0) }) diff --git a/scheduled_test.go b/scheduled_test.go index d292a77..37dbbac 100644 --- a/scheduled_test.go +++ b/scheduled_test.go @@ -7,7 +7,7 @@ import ( ) func ScheduledSpec(c gospec.Context) { - scheduled := newScheduled(RETRY_KEY) + scheduled := newScheduled(Config.RetryKey) was := Config.Namespace Config.Namespace = "prod:" @@ -22,15 +22,15 @@ func ScheduledSpec(c gospec.Context) { message2, _ := NewMsg("{\"queue\":\"myqueue\",\"foo\":\"bar2\"}") message3, _ := NewMsg("{\"queue\":\"default\",\"foo\":\"bar3\"}") - conn.Do("zadd", "prod:"+RETRY_KEY, now-60.0, message1.ToJson()) - conn.Do("zadd", "prod:"+RETRY_KEY, now-10.0, message2.ToJson()) - conn.Do("zadd", "prod:"+RETRY_KEY, now+60.0, message3.ToJson()) + conn.Do("zadd", "prod:"+Config.RetryKey, now-60.0, message1.ToJson()) + conn.Do("zadd", "prod:"+Config.RetryKey, now-10.0, message2.ToJson()) + conn.Do("zadd", "prod:"+Config.RetryKey, now+60.0, message3.ToJson()) scheduled.poll() defaultCount, _ := redis.Int(conn.Do("llen", "prod:queue:default")) myqueueCount, _ := redis.Int(conn.Do("llen", "prod:queue:myqueue")) - pending, _ := redis.Int(conn.Do("zcard", "prod:"+RETRY_KEY)) + pending, _ := redis.Int(conn.Do("zcard", "prod:"+Config.RetryKey)) c.Expect(defaultCount, Equals, 1) c.Expect(myqueueCount, Equals, 1) diff --git a/stats.go b/stats.go index d62864e..be542ce 100644 --- a/stats.go +++ b/stats.go @@ -53,7 +53,7 @@ func Stats(w http.ResponseWriter, req *http.Request) { conn.Send("multi") conn.Send("get", Config.Namespace+"stat:processed") conn.Send("get", Config.Namespace+"stat:failed") - conn.Send("zcard", Config.Namespace+RETRY_KEY) + conn.Send("zcard", Config.Namespace+Config.RetryKey) for key, _ := range enqueued { conn.Send("llen", fmt.Sprintf("%squeue:%s", Config.Namespace, key)) diff --git a/workers.go b/workers.go index fc39be6..2359801 100644 --- a/workers.go +++ b/workers.go @@ -9,11 +9,6 @@ import ( "sync" ) -const ( - RETRY_KEY = "goretry" - SCHEDULED_JOBS_KEY = "schedule" -) - var Logger WorkersLogger = log.New(os.Stdout, "workers: ", log.Ldate|log.Lmicroseconds) var managers = make(map[string]*manager) @@ -97,7 +92,7 @@ func StatsServer(port int) { func startSchedule() { if schedule == nil { - schedule = newScheduled(RETRY_KEY, SCHEDULED_JOBS_KEY) + schedule = newScheduled(Config.RetryKey, Config.ScheduleKey) } schedule.start()