diff --git a/fetch_test.go b/fetch_test.go index 2bfaee2..82bf247 100644 --- a/fetch_test.go +++ b/fetch_test.go @@ -4,6 +4,8 @@ import ( "github.com/customerio/gospec" . "github.com/customerio/gospec" "github.com/garyburd/redigo/redis" + + simplejson "github.com/bitly/go-simplejson" ) func buildFetch(queue string) Fetcher { @@ -13,6 +15,29 @@ func buildFetch(queue string) Fetcher { return fetch } +func buildVirginMessages(args ...string) Msgs { + var msgs []*Msg + for _, a := range args { + b := []byte(a) + j, _ := simplejson.NewJson(b) + msgs = append(msgs, NewMsg("", j, b)) + } + return msgs +} + +func buildMessages(args ...string) Msgs { + var s []string + for _, a := range args { + b := []byte(a) + j, _ := simplejson.NewJson(b) + m := NewMsg("", j, b) + s = append(s, m.ToJson()) + } + + messages, _ := NewMsgs(s) + return messages +} + func FetchSpec(c gospec.Context) { c.Specify("Config.Fetch", func() { c.Specify("it returns an instance of fetch with queue", func() { @@ -23,7 +48,7 @@ func FetchSpec(c gospec.Context) { }) c.Specify("Fetch", func() { - messages, _ := NewMsgs([]string{"{\"foo\":\"bar\"}"}) + messages := buildMessages("{\"foo\":\"bar\"}") c.Specify("it puts messages from the queues on the messages channel", func() { fetch := buildFetch("fetchQueue2") @@ -116,7 +141,7 @@ func FetchSpec(c gospec.Context) { }) c.Specify("refires any messages left in progress from prior instance", func() { - msgs, _ := NewMsgs([]string{"{\"foo\":\"bar2\"}", "{\"foo\":\"bar3\"}"}) + msgs := buildMessages("{\"foo\":\"bar2\"}", "{\"foo\":\"bar3\"}") conn := Config.Pool.Get() defer conn.Close() diff --git a/fetcher.go b/fetcher.go index ccf6602..5cd956f 100644 --- a/fetcher.go +++ b/fetcher.go @@ -120,7 +120,7 @@ func (f *fetch) Acknowledge(messages Msgs) { // TODO optimize with a redis lua script for _, m := range messages { - conn.Do("lrem", f.inprogressQueue(), -1, m.OriginalJson()) + conn.Do("lrem", f.inprogressQueue(), -1, string(m.OriginalJson())) } } diff --git a/manager.go b/manager.go index 0c35910..6b21713 100644 --- a/manager.go +++ b/manager.go @@ -54,13 +54,14 @@ func (m *manager) manage() { go m.fetch.Fetch() +loop: for { select { case message := <-m.confirm: m.fetch.Acknowledge(message) case <-m.stop: m.exit <- true - break + break loop } } } diff --git a/manager_test.go b/manager_test.go index e11b48c..97cdb35 100644 --- a/manager_test.go +++ b/manager_test.go @@ -81,8 +81,8 @@ func ManagerSpec(c gospec.Context) { conn := Config.Pool.Get() defer conn.Close() - message, _ := NewMsg("{\"foo\":\"bar\",\"args\":[\"foo\",\"bar\"]}") - message2, _ := NewMsg("{\"foo\":\"bar2\",\"args\":[\"foo\",\"bar2\"]}") + msgs := buildVirginMessages("[\"foo\",\"bar\"]", "[\"foo\",\"bar2\"]") + message, message2 := msgs[0], msgs[1] c.Specify("coordinates processing of queue messages", func() { manager := newManager("manager1", testJob, 10) diff --git a/middleware_retry.go b/middleware_retry.go index 8785403..e500268 100644 --- a/middleware_retry.go +++ b/middleware_retry.go @@ -22,8 +22,8 @@ func (r *MiddlewareRetry) Call(queue string, messages Msgs, next func() bool) (a for _, m := range messages { if retry(m) { - m.Set("queue", queue) - m.Set("error_message", fmt.Sprintf("%v", e)) + m.queue = queue + m.error = fmt.Sprintf("%v", e) retryCount := incrementRetry(m) waitDuration := durationToSecondsWithNanoPrecision( @@ -58,34 +58,26 @@ func (r *MiddlewareRetry) Call(queue string, messages Msgs, next func() bool) (a } func retry(message *Msg) bool { - retry := false max := DEFAULT_MAX_RETRY + retry := message.Retry + count := message.retryCount + if message.RetryMax != 0 { + max = message.RetryMax - if param, err := message.Get("retry").Bool(); err == nil { - retry = param - } else if param, err := message.Get("retry").Int(); err == nil { - max = param - retry = true } - - count, _ := message.Get("retry_count").Int() - return retry && count < max } func incrementRetry(message *Msg) (retryCount int) { - retryCount = 0 - - if count, err := message.Get("retry_count").Int(); err != nil { - message.Set("failed_at", time.Now().UTC().Format(LAYOUT)) + t := time.Now().UTC().Format(LAYOUT) + if message.retryCount == -1 { + message.failedAt = t } else { - message.Set("retried_at", time.Now().UTC().Format(LAYOUT)) - retryCount = count + 1 + message.retriedAt = t } + message.retryCount++ - message.Set("retry_count", retryCount) - - return + return message.retryCount } func secondsToDelay(count int) int { diff --git a/middleware_retry_test.go b/middleware_retry_test.go index b131c77..0cc7246 100644 --- a/middleware_retry_test.go +++ b/middleware_retry_test.go @@ -93,19 +93,12 @@ func MiddlewareRetrySpec(c gospec.Context) { retries, _ := redis.Strings(conn.Do("zrange", "prod:"+RETRY_KEY, 0, 1)) messages, _ = NewMsgs([]string{retries[0]}) - queue, _ := messages[0].Get("queue").String() - error_message, _ := messages[0].Get("error_message").String() - error_class, _ := messages[0].Get("error_class").String() - retry_count, _ := messages[0].Get("retry_count").Int() - error_backtrace, _ := messages[0].Get("error_backtrace").String() - failed_at, _ := messages[0].Get("failed_at").String() - - c.Expect(queue, Equals, "myqueue") - c.Expect(error_message, Equals, "AHHHH") - c.Expect(error_class, Equals, "") - c.Expect(retry_count, Equals, 0) - c.Expect(error_backtrace, Equals, "") - c.Expect(failed_at, Equals, time.Now().UTC().Format(layout)) + c.Expect(messages[0].queue, Equals, "myqueue") + c.Expect(messages[0].error, Equals, "AHHHH") + //c.Expect(messages[0].errorClass, Equals, "") + c.Expect(messages[0].retryCount, Equals, 0) + //c.Expect(messages[0].errorBacktrace, Equals, "") + c.Expect(messages[0].failedAt, Equals, time.Now().UTC().Format(layout)) }) c.Specify("handles recurring failed message", func() { @@ -121,17 +114,11 @@ func MiddlewareRetrySpec(c gospec.Context) { retries, _ := redis.Strings(conn.Do("zrange", "prod:"+RETRY_KEY, 0, 1)) messages, _ = NewMsgs(retries) - queue, _ := messages[0].Get("queue").String() - error_message, _ := messages[0].Get("error_message").String() - retry_count, _ := messages[0].Get("retry_count").Int() - failed_at, _ := messages[0].Get("failed_at").String() - retried_at, _ := messages[0].Get("retried_at").String() - - c.Expect(queue, Equals, "myqueue") - c.Expect(error_message, Equals, "AHHHH") - c.Expect(retry_count, Equals, 11) - c.Expect(failed_at, Equals, "2013-07-20 14:03:42 UTC") - c.Expect(retried_at, Equals, time.Now().UTC().Format(layout)) + c.Expect(messages[0].queue, Equals, "myqueue") + c.Expect(messages[0].error, Equals, "AHHHH") + c.Expect(messages[0].retryCount, Equals, 11) + c.Expect(messages[0].failedAt, Equals, "2013-07-20 14:03:42 UTC") + c.Expect(messages[0].retriedAt, Equals, time.Now().UTC().Format(layout)) }) c.Specify("handles recurring failed message with customized max", func() { @@ -147,17 +134,11 @@ func MiddlewareRetrySpec(c gospec.Context) { retries, _ := redis.Strings(conn.Do("zrange", "prod:"+RETRY_KEY, 0, 1)) messages, _ = NewMsgs(retries) - queue, _ := messages[0].Get("queue").String() - error_message, _ := messages[0].Get("error_message").String() - retry_count, _ := messages[0].Get("retry_count").Int() - failed_at, _ := messages[0].Get("failed_at").String() - retried_at, _ := messages[0].Get("retried_at").String() - - c.Expect(queue, Equals, "myqueue") - c.Expect(error_message, Equals, "AHHHH") - c.Expect(retry_count, Equals, 9) - c.Expect(failed_at, Equals, "2013-07-20 14:03:42 UTC") - c.Expect(retried_at, Equals, time.Now().UTC().Format(layout)) + c.Expect(messages[0].queue, Equals, "myqueue") + c.Expect(messages[0].error, Equals, "AHHHH") + c.Expect(messages[0].retryCount, Equals, 9) + c.Expect(messages[0].failedAt, Equals, "2013-07-20 14:03:42 UTC") + c.Expect(messages[0].retriedAt, Equals, time.Now().UTC().Format(layout)) }) c.Specify("doesn't retry after default number of retries", func() { diff --git a/msg.go b/msg.go index 727fc2b..07a4b8a 100644 --- a/msg.go +++ b/msg.go @@ -1,10 +1,12 @@ package workers import ( + "encoding/json" "fmt" "reflect" + "strconv" - "github.com/bitly/go-simplejson" + simplejson "github.com/bitly/go-simplejson" ) type Msgs []*Msg @@ -13,12 +15,10 @@ func NewMsgs(messages []string) ([]*Msg, error) { msgs := make(Msgs, 0, len(messages)) for _, m := range messages { - msg, err := NewMsg(m) - - if err == nil { - msgs = append(msgs, msg) + if m, err := NewMsgFromString(m); err != nil { + return nil, err } else { - return nil, fmt.Errorf("Couldn't create message from %v: %v", m, err) + msgs = append(msgs, m) } } @@ -27,11 +27,42 @@ func NewMsgs(messages []string) ([]*Msg, error) { type data struct { *simplejson.Json + data []byte +} + +func (d data) ToJson() string { + return string(d.data) +} + +// For gospec only. +func (d *data) Equals(other interface{}) bool { + otherJson := reflect.ValueOf(other).MethodByName("ToJson").Call([]reflect.Value{}) + return d.ToJson() == otherJson[0].String() } type Msg struct { - *data + jid string + Retry bool + RetryMax int + queue string + enqueuedAt float64 + error string + retryCount int + failedAt string + retriedAt string + + // The original json if the message was created with NewMsgFromString original string + + // The job arguments. + args *data +} + +var defaultArgs = []byte("[]") +var defaultArgsJson *simplejson.Json + +func init() { + defaultArgsJson, _ = simplejson.NewJson(defaultArgs) } type Args struct { @@ -39,49 +70,90 @@ type Args struct { } func (m *Msg) Jid() string { - return m.Get("jid").MustString() + return m.jid } func (m *Msg) Args() *Args { - if args, ok := m.CheckGet("args"); ok { - return &Args{&data{args}} - } else { - d, _ := newData("[]") - return &Args{d} + if m.args == nil { + m.args = &data{defaultArgsJson, defaultArgs} } + return &Args{m.args} } func (m *Msg) OriginalJson() string { return m.original } -func (d *data) ToJson() string { - json, err := d.Encode() - - if err != nil { - Logger.Println("ERR: Couldn't generate json from", d, ":", err) +func (m *Msg) ToJson() string { + args := m.Args() + d := map[string]interface{}{ + "jid": m.jid, + "queue": m.queue, + "retry_count": m.retryCount, + "enqueued_at": m.enqueuedAt, + "error_message": m.error, + "failed_at": m.failedAt, + "retried_at": m.retriedAt, + "args": args.Json, } + if m.RetryMax > 0 { + d["retry"] = json.Number(strconv.Itoa(m.RetryMax)) + } else { + d["retry"] = m.Retry + } + json, _ := json.Marshal(d) return string(json) } -func (d *data) Equals(other interface{}) bool { - otherJson := reflect.ValueOf(other).MethodByName("ToJson").Call([]reflect.Value{}) - return d.ToJson() == otherJson[0].String() -} +// NsgMsgFromString creates a new message from the redis retry queue. +func NewMsgFromString(str string) (*Msg, error) { + json, err := simplejson.NewJson([]byte(str)) + if err != nil { + return nil, fmt.Errorf("Couldn't create message from %v: %v", str, err) + } + + m := &Msg{ + jid: json.Get("jid").MustString(), + queue: json.Get("queue").MustString(), + enqueuedAt: json.Get("enqueued_at").MustFloat64(), + error: json.Get("error_message").MustString(), + failedAt: json.Get("failed_at").MustString(), + retriedAt: json.Get("retried_at").MustString(), + original: str, + } + + if args := json.Get("args"); args.Interface() != nil { + b, _ := args.MarshalJSON() + m.args = &data{args, b} + } -func NewMsg(content string) (*Msg, error) { - if d, err := newData(content); err != nil { - return nil, err + if param, err := json.Get("retry_count").Int(); err == nil { + m.retryCount = param } else { - return &Msg{d, content}, nil + m.retryCount = -1 + } + + // Retry is either a bool or a number for hysterical reasons. + if param, err := json.Get("retry").Bool(); err == nil { + m.Retry = param + } else if param, err := json.Get("retry").Int(); err == nil { + m.RetryMax = param + m.Retry = true } + + return m, nil } -func newData(content string) (*data, error) { - if json, err := simplejson.NewJson([]byte(content)); err != nil { - return nil, err - } else { - return &data{json}, nil +// NewMsg creates a new virgin message. +func NewMsg(jid string, args *simplejson.Json, b []byte) *Msg { + m := &Msg{ + jid: jid, + Retry: true, + retryCount: -1, + } + if args != nil { + m.args = &data{args, b} } + return m } diff --git a/msg_test.go b/msg_test.go index 456371d..c1df746 100644 --- a/msg_test.go +++ b/msg_test.go @@ -6,32 +6,20 @@ import ( ) func MsgSpec(c gospec.Context) { - c.Specify("NewMsg", func() { - c.Specify("unmarshals json", func() { - msg, _ := NewMsg("{\"hello\":\"world\",\"foo\":3}") - hello, _ := msg.Get("hello").String() - foo, _ := msg.Get("foo").Int() - - c.Expect(hello, Equals, "world") - c.Expect(foo, Equals, 3) - }) - - c.Specify("returns an error if invalid json", func() { - msg, err := NewMsg("{\"hello:\"world\",\"foo\":3}") - - c.Expect(msg, IsNil) - c.Expect(err, Not(IsNil)) - }) - }) - c.Specify("Args", func() { c.Specify("returns args key", func() { - msg, _ := NewMsg("{\"hello\":\"world\",\"args\":[\"foo\",\"bar\"]}") + msg := buildVirginMessages("[\"foo\",\"bar\"]")[0] c.Expect(msg.Args().ToJson(), Equals, "[\"foo\",\"bar\"]") }) c.Specify("returns empty array if args key doesn't exist", func() { - msg, _ := NewMsg("{\"hello\":\"world\"}") + msg, _ := NewMsgFromString("{}}") + //"", nil, nil) + c.Expect(msg.Args().ToJson(), Equals, "[]") + }) + + c.Specify("returns empty array if args key doesn't exist", func() { + msg := NewMsg("", nil, nil) c.Expect(msg.Args().ToJson(), Equals, "[]") }) }) diff --git a/scheduled.go b/scheduled.go index a7b08fa..f771b34 100644 --- a/scheduled.go +++ b/scheduled.go @@ -16,15 +16,14 @@ type scheduled struct { func (s *scheduled) start() { go (func() { for { + s.poll() + select { case <-s.closed: return - default: + case <-time.After(time.Duration(Config.PollInterval) * time.Second): } - s.poll() - - time.Sleep(time.Duration(Config.PollInterval) * time.Second) } })() } @@ -47,12 +46,11 @@ func (s *scheduled) poll() { break } - message, _ := NewMsg(messages[0]) + message, _ := NewMsgFromString(messages[0]) if removed, _ := redis.Bool(conn.Do("zrem", key, messages[0])); removed { - queue, _ := message.Get("queue").String() - queue = strings.TrimPrefix(queue, Config.Namespace) - message.Set("enqueued_at", nowToSecondsWithNanoPrecision()) + queue := strings.TrimPrefix(message.queue, Config.Namespace) + message.enqueuedAt = nowToSecondsWithNanoPrecision() conn.Do("lpush", Config.Namespace+"queue:"+queue, message.ToJson()) } } diff --git a/scheduled_test.go b/scheduled_test.go index d292a77..3e4ee73 100644 --- a/scheduled_test.go +++ b/scheduled_test.go @@ -18,9 +18,9 @@ func ScheduledSpec(c gospec.Context) { now := nowToSecondsWithNanoPrecision() - message1, _ := NewMsg("{\"queue\":\"default\",\"foo\":\"bar1\"}") - message2, _ := NewMsg("{\"queue\":\"myqueue\",\"foo\":\"bar2\"}") - message3, _ := NewMsg("{\"queue\":\"default\",\"foo\":\"bar3\"}") + message1, _ := NewMsgFromString("{\"queue\":\"default\",\"args\":{\"foo\":\"bar1\"}}") + message2, _ := NewMsgFromString("{\"queue\":\"myqueue\",\"args\":{\"foo\":\"bar2\"}}") + message3, _ := NewMsgFromString("{\"queue\":\"default\",\"args\":{\"foo\":\"bar3\"}}") conn.Do("zadd", "prod:"+RETRY_KEY, now-60.0, message1.ToJson()) conn.Do("zadd", "prod:"+RETRY_KEY, now-10.0, message2.ToJson()) diff --git a/worker_test.go b/worker_test.go index c87e348..4b7d084 100644 --- a/worker_test.go +++ b/worker_test.go @@ -57,7 +57,9 @@ func WorkerSpec(c gospec.Context) { c.Specify("work", func() { worker := newWorker(manager) msgs := make(chan Msgs) - messages, _ := NewMsgs([]string{"{\"jid\":\"2309823\",\"args\":[\"foo\",\"bar\"]}"}) + + messages := buildVirginMessages("[\"foo\",\"bar\"]") + messages[0].jid = "2309823" c.Specify("calls job with message args", func() { go worker.work(msgs) diff --git a/workers.go b/workers.go index fc39be6..e179575 100644 --- a/workers.go +++ b/workers.go @@ -14,7 +14,7 @@ const ( SCHEDULED_JOBS_KEY = "schedule" ) -var Logger WorkersLogger = log.New(os.Stdout, "workers: ", log.Ldate|log.Lmicroseconds) +var Logger WorkersLogger = log.New(os.Stdout, "workers: ", log.Ldate|log.Lmicroseconds|log.Lshortfile) var managers = make(map[string]*manager) var schedule *scheduled