Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"github.com/customerio/gospec"
. "github.com/customerio/gospec"
"github.com/garyburd/redigo/redis"

simplejson "github.com/bitly/go-simplejson"
)

func buildFetch(queue string) Fetcher {
Expand All @@ -13,6 +15,29 @@ func buildFetch(queue string) Fetcher {
return fetch
}

func buildVirginMessages(args ...string) Msgs {
var msgs []*Msg
for _, a := range args {
b := []byte(a)
j, _ := simplejson.NewJson(b)
msgs = append(msgs, NewMsg("", j, b))
}
return msgs
}

func buildMessages(args ...string) Msgs {
var s []string
for _, a := range args {
b := []byte(a)
j, _ := simplejson.NewJson(b)
m := NewMsg("", j, b)
s = append(s, m.ToJson())
}

messages, _ := NewMsgs(s)
return messages
}

func FetchSpec(c gospec.Context) {
c.Specify("Config.Fetch", func() {
c.Specify("it returns an instance of fetch with queue", func() {
Expand All @@ -23,7 +48,7 @@ func FetchSpec(c gospec.Context) {
})

c.Specify("Fetch", func() {
messages, _ := NewMsgs([]string{"{\"foo\":\"bar\"}"})
messages := buildMessages("{\"foo\":\"bar\"}")

c.Specify("it puts messages from the queues on the messages channel", func() {
fetch := buildFetch("fetchQueue2")
Expand Down Expand Up @@ -116,7 +141,7 @@ func FetchSpec(c gospec.Context) {
})

c.Specify("refires any messages left in progress from prior instance", func() {
msgs, _ := NewMsgs([]string{"{\"foo\":\"bar2\"}", "{\"foo\":\"bar3\"}"})
msgs := buildMessages("{\"foo\":\"bar2\"}", "{\"foo\":\"bar3\"}")

conn := Config.Pool.Get()
defer conn.Close()
Expand Down
2 changes: 1 addition & 1 deletion fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (f *fetch) Acknowledge(messages Msgs) {

// TODO optimize with a redis lua script
for _, m := range messages {
conn.Do("lrem", f.inprogressQueue(), -1, m.OriginalJson())
conn.Do("lrem", f.inprogressQueue(), -1, string(m.OriginalJson()))
}
}

Expand Down
3 changes: 2 additions & 1 deletion manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,14 @@ func (m *manager) manage() {

go m.fetch.Fetch()

loop:
for {
select {
case message := <-m.confirm:
m.fetch.Acknowledge(message)
case <-m.stop:
m.exit <- true
break
break loop
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ func ManagerSpec(c gospec.Context) {
conn := Config.Pool.Get()
defer conn.Close()

message, _ := NewMsg("{\"foo\":\"bar\",\"args\":[\"foo\",\"bar\"]}")
message2, _ := NewMsg("{\"foo\":\"bar2\",\"args\":[\"foo\",\"bar2\"]}")
msgs := buildVirginMessages("[\"foo\",\"bar\"]", "[\"foo\",\"bar2\"]")
message, message2 := msgs[0], msgs[1]

c.Specify("coordinates processing of queue messages", func() {
manager := newManager("manager1", testJob, 10)
Expand Down
32 changes: 12 additions & 20 deletions middleware_retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ func (r *MiddlewareRetry) Call(queue string, messages Msgs, next func() bool) (a

for _, m := range messages {
if retry(m) {
m.Set("queue", queue)
m.Set("error_message", fmt.Sprintf("%v", e))
m.queue = queue
m.error = fmt.Sprintf("%v", e)
retryCount := incrementRetry(m)

waitDuration := durationToSecondsWithNanoPrecision(
Expand Down Expand Up @@ -58,34 +58,26 @@ func (r *MiddlewareRetry) Call(queue string, messages Msgs, next func() bool) (a
}

func retry(message *Msg) bool {
retry := false
max := DEFAULT_MAX_RETRY
retry := message.Retry
count := message.retryCount
if message.RetryMax != 0 {
max = message.RetryMax

if param, err := message.Get("retry").Bool(); err == nil {
retry = param
} else if param, err := message.Get("retry").Int(); err == nil {
max = param
retry = true
}

count, _ := message.Get("retry_count").Int()

return retry && count < max
}

func incrementRetry(message *Msg) (retryCount int) {
retryCount = 0

if count, err := message.Get("retry_count").Int(); err != nil {
message.Set("failed_at", time.Now().UTC().Format(LAYOUT))
t := time.Now().UTC().Format(LAYOUT)
if message.retryCount == -1 {
message.failedAt = t
} else {
message.Set("retried_at", time.Now().UTC().Format(LAYOUT))
retryCount = count + 1
message.retriedAt = t
}
message.retryCount++

message.Set("retry_count", retryCount)

return
return message.retryCount
}

func secondsToDelay(count int) int {
Expand Down
51 changes: 16 additions & 35 deletions middleware_retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,12 @@ func MiddlewareRetrySpec(c gospec.Context) {
retries, _ := redis.Strings(conn.Do("zrange", "prod:"+RETRY_KEY, 0, 1))
messages, _ = NewMsgs([]string{retries[0]})

queue, _ := messages[0].Get("queue").String()
error_message, _ := messages[0].Get("error_message").String()
error_class, _ := messages[0].Get("error_class").String()
retry_count, _ := messages[0].Get("retry_count").Int()
error_backtrace, _ := messages[0].Get("error_backtrace").String()
failed_at, _ := messages[0].Get("failed_at").String()

c.Expect(queue, Equals, "myqueue")
c.Expect(error_message, Equals, "AHHHH")
c.Expect(error_class, Equals, "")
c.Expect(retry_count, Equals, 0)
c.Expect(error_backtrace, Equals, "")
c.Expect(failed_at, Equals, time.Now().UTC().Format(layout))
c.Expect(messages[0].queue, Equals, "myqueue")
c.Expect(messages[0].error, Equals, "AHHHH")
//c.Expect(messages[0].errorClass, Equals, "")
c.Expect(messages[0].retryCount, Equals, 0)
//c.Expect(messages[0].errorBacktrace, Equals, "")
c.Expect(messages[0].failedAt, Equals, time.Now().UTC().Format(layout))
})

c.Specify("handles recurring failed message", func() {
Expand All @@ -121,17 +114,11 @@ func MiddlewareRetrySpec(c gospec.Context) {
retries, _ := redis.Strings(conn.Do("zrange", "prod:"+RETRY_KEY, 0, 1))
messages, _ = NewMsgs(retries)

queue, _ := messages[0].Get("queue").String()
error_message, _ := messages[0].Get("error_message").String()
retry_count, _ := messages[0].Get("retry_count").Int()
failed_at, _ := messages[0].Get("failed_at").String()
retried_at, _ := messages[0].Get("retried_at").String()

c.Expect(queue, Equals, "myqueue")
c.Expect(error_message, Equals, "AHHHH")
c.Expect(retry_count, Equals, 11)
c.Expect(failed_at, Equals, "2013-07-20 14:03:42 UTC")
c.Expect(retried_at, Equals, time.Now().UTC().Format(layout))
c.Expect(messages[0].queue, Equals, "myqueue")
c.Expect(messages[0].error, Equals, "AHHHH")
c.Expect(messages[0].retryCount, Equals, 11)
c.Expect(messages[0].failedAt, Equals, "2013-07-20 14:03:42 UTC")
c.Expect(messages[0].retriedAt, Equals, time.Now().UTC().Format(layout))
})

c.Specify("handles recurring failed message with customized max", func() {
Expand All @@ -147,17 +134,11 @@ func MiddlewareRetrySpec(c gospec.Context) {
retries, _ := redis.Strings(conn.Do("zrange", "prod:"+RETRY_KEY, 0, 1))
messages, _ = NewMsgs(retries)

queue, _ := messages[0].Get("queue").String()
error_message, _ := messages[0].Get("error_message").String()
retry_count, _ := messages[0].Get("retry_count").Int()
failed_at, _ := messages[0].Get("failed_at").String()
retried_at, _ := messages[0].Get("retried_at").String()

c.Expect(queue, Equals, "myqueue")
c.Expect(error_message, Equals, "AHHHH")
c.Expect(retry_count, Equals, 9)
c.Expect(failed_at, Equals, "2013-07-20 14:03:42 UTC")
c.Expect(retried_at, Equals, time.Now().UTC().Format(layout))
c.Expect(messages[0].queue, Equals, "myqueue")
c.Expect(messages[0].error, Equals, "AHHHH")
c.Expect(messages[0].retryCount, Equals, 9)
c.Expect(messages[0].failedAt, Equals, "2013-07-20 14:03:42 UTC")
c.Expect(messages[0].retriedAt, Equals, time.Now().UTC().Format(layout))
})

c.Specify("doesn't retry after default number of retries", func() {
Expand Down
Loading