Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
vendor
.idea
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
language: go

go:
- 1.7
- "1.19"

script:
- go get github.com/customerio/gospec
Expand Down
18 changes: 18 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@


export COMPOSE_PROJECT_NAME := go-workers

.PHONY: deps/start
deps/start:
echo "${COMPOSE_PROJECT_NAME}"
docker-compose -p '${COMPOSE_PROJECT_NAME}' up -d

.PHONY: deps/stop
deps/stop:
docker-compose -p '${COMPOSE_PROJECT_NAME}' down

test:
@make deps/start
go get github.com/customerio/gospec
go test -v
@make deps/stop
92 changes: 53 additions & 39 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,55 +18,69 @@ Example usage:
package main

import (
"github.com/jrallison/go-workers"
"github.com/topfreegames/go-workers"
workers "go-workers"
)

func myJob(message *workers.Msg) {
// do something with your message
// message.Jid()
// message.Args() is a wrapper around go-simplejson (http://godoc.org/github.com/bitly/go-simplejson)
// do something with your message
// message.Jid()
// message.Args() is a wrapper around go-simplejson (http://godoc.org/github.com/bitly/go-simplejson)
}

type myMiddleware struct{}

func (r *myMiddleware) Call(queue string, message *workers.Msg, next func() bool) (acknowledge bool) {
// do something before each message is processed
acknowledge = next()
// do something after each message is processed
return
}
// do something before each message is processed
acknowledge = next()
// do something after each message is processed
return
}

func main() {
workers.Configure(map[string]string{
// location of redis instance
"server": "localhost:6379",
// instance of the database
"database": "0",
// number of connections to keep open with redis
"pool": "30",
// unique process id for this instance of workers (for proper recovery of inprogress jobs on crash)
"process": "1",
})

workers.Middleware.Append(&myMiddleware{})

// pull messages from "myqueue" with concurrency of 10
workers.Process("myqueue", myJob, 10)

// pull messages from "myqueue2" with concurrency of 20
workers.Process("myqueue2", myJob, 20)

// Add a job to a queue
workers.Enqueue("myqueue3", "Add", []int{1, 2})

// Add a job to a queue with retry
workers.EnqueueWithOptions("myqueue3", "Add", []int{1, 2}, workers.EnqueueOptions{Retry: true})

// stats will be available at http://localhost:8080/stats
go workers.StatsServer(8080)

// Blocks until process is told to exit via unix signal
workers.Run()
workers.Configure(workers.Options{
// location of redis instance
Address: "localhost:6379",
// instance of the database
Database: "0",
// number of connections to keep open with redis
PoolSize: "30",
// unique process id for this instance of workers (for proper recovery of inprogress jobs on crash)
ProcessID: "1",
})

workers.Middleware.Append(&myMiddleware{})

// pull messages from "myqueue" with concurrency of 10
workers.Process("myqueue", myJob, 10)

// pull messages from "myqueue2" with concurrency of 20
workers.Process("myqueue2", myJob, 20)

// Add a job to a queue
workers.Enqueue("myqueue3", "Add", []int{1, 2})

// Add a job to a queue with retry
workers.EnqueueWithOptions("myqueue3", "Add", []int{1, 2}, workers.EnqueueOptions{Retry: true})

// Add a job to a queue in a different redis instance
workers.EnqueueWithOptions("myqueue4", "Add", []int{1, 2},
workers.EnqueueOptions{
Retry: true,
ConnectionOptions: workers.Options{
Address: "localhost:6378",
Database: "my-database",
PoolSize: 10,
Password: "pass",
},
},
)

// stats will be available at http://localhost:8080/stats
go workers.StatsServer(8080)

// Blocks until process is told to exit via unix signal
workers.Run()
}
```

Expand Down
20 changes: 13 additions & 7 deletions all_specs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,22 @@ func TestAllSpecs(t *testing.T) {
r.Parallel = false

r.BeforeEach = func() {
Configure(map[string]string{
"server": "localhost:6379",
"process": "1",
"database": "15",
"pool": "1",
Configure(Options{
Address: "localhost:6379",
ProcessID: "1",
Database: "15",
PoolSize: 1,
})

conn := Config.Pool.Get()
conn.Do("flushdb")
conn.Close()
_, err := conn.Do("flushdb")
if err != nil {
panic("failed to flush db: " + err.Error())
}
err = conn.Close()
if err != nil {
panic("failed close connection: " + err.Error())
}
}

// List all specs here
Expand Down
110 changes: 61 additions & 49 deletions config.go
Original file line number Diff line number Diff line change
@@ -1,80 +1,92 @@
package workers

import (
"strconv"
"fmt"
"time"

"github.com/garyburd/redigo/redis"
"github.com/gomodule/redigo/redis"
)

type config struct {
type Options struct {
Address string
Password string
Database string
ProcessID string
Namespace string
PoolSize int
PoolInterval int
DialOptions []redis.DialOption
}

type WorkerConfig struct {
processId string
Namespace string
PollInterval int
PoolInterval int
Pool *redis.Pool
Fetch func(queue string) Fetcher
}

var Config *config
var Config *WorkerConfig

func Configure(options map[string]string) {
var poolSize int
func Configure(options Options) {
var namespace string
var pollInterval int

if options["server"] == "" {
panic("Configure requires a 'server' option, which identifies a Redis instance")
if options.Address == "" {
panic("Configure requires a 'Address' option, which identifies a Redis instance")
}
if options["process"] == "" {
panic("Configure requires a 'process' option, which uniquely identifies this instance")
if options.ProcessID == "" {
panic("Configure requires a 'ProcessID' option, which uniquely identifies this instance")
}
if options["pool"] == "" {
options["pool"] = "1"
if options.PoolSize <= 0 {
options.PoolSize = 1
}
if options["namespace"] != "" {
namespace = options["namespace"] + ":"
if options.Namespace != "" {
namespace = options.Namespace + ":"
}
if seconds, err := strconv.Atoi(options["poll_interval"]); err == nil {
pollInterval = seconds
} else {
pollInterval = 15
if options.PoolInterval <= 0 {
options.PoolInterval = 15
}

poolSize, _ = strconv.Atoi(options["pool"])

Config = &config{
options["process"],
Config = &WorkerConfig{
options.ProcessID,
namespace,
pollInterval,
&redis.Pool{
MaxIdle: poolSize,
IdleTimeout: 240 * time.Second,
Dial: func() (redis.Conn, error) {
c, err := redis.Dial("tcp", options["server"])
if err != nil {
return nil, err
}
if options["password"] != "" {
if _, err := c.Do("AUTH", options["password"]); err != nil {
c.Close()
return nil, err
options.PoolInterval,
GetConnectionPool(options),
func(queue string) Fetcher {
return NewFetch(queue, make(chan *Msg), make(chan bool))
},
}
}

func GetConnectionPool(options Options) *redis.Pool {
return &redis.Pool{
MaxIdle: options.PoolSize,
IdleTimeout: 240 * time.Second,
Dial: func() (redis.Conn, error) {
c, err := redis.Dial("tcp", options.Address, options.DialOptions...)
if err != nil {
return nil, err
}
if options.Password != "" {
if _, err := c.Do("AUTH", options.Password); err != nil {
if errClose := c.Close(); errClose != nil {
return nil, fmt.Errorf("%w. failed to close connection: %s", err, errClose.Error())
}
return nil, err
}
if options["database"] != "" {
if _, err := c.Do("SELECT", options["database"]); err != nil {
c.Close()
return nil, err
}
if options.Database != "" {
if _, err := c.Do("SELECT", options.Database); err != nil {
if errClose := c.Close(); errClose != nil {
return nil, fmt.Errorf("%w. failed to close connection: %s", err, errClose.Error())
}
}
return c, err
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
_, err := c.Do("PING")
return err
},
}
return c, err
},
func(queue string) Fetcher {
return NewFetch(queue, make(chan *Msg), make(chan bool))
TestOnBorrow: func(c redis.Conn, t time.Time) error {
_, err := c.Do("PING")
return err
},
}
}
Loading