Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CLAUDE.md
4 changes: 3 additions & 1 deletion cmd/cachewd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ func main() {
return tokenManagerProvider()
})

scheduler := jobscheduler.New(ctx, globalConfig.SchedulerConfig)
scheduler, err := jobscheduler.New(ctx, globalConfig.SchedulerConfig)
kctx.FatalIfErrorf(err, "failed to create scheduler")
defer scheduler.Close()

cr, sr := newRegistries(scheduler, managerProvider, tokenManagerProvider)

Expand Down
78 changes: 64 additions & 14 deletions internal/jobscheduler/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package jobscheduler

import (
"context"
"fmt"
"runtime"
"sync"
"time"
Expand All @@ -14,7 +13,8 @@ import (
)

// Config controls the behaviour of the job scheduler.
type Config struct {
	// Concurrency caps how many jobs may run at once; 0 selects one worker per CPU core.
	Concurrency int `hcl:"concurrency" help:"The maximum number of concurrent jobs to run (0 means number of cores)." default:"0"`
	// SchedulerDB is the path of the database used to persist periodic-job state
	// across restarts; an empty value disables persistence.
	SchedulerDB string `hcl:"scheduler-db" help:"Path to the scheduler state database." default:"${CACHEW_STATE}/scheduler.db"`
}

type queueJob struct {
Expand All @@ -23,7 +23,9 @@ type queueJob struct {
run func(ctx context.Context) error
}

func (j *queueJob) String() string { return fmt.Sprintf("job-%s-%s", j.id, j.queue) }
// jobKey builds the canonical "queue:id" identifier used to key job state.
func jobKey(queue, id string) string {
	key := queue
	key += ":"
	key += id
	return key
}

// String returns the job's "queue:id" key, shared with the scheduler's job-state keys.
func (j *queueJob) String() string { return jobKey(j.queue, j.id) }

// Run executes the job's function, annotating any returned error with a stack trace.
func (j *queueJob) Run(ctx context.Context) error { return errors.WithStack(j.run(ctx)) }

// Scheduler runs background jobs concurrently across multiple serialised queues.
Expand Down Expand Up @@ -73,25 +75,42 @@ type RootScheduler struct {
queue []queueJob
active map[string]bool
cancel context.CancelFunc
store ScheduleStore
}

var _ Scheduler = &RootScheduler{}

// New creates a new JobScheduler.
func New(ctx context.Context, config Config) Scheduler {
// New creates a RootScheduler with config.Concurrency worker goroutines
// (defaulting to the CPU count when zero). When config.SchedulerDB is set,
// a persistent schedule store is opened so periodic jobs keep their cadence
// across restarts. Callers should Close the returned scheduler when done.
func New(ctx context.Context, config Config) (*RootScheduler, error) {
	workers := config.Concurrency
	if workers == 0 {
		workers = runtime.NumCPU()
	}

	// Persistence is optional: without a database path the scheduler keeps no state.
	var store ScheduleStore
	if config.SchedulerDB != "" {
		opened, err := NewScheduleStore(config.SchedulerDB)
		if err != nil {
			return nil, errors.Wrap(err, "create schedule store")
		}
		store = opened
	}

	ctx, cancel := context.WithCancel(ctx)
	scheduler := &RootScheduler{
		workAvailable: make(chan bool, 1024),
		active:        make(map[string]bool),
		store:         store,
		cancel:        cancel,
	}
	for id := 0; id < workers; id++ {
		go scheduler.worker(ctx, id)
	}
	return scheduler, nil
}

// Close releases the scheduler's persistent store, if one was configured.
func (q *RootScheduler) Close() error {
	if q.store == nil {
		return nil
	}
	return errors.WithStack(q.store.Close())
}

func (q *RootScheduler) WithQueuePrefix(prefix string) Scheduler {
Expand All @@ -108,15 +127,46 @@ func (q *RootScheduler) Submit(queue, id string, run func(ctx context.Context) e
q.workAvailable <- true
}

func (q *RootScheduler) SubmitPeriodicJob(queue, description string, interval time.Duration, run func(ctx context.Context) error) {
q.Submit(queue, description, func(ctx context.Context) error {
err := run(ctx)
go func() {
time.Sleep(interval)
q.SubmitPeriodicJob(queue, description, interval, run)
}()
return errors.WithStack(err)
})
// SubmitPeriodicJob schedules run to execute on queue every interval,
// re-submitting itself after each completion. When a schedule store is
// configured, each run's completion time is persisted so a restart resumes
// the cadence rather than firing immediately.
func (q *RootScheduler) SubmitPeriodicJob(queue, id string, interval time.Duration, run func(ctx context.Context) error) {
	key := jobKey(queue, id)
	// Remaining wait before the first execution, derived from the persisted
	// last-run time (zero when there is no store or no record).
	delay := q.periodicDelay(key, interval)
	submit := func() {
		q.Submit(queue, id, func(ctx context.Context) error {
			err := run(ctx)
			if q.store != nil {
				// The last-run time is recorded even when run returned an error,
				// so a failing job still waits a full interval before retrying.
				if storeErr := q.store.SetLastRun(key, time.Now()); storeErr != nil {
					logging.FromContext(ctx).WarnContext(ctx, "Failed to record job last run", "key", key, "error", storeErr)
				}
			}
			// Schedule the next occurrence once the interval elapses.
			// NOTE(review): this sleep is not tied to any context, so the goroutine
			// outlives scheduler shutdown until the interval expires — confirm this
			// is acceptable for long intervals.
			go func() {
				time.Sleep(interval)
				q.SubmitPeriodicJob(queue, id, interval, run)
			}()
			return errors.WithStack(err)
		})
	}
	if delay <= 0 {
		submit()
		return
	}
	// First run is still in the future: defer submission for the remainder.
	go func() {
		time.Sleep(delay)
		submit()
	}()
}

// periodicDelay reports how long to wait before first running the periodic job
// identified by key, based on its persisted last-run time. It returns zero when
// there is no store, no record, a read error, or the next run is already due.
func (q *RootScheduler) periodicDelay(key string, interval time.Duration) time.Duration {
	if q.store == nil {
		return 0
	}
	lastRun, found, err := q.store.GetLastRun(key)
	if err != nil || !found {
		// Unknown history: run immediately rather than guessing.
		return 0
	}
	nextDue := lastRun.Add(interval)
	return max(0, time.Until(nextDue))
}

func (q *RootScheduler) worker(ctx context.Context, id int) {
Expand Down
30 changes: 19 additions & 11 deletions internal/jobscheduler/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ import (
"github.com/block/cachew/internal/logging"
)

// newTestScheduler constructs a scheduler for tests, failing the test if
// construction errors and closing the scheduler automatically on cleanup.
func newTestScheduler(ctx context.Context, t *testing.T, config jobscheduler.Config) jobscheduler.Scheduler {
	t.Helper()
	s, err := jobscheduler.New(ctx, config)
	assert.NoError(t, err)
	t.Cleanup(func() { s.Close() })
	return s
}

func eventually(t *testing.T, timeout time.Duration, condition func() bool, msgAndArgs ...any) {
t.Helper()
deadline := time.Now().Add(timeout)
Expand All @@ -40,7 +48,7 @@ func TestJobSchedulerBasic(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

scheduler := jobscheduler.New(ctx, jobscheduler.Config{Concurrency: 2})
scheduler := newTestScheduler(ctx, t, jobscheduler.Config{Concurrency: 2})

var executed atomic.Bool
scheduler.Submit("queue1", "job1", func(_ context.Context) error {
Expand All @@ -57,7 +65,7 @@ func TestJobSchedulerConcurrency(t *testing.T) {
defer cancel()

concurrency := 4
scheduler := jobscheduler.New(ctx, jobscheduler.Config{Concurrency: concurrency})
scheduler := newTestScheduler(ctx, t, jobscheduler.Config{Concurrency: concurrency})

var (
running atomic.Int32
Expand Down Expand Up @@ -103,7 +111,7 @@ func TestJobSchedulerQueueIsolation(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

scheduler := jobscheduler.New(ctx, jobscheduler.Config{Concurrency: 4})
scheduler := newTestScheduler(ctx, t, jobscheduler.Config{Concurrency: 4})

var (
queue1Running atomic.Int32
Expand Down Expand Up @@ -155,7 +163,7 @@ func TestJobSchedulerJobOrdering(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

scheduler := jobscheduler.New(ctx, jobscheduler.Config{Concurrency: 4})
scheduler := newTestScheduler(ctx, t, jobscheduler.Config{Concurrency: 4})

var (
mu sync.Mutex
Expand Down Expand Up @@ -193,7 +201,7 @@ func TestJobSchedulerErrorHandling(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

scheduler := jobscheduler.New(ctx, jobscheduler.Config{Concurrency: 2})
scheduler := newTestScheduler(ctx, t, jobscheduler.Config{Concurrency: 2})

var (
failingJobExecuted atomic.Bool
Expand All @@ -219,7 +227,7 @@ func TestJobSchedulerContextCancellation(t *testing.T) {
_, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError})
ctx, cancel := context.WithCancel(ctx)

scheduler := jobscheduler.New(ctx, jobscheduler.Config{Concurrency: 2})
scheduler := newTestScheduler(ctx, t, jobscheduler.Config{Concurrency: 2})

var jobStarted atomic.Bool

Expand All @@ -241,7 +249,7 @@ func TestJobSchedulerPeriodicJob(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

scheduler := jobscheduler.New(ctx, jobscheduler.Config{Concurrency: 2})
scheduler := newTestScheduler(ctx, t, jobscheduler.Config{Concurrency: 2})

var executionCount atomic.Int32

Expand All @@ -260,7 +268,7 @@ func TestJobSchedulerPeriodicJobWithError(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

scheduler := jobscheduler.New(ctx, jobscheduler.Config{Concurrency: 2})
scheduler := newTestScheduler(ctx, t, jobscheduler.Config{Concurrency: 2})

var executionCount atomic.Int32

Expand All @@ -279,7 +287,7 @@ func TestJobSchedulerMultipleQueues(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

scheduler := jobscheduler.New(ctx, jobscheduler.Config{Concurrency: 3})
scheduler := newTestScheduler(ctx, t, jobscheduler.Config{Concurrency: 3})

queues := []string{"queue1", "queue2", "queue3", "queue4", "queue5"}
totalJobs := len(queues)
Expand Down Expand Up @@ -319,7 +327,7 @@ func TestJobSchedulerHighConcurrency(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

scheduler := jobscheduler.New(ctx, jobscheduler.Config{Concurrency: 50})
scheduler := newTestScheduler(ctx, t, jobscheduler.Config{Concurrency: 50})

jobCount := 100
var completed atomic.Int32
Expand Down Expand Up @@ -371,7 +379,7 @@ func FuzzJobScheduler(f *testing.F) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

scheduler := jobscheduler.New(ctx, jobscheduler.Config{Concurrency: int(concurrency)})
scheduler := newTestScheduler(ctx, t, jobscheduler.Config{Concurrency: int(concurrency)})

var (
completed atomic.Int32
Expand Down
68 changes: 68 additions & 0 deletions internal/jobscheduler/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package jobscheduler

import (
"time"

"github.com/alecthomas/errors"
"go.etcd.io/bbolt"
)

//nolint:gochecknoglobals
var scheduleBucketName = []byte("schedule")

// ScheduleStore persists the last execution time of periodic jobs.
type ScheduleStore interface {
	// GetLastRun returns the recorded last-run time for key; the bool reports
	// whether a record exists for that key.
	GetLastRun(key string) (time.Time, bool, error)
	// SetLastRun records t as the last time the job identified by key ran.
	SetLastRun(key string, t time.Time) error
	// Close releases the underlying storage.
	Close() error
}

// boltScheduleStore is a ScheduleStore backed by a bbolt database.
type boltScheduleStore struct {
	db *bbolt.DB
}

// NewScheduleStore opens (or creates) a bbolt database at dbPath and ensures
// the schedule bucket exists. The open call waits up to five seconds for any
// file lock held by another process before failing.
func NewScheduleStore(dbPath string) (ScheduleStore, error) {
	db, err := bbolt.Open(dbPath, 0600, &bbolt.Options{Timeout: 5 * time.Second})
	if err != nil {
		return nil, errors.Wrap(err, "open scheduler database")
	}
	ensureBucket := func(tx *bbolt.Tx) error {
		_, bucketErr := tx.CreateBucketIfNotExists(scheduleBucketName)
		return errors.WithStack(bucketErr)
	}
	if err := db.Update(ensureBucket); err != nil {
		// Surface both the bucket failure and any error from closing the database.
		return nil, errors.Join(errors.Wrap(err, "create schedule bucket"), db.Close())
	}
	return &boltScheduleStore{db: db}, nil
}

// GetLastRun reads the persisted last-run time for key. The bool result is
// false when no record exists for the key.
func (s *boltScheduleStore) GetLastRun(key string) (time.Time, bool, error) {
	var (
		lastRun time.Time
		found   bool
	)
	viewErr := s.db.View(func(tx *bbolt.Tx) error {
		raw := tx.Bucket(scheduleBucketName).Get([]byte(key))
		if raw == nil {
			// No entry: leave found false and the zero time in place.
			return nil
		}
		found = true
		return errors.WithStack(lastRun.UnmarshalBinary(raw))
	})
	return lastRun, found, errors.WithStack(viewErr)
}

// SetLastRun persists t as the last-run time for key, encoded with
// time.Time's binary marshalling.
func (s *boltScheduleStore) SetLastRun(key string, t time.Time) error {
	encoded, err := t.MarshalBinary()
	if err != nil {
		return errors.Wrap(err, "marshal time")
	}
	writeEntry := func(tx *bbolt.Tx) error {
		return errors.WithStack(tx.Bucket(scheduleBucketName).Put([]byte(key), encoded))
	}
	return errors.WithStack(s.db.Update(writeEntry))
}

// Close closes the underlying bbolt database, releasing its file lock.
func (s *boltScheduleStore) Close() error {
	return errors.WithStack(s.db.Close())
}
Loading